diff --git a/service/mgr/group.go b/service/mgr/group.go index 0019c28c..0a11a021 100644 --- a/service/mgr/group.go +++ b/service/mgr/group.go @@ -206,7 +206,15 @@ func (g *Group) stopFrom(index int) (ok bool) { ok = false } m.mgr.Cancel() - if m.mgr.WaitForWorkers(0) { + + var waitSucceeded bool + if m.mgr.hasStopWorker() { + waitSucceeded = m.mgr.WaitForWorkersFromStop(0) + } else { + waitSucceeded = m.mgr.WaitForWorkers(0) + } + + if waitSucceeded { m.mgr.Info("stopped", "time", time.Since(startTime)) } else { ok = false diff --git a/service/mgr/worker.go b/service/mgr/worker.go index 2151a2c8..8572153e 100644 --- a/service/mgr/worker.go +++ b/service/mgr/worker.go @@ -30,6 +30,12 @@ type WorkerCtx struct { workerMgr *WorkerMgr // TODO: Attach to context instead? logger *slog.Logger + + // isStopWorker indicates whether this worker is responsible for stopping + // the manager. When true, the manager will not wait for this worker to + // finish during stop, preventing deadlocks where the stop worker + // would wait for itself to complete. + isStopWorker bool } // AddToCtx adds the WorkerCtx to the given context. @@ -250,12 +256,27 @@ func (m *Manager) manageWorker(name string, fn func(w *WorkerCtx) error) { // - Panic catching. // - Flow control helpers. func (m *Manager) Do(name string, fn func(w *WorkerCtx) error) error { + return m.do(name, false, fn) +} + +// DoAsStopWorker is like Do(...), but marks the worker as a stop worker. +// This means that the manager will not wait for this worker to finish when stopping. +// Only one stop worker can be started at a time. +func (m *Manager) DoAsStopWorker(name string, fn func(w *WorkerCtx) error) error { + if m.hasStopWorker() { + return fmt.Errorf("cannot start stop worker %q: already has a stop worker", name) + } + return m.do(name, true, fn) +} + +func (m *Manager) do(name string, isStopWorker bool, fn func(w *WorkerCtx) error) error { // Create context. w := &WorkerCtx{ - name: name, - workFunc: fn, - ctx: m.Ctx(), - logger: m.logger.With("worker", name), + name: name, + workFunc: fn, + ctx: m.Ctx(), + logger: m.logger.With("worker", name), + isStopWorker: isStopWorker, } m.workerStart(w) diff --git a/service/mgr/worker_info.go b/service/mgr/worker_info.go index 7f044189..61c9b86d 100644 --- a/service/mgr/worker_info.go +++ b/service/mgr/worker_info.go @@ -73,6 +73,22 @@ func (m *Manager) unregisterWorker(w *WorkerCtx) { } } +func (m *Manager) hasStopWorker() bool { + m.workersLock.Lock() + defer m.workersLock.Unlock() + + for _, w := range m.workers { + if w == nil { + continue + } + if w.isStopWorker { + return true + } + } + + return false +} + // WorkerInfo holds status information about a managers workers. type WorkerInfo struct { Running int diff --git a/spn/captain/api.go b/spn/captain/api.go index cfd38de4..16bf817c 100644 --- a/spn/captain/api.go +++ b/spn/captain/api.go @@ -34,7 +34,7 @@ func handleReInit(ar *api.Request) (msg string, err error) { } // Make sure SPN is stopped and wait for it to complete. - err = module.mgr.Do("stop SPN for re-init", module.instance.SPNGroup().EnsureStoppedWorker) + err = module.mgr.DoAsStopWorker("stop SPN for re-init", module.instance.SPNGroup().EnsureStoppedWorker) if err != nil { return "", fmt.Errorf("failed to stop SPN for re-init: %w", err) }