diff --git a/service/mgr/events.go b/service/mgr/events.go index 03436dc6..359f9a92 100644 --- a/service/mgr/events.go +++ b/service/mgr/events.go @@ -2,7 +2,6 @@ package mgr import ( - "fmt" "slices" "sync" "sync/atomic" @@ -107,6 +106,12 @@ func (em *EventMgr[T]) Submit(event T) { // Run callbacks. for _, ec := range em.callbacks { + // Check if callback was canceled. + if ec.canceled.Load() { + anyCanceled = true + continue + } + // Execute callback. var ( cancel bool @@ -114,29 +119,38 @@ func (em *EventMgr[T]) Submit(event T) { ) if em.mgr != nil { // Prefer executing in worker. - wkrErr := em.mgr.Do("execute event callback", func(w *WorkerCtx) error { - cancel, err = ec.callback(w, event) //nolint:scopelint // Execution is within scope. + name := "event " + em.name + " callback " + ec.name + em.mgr.Go(name, func(w *WorkerCtx) error { + cancel, err = ec.callback(w, event) + // Handle error and cancelation. + if err != nil { + w.Warn( + "event callback failed", + "event", em.name, + "callback", ec.name, + "err", err, + ) + } + if cancel { + ec.canceled.Store(true) + } return nil }) - if wkrErr != nil { - err = fmt.Errorf("callback execution failed: %w", wkrErr) - } } else { cancel, err = ec.callback(nil, event) - } - - // Handle error and cancelation. - if err != nil && em.mgr != nil { - em.mgr.Warn( - "event callback failed", - "event", em.name, - "callback", ec.name, - "err", err, - ) - } - if cancel { - ec.canceled.Store(true) - anyCanceled = true + // Handle error and cancelation. + if err != nil && em.mgr != nil { + em.mgr.Warn( + "event callback failed", + "event", em.name, + "callback", ec.name, + "err", err, + ) + } + if cancel { + ec.canceled.Store(true) + anyCanceled = true + } } }