Restructure modules (#1572)

* Move portbase into monorepo

* Add new simple module mgr

* [WIP] Switch to new simple module mgr

* Add StateMgr and more worker variants

* [WIP] Switch more modules

* [WIP] Switch more modules

* [WIP] switch more modules

* [WIP] switch all SPN modules

* [WIP] switch all service modules

* [WIP] Convert all workers to the new module system

* [WIP] add new task system to module manager

* [WIP] Add second take for scheduling workers

* [WIP] Add FIXME for bugs in new scheduler

* [WIP] Add minor improvements to scheduler

* [WIP] Add new worker scheduler

* [WIP] Fix more bugs related to new module system

* [WIP] Fix start handling of the new module system

* [WIP] Improve startup process

* [WIP] Fix minor issues

* [WIP] Fix missing subsystem in settings

* [WIP] Initialize managers in constructor

* [WIP] Move module event initialization to constructors

* [WIP] Fix setting for enabling and disabling the SPN module

* [WIP] Move API registration into module construction

* [WIP] Update states mgr for all modules

* [WIP] Add CmdLine operation support

* Add state helper methods to module group and instance

* Add notification and module status handling to status package

* Fix starting issues

* Remove pilot widget and update security lock to new status data

* Remove debug logs

* Improve http server shutdown

* Add workaround for cleanly shutting down firewall+netquery

* Improve logging

* Add syncing states with notifications for new module system

* Improve starting, stopping, shutdown; resolve FIXMEs/TODOs

* [WIP] Fix most unit tests

* Review new module system and fix minor issues

* Push shutdown and restart events again via API

* Set sleep mode via interface

* Update example/template module

* [WIP] Fix spn/cabin unit test

* Remove deprecated UI elements

* Make log output more similar for the logging transition phase

* Switch spn hub and observer cmds to new module system

* Fix log sources

* Make worker mgr less error prone

* Fix tests and minor issues

* Fix observation hub

* Improve shutdown and restart handling

* Split up big connection.go source file

* Move varint and dsd packages to structures repo

* Improve expansion test

* Fix linter warnings

* Fix interception module on windows

* Fix linter errors

---------

Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
This commit is contained in:
Daniel Hååvi
2024-08-09 17:15:48 +02:00
committed by GitHub
parent 10a77498f4
commit 80664d1a27
647 changed files with 37690 additions and 3366 deletions

2
service/mgr/doc.go Normal file
View File

@@ -0,0 +1,2 @@
// Package mgr provides simple managing of flow control and logging.
package mgr

175
service/mgr/events.go Normal file
View File

@@ -0,0 +1,175 @@
//nolint:structcheck,golint // TODO: Seems broken for generics.
package mgr
import (
"fmt"
"slices"
"sync"
"sync/atomic"
)
// EventMgr is a simple event manager.
// It fans every submitted event out to all registered subscriptions and
// callbacks. The zero value is not usable; create one with NewEventMgr.
type EventMgr[T any] struct {
	name string   // event name, used in log output
	mgr  *Manager // optional; used for logging and running callbacks as workers

	lock      sync.Mutex // guards subs and callbacks
	subs      []*EventSubscription[T]
	callbacks []*EventCallback[T]
}
// EventSubscription is a subscription to an event.
type EventSubscription[T any] struct {
	name     string      // subscriber name, used in log output
	events   chan T      // buffered channel events are delivered on
	canceled atomic.Bool // set via Cancel; entry is removed on the next Submit
}
// EventCallback is a registered callback to an event.
type EventCallback[T any] struct {
	name     string
	callback EventCallbackFunc[T]
	canceled atomic.Bool // set when the callback requests cancelation; entry is removed on the next Submit
}
// EventCallbackFunc defines the event callback function.
// Returning cancel == true removes the callback from the event manager.
// The *WorkerCtx may be nil if the event manager has no Manager attached.
type EventCallbackFunc[T any] func(*WorkerCtx, T) (cancel bool, err error)
// NewEventMgr returns a new event manager.
// It is easiest used as a public field on a struct,
// so that others can simply Subscribe() or AddCallback().
// The mgr may be nil; it is only used for logging and worker execution.
func NewEventMgr[T any](eventName string, mgr *Manager) *EventMgr[T] {
	return &EventMgr[T]{
		name: eventName,
		mgr:  mgr,
	}
}
// Subscribe subscribes to events.
// The received events are shared among all subscribers and callbacks.
// Be sure to apply proper concurrency safeguards, if applicable.
func (em *EventMgr[T]) Subscribe(subscriberName string, chanSize int) *EventSubscription[T] {
	sub := &EventSubscription[T]{
		name:   subscriberName,
		events: make(chan T, chanSize),
	}

	em.lock.Lock()
	em.subs = append(em.subs, sub)
	em.lock.Unlock()

	return sub
}
// AddCallback adds a callback to be executed on events.
// The received events are shared among all subscribers and callbacks.
// Be sure to apply proper concurrency safeguards, if applicable.
func (em *EventMgr[T]) AddCallback(callbackName string, callback EventCallbackFunc[T]) {
	em.lock.Lock()
	defer em.lock.Unlock()

	ec := &EventCallback[T]{
		name:     callbackName,
		callback: callback,
	}
	em.callbacks = append(em.callbacks, ec)
}
// Submit submits a new event.
// It delivers the event to every live subscription (dropping it with a
// warning if a subscriber's channel is full) and then runs all callbacks
// synchronously. Canceled subscriptions and callbacks are removed afterwards.
// NOTE(review): callbacks run while em.lock is held — a callback that calls
// back into this EventMgr would deadlock; confirm callers avoid that.
func (em *EventMgr[T]) Submit(event T) {
	em.lock.Lock()
	defer em.lock.Unlock()

	var anyCanceled bool

	// Send to subscriptions.
	for _, sub := range em.subs {
		// Check if subscription was canceled.
		if sub.canceled.Load() {
			anyCanceled = true
			continue
		}

		// Submit via channel, dropping the event if the channel is full.
		select {
		case sub.events <- event:
		default:
			if em.mgr != nil {
				em.mgr.Warn(
					"event subscription channel overflow",
					"event", em.name,
					"subscriber", sub.name,
				)
			}
		}
	}

	// Run callbacks.
	for _, ec := range em.callbacks {
		// Execute callback.
		var (
			cancel bool
			err    error
		)
		if em.mgr != nil {
			// Prefer executing in a worker for logging and panic recovery.
			wkrErr := em.mgr.Do("execute event callback", func(w *WorkerCtx) error {
				cancel, err = ec.callback(w, event) //nolint:scopelint // Execution is within scope.
				return nil
			})
			if wkrErr != nil {
				err = fmt.Errorf("callback execution failed: %w", wkrErr)
			}
		} else {
			// No manager available, execute directly with a nil WorkerCtx.
			cancel, err = ec.callback(nil, event)
		}

		// Handle error and cancelation.
		if err != nil && em.mgr != nil {
			em.mgr.Warn(
				"event callback failed",
				"event", em.name,
				"callback", ec.name,
				"err", err,
			)
		}
		if cancel {
			ec.canceled.Store(true)
			anyCanceled = true
		}
	}

	// If any canceled subscription/callback was seen, clean the slices.
	if anyCanceled {
		em.clean()
	}
}
// clean removes all canceled subscriptions and callbacks.
// Callers must hold em.lock.
func (em *EventMgr[T]) clean() {
	// Let type inference supply the generic arguments to DeleteFunc.
	em.subs = slices.DeleteFunc(em.subs, func(es *EventSubscription[T]) bool {
		return es.canceled.Load()
	})
	em.callbacks = slices.DeleteFunc(em.callbacks, func(ec *EventCallback[T]) bool {
		return ec.canceled.Load()
	})
}
// Events returns a read channel for the events.
// The received events are shared among all subscribers and callbacks.
// Be sure to apply proper concurrency safeguards, if applicable.
func (es *EventSubscription[T]) Events() <-chan T {
	return es.events
}

// Cancel cancels the subscription.
// The events channel is not closed, but will not receive new events.
// Cancel is safe to call multiple times.
func (es *EventSubscription[T]) Cancel() {
	es.canceled.Store(true)
}

// Done returns whether the event subscription has been canceled.
func (es *EventSubscription[T]) Done() bool {
	return es.canceled.Load()
}

286
service/mgr/group.go Normal file
View File

@@ -0,0 +1,286 @@
package mgr
import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"
)
var (
	// ErrUnsuitableGroupState is returned when an operation cannot be executed due to an unsuitable state.
	// Example: starting a group that is not fully off, or stopping one that is not running.
	ErrUnsuitableGroupState = errors.New("unsuitable group state")

	// ErrInvalidGroupState is returned when a group is in an invalid state and cannot be recovered.
	ErrInvalidGroupState = errors.New("invalid group state")

	// ErrExecuteCmdLineOp is returned when modules are created, but request
	// execution of a (somewhere else set) command line operation instead.
	ErrExecuteCmdLineOp = errors.New("command line operation execution requested")
)
// Group states held in Group.state.
const (
	groupStateOff int32 = iota
	groupStateStarting
	groupStateRunning
	groupStateStopping
	groupStateInvalid
)

// groupStateToString returns a human-readable name for the given group state.
// Values outside the known range yield "unknown".
func groupStateToString(state int32) string {
	stateNames := [...]string{
		groupStateOff:      "off",
		groupStateStarting: "starting",
		groupStateRunning:  "running",
		groupStateStopping: "stopping",
		groupStateInvalid:  "invalid",
	}
	if state >= 0 && int(state) < len(stateNames) {
		return stateNames[state]
	}
	return "unknown"
}
// Group describes a group of modules.
// Modules are started in the order they were added and stopped in reverse order.
type Group struct {
	modules []*groupModule
	state   atomic.Int32 // one of the groupState* constants
}

// groupModule pairs a module with its manager.
type groupModule struct {
	module Module
	mgr    *Manager
}
// Module is a manageable instance of some component.
// Start and Stop are called by the Group; Manager must return a non-nil
// manager for the module to be accepted by Group.Add.
type Module interface {
	Manager() *Manager
	Start() error
	Stop() error
}
// NewGroup returns a new group of modules.
// Nil modules and modules without a manager are silently skipped by Add.
func NewGroup(modules ...Module) *Group {
	g := &Group{modules: make([]*groupModule, 0, len(modules))}
	for _, module := range modules {
		g.Add(module)
	}
	return g
}
// Add validates the given module and adds it to the group, if all requirements are met.
// Modules that are nil, carry a nil value behind the interface, or do not
// provide a manager are silently ignored.
// Not safe for concurrent use with any other method.
// All modules must be added before anything else is done with the group.
func (g *Group) Add(m Module) {
	// Reject unusable values before touching the module:
	// calling m.Manager() on a nil module would panic.
	switch {
	case m == nil:
		// Skip nil values to allow for cleaner code.
		return
	case reflect.ValueOf(m).IsNil():
		// If nil values are given via a struct, they will be interfaces to a
		// nil type. Ignore these too.
		// NOTE: IsNil panics for non-nilable kinds; modules are expected to be pointers.
		return
	}

	mgr := m.Manager()
	switch {
	case mgr == nil:
		// Ignore modules that do not return a manager.
		return
	case mgr.Name() == "":
		// Force name if none is set.
		// TODO: Unsafe if module is already logging, etc.
		mgr.setName(makeModuleName(m))
	}

	// Add module to group.
	g.modules = append(g.modules, &groupModule{
		module: m,
		mgr:    mgr,
	})
}
// Start starts all modules in the group in the defined order.
// If a module fails to start, itself and all previous modules
// will be stopped in the reverse order.
// Returns nil immediately if the group is already running.
func (g *Group) Start() error {
	// Check group state.
	switch g.state.Load() {
	case groupStateRunning:
		// Already running.
		return nil
	case groupStateInvalid:
		// Something went terribly wrong, cannot recover from here.
		return fmt.Errorf("%w: cannot recover", ErrInvalidGroupState)
	default:
		// Only an off group may be started; the CAS also guards against
		// concurrent Start calls.
		if !g.state.CompareAndSwap(groupStateOff, groupStateStarting) {
			return fmt.Errorf("%w: group is not off, state: %s", ErrUnsuitableGroupState, groupStateToString(g.state.Load()))
		}
	}

	// Start modules.
	for i, m := range g.modules {
		m.mgr.Debug("starting")
		startTime := time.Now()

		// Run Start as a worker to get logging and panic recovery.
		err := m.mgr.Do("start module "+m.mgr.name, func(_ *WorkerCtx) error {
			return m.module.Start() //nolint:scopelint // Execution is synchronous.
		})
		if err != nil {
			m.mgr.Error(
				"failed to start",
				"err", err,
				"time", time.Since(startTime),
			)
			// Roll back: stop this module and all previously started ones.
			if !g.stopFrom(i) {
				g.state.Store(groupStateInvalid)
			} else {
				g.state.Store(groupStateOff)
			}
			return fmt.Errorf("failed to start %s: %w", m.mgr.name, err)
		}
		m.mgr.Info("started", "time", time.Since(startTime))
	}

	g.state.Store(groupStateRunning)
	return nil
}
// Stop stops all modules in the group in the reverse order.
// Returns nil immediately if the group is already off.
func (g *Group) Stop() error {
	// Check group state.
	switch g.state.Load() {
	case groupStateOff:
		// Already stopped.
		return nil
	case groupStateInvalid:
		// Something went terribly wrong, cannot recover from here.
		return fmt.Errorf("%w: cannot recover", ErrInvalidGroupState)
	default:
		// Only a running group may be stopped; the CAS also guards against
		// concurrent Stop calls.
		if !g.state.CompareAndSwap(groupStateRunning, groupStateStopping) {
			return fmt.Errorf("%w: group is not running, state: %s", ErrUnsuitableGroupState, groupStateToString(g.state.Load()))
		}
	}

	// Stop modules.
	if !g.stopFrom(len(g.modules) - 1) {
		g.state.Store(groupStateInvalid)
		return errors.New("failed to stop")
	}

	g.state.Store(groupStateOff)
	return nil
}
// stopFrom stops the modules at the given index and below, in reverse order.
// It cancels each module's manager context, waits for its workers, and
// finally resets all managers so the group can be started again.
// Reports whether all modules stopped cleanly.
func (g *Group) stopFrom(index int) (ok bool) {
	ok = true

	// Stop modules.
	for i := index; i >= 0; i-- {
		m := g.modules[i]
		m.mgr.Debug("stopping")
		startTime := time.Now()

		// Run Stop as a worker to get logging and panic recovery.
		err := m.mgr.Do("stop module "+m.mgr.name, func(_ *WorkerCtx) error {
			return m.module.Stop()
		})
		if err != nil {
			m.mgr.Error(
				"failed to stop",
				"err", err,
				"time", time.Since(startTime),
			)
			ok = false
		}

		// Cancel the module's context and wait for all of its workers.
		// WaitForWorkers(0) uses the default timeout of one minute.
		m.mgr.Cancel()
		if m.mgr.WaitForWorkers(0) {
			m.mgr.Info("stopped", "time", time.Since(startTime))
		} else {
			ok = false
			m.mgr.Error(
				"failed to stop",
				"err", "timed out",
				"workerCnt", m.mgr.workerCnt.Load(),
				"time", time.Since(startTime),
			)
		}
	}

	// Reset modules.
	if !ok {
		// Stopping failed somewhere, reset anyway after a short wait.
		// This will be very uncommon and can help to mitigate race conditions in these events.
		time.Sleep(time.Second)
	}
	for _, m := range g.modules {
		m.mgr.Reset()
	}

	return ok
}
// Ready returns whether all modules in the group have been started and are still running.
// It reports false while the group is starting or stopping.
func (g *Group) Ready() bool {
	return g.state.Load() == groupStateRunning
}
// GetStates returns the current states of all group modules
// that implement StatefulModule.
func (g *Group) GetStates() []StateUpdate {
	updates := make([]StateUpdate, 0, len(g.modules))
	for _, gm := range g.modules {
		stateful, ok := gm.module.(StatefulModule)
		if !ok {
			continue
		}
		updates = append(updates, stateful.States().Export())
	}
	return updates
}
// AddStatesCallback adds the given callback function to all group modules that
// expose a state manager at States().
func (g *Group) AddStatesCallback(callbackName string, callback EventCallbackFunc[StateUpdate]) {
	for _, gm := range g.modules {
		stateful, ok := gm.module.(StatefulModule)
		if !ok {
			continue
		}
		stateful.States().AddCallback(callbackName, callback)
	}
}
// Modules returns a copy of the modules.
func (g *Group) Modules() []Module {
	copied := make([]Module, len(g.modules))
	for i, gm := range g.modules {
		copied[i] = gm.module
	}
	return copied
}
// RunModules is a simple wrapper function to start modules and stop them again
// when the given context is canceled.
// It blocks until ctx is canceled and returns the error of Stop (or of Start,
// if starting failed).
func RunModules(ctx context.Context, modules ...Module) error {
	g := NewGroup(modules...)

	// Start modules.
	if err := g.Start(); err != nil {
		return fmt.Errorf("failed to start: %w", err)
	}

	// Stop modules when context is canceled.
	<-ctx.Done()
	return g.Stop()
}
// makeModuleName derives a default module name from the module's dynamic type,
// stripping the leading "*" of pointer types.
func makeModuleName(m Module) string {
	typeName := fmt.Sprintf("%T", m)
	return strings.TrimPrefix(typeName, "*")
}

92
service/mgr/group_ext.go Normal file
View File

@@ -0,0 +1,92 @@
package mgr
import (
"context"
"errors"
"sync"
"time"
)
// ExtendedGroup extends the group with additional helpful functionality.
type ExtendedGroup struct {
	*Group

	ensureCtx    context.Context    // context of the currently active Ensure*Worker run
	ensureCancel context.CancelFunc // cancels ensureCtx; never nil after UpgradeGroup
	ensureLock   sync.Mutex         // guards ensureCtx and ensureCancel
}
// NewExtendedGroup returns a new extended group.
func NewExtendedGroup(modules ...Module) *ExtendedGroup {
	g := NewGroup(modules...)
	return UpgradeGroup(g)
}
// UpgradeGroup upgrades a regular group to an extended group.
func UpgradeGroup(g *Group) *ExtendedGroup {
	eg := &ExtendedGroup{Group: g}
	// Use a no-op cancel func so Ensure*Worker can always call it.
	eg.ensureCancel = func() {}
	return eg
}
// EnsureStartedWorker tries to start the group until it succeeds or fails permanently.
// Starting a new run cancels any previous Ensure*Worker run.
func (eg *ExtendedGroup) EnsureStartedWorker(wCtx *WorkerCtx) error {
	// Setup worker: cancel a previous run and derive a fresh context from the worker.
	var ctx context.Context
	func() {
		eg.ensureLock.Lock()
		defer eg.ensureLock.Unlock()

		eg.ensureCancel()
		eg.ensureCtx, eg.ensureCancel = context.WithCancel(wCtx.Ctx())
		ctx = eg.ensureCtx
	}()

	for {
		err := eg.Group.Start()
		switch {
		case err == nil:
			return nil
		case errors.Is(err, ErrInvalidGroupState):
			// NOTE(review): this retries on ErrInvalidGroupState, which Start
			// documents as unrecoverable, and gives up on the transient
			// ErrUnsuitableGroupState — confirm this is intended.
			wCtx.Debug("group start delayed", "err", err)
		default:
			return err
		}

		// Retry after a short delay, or give up when canceled.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(1 * time.Second):
		}
	}
}
// EnsureStoppedWorker tries to stop the group until it succeeds or fails permanently.
// Starting a new run cancels any previous Ensure*Worker run.
func (eg *ExtendedGroup) EnsureStoppedWorker(wCtx *WorkerCtx) error {
	// Setup worker: cancel a previous run and derive a fresh context from the worker.
	var ctx context.Context
	func() {
		eg.ensureLock.Lock()
		defer eg.ensureLock.Unlock()

		eg.ensureCancel()
		eg.ensureCtx, eg.ensureCancel = context.WithCancel(wCtx.Ctx())
		ctx = eg.ensureCtx
	}()

	for {
		err := eg.Group.Stop()
		switch {
		case err == nil:
			return nil
		case errors.Is(err, ErrInvalidGroupState):
			// NOTE(review): retries on the unrecoverable ErrInvalidGroupState
			// and gives up on the transient ErrUnsuitableGroupState — confirm
			// this is intended.
			wCtx.Debug("group stop delayed", "err", err)
		default:
			return err
		}

		// Retry after a short delay, or give up when canceled.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(1 * time.Second):
		}
	}
}

226
service/mgr/manager.go Normal file
View File

@@ -0,0 +1,226 @@
package mgr
import (
"context"
"log/slog"
"runtime"
"sync/atomic"
"time"
)
// ManagerNameSLogKey is used as the logging key for the name of the manager.
// Changing it only affects managers created afterwards, as the logger is
// bound at construction time.
var ManagerNameSLogKey = "manager"
// Manager manages workers.
// It provides a shared cancelable context, structured logging, and worker
// accounting for a single module.
type Manager struct {
	name   string
	logger *slog.Logger

	ctx       context.Context
	cancelCtx context.CancelFunc

	workerCnt   atomic.Int32  // number of currently running workers
	workersDone chan struct{} // signaled via non-blocking sends when the worker count drops
}
// New returns a new manager.
func New(name string) *Manager {
	return newManager(name)
}

// newManager creates the manager with a fresh cancelable background context
// and a logger carrying the manager name.
func newManager(name string) *Manager {
	ctx, cancel := context.WithCancel(context.Background())
	return &Manager{
		name:        name,
		logger:      slog.Default().With(ManagerNameSLogKey, name),
		ctx:         ctx,
		cancelCtx:   cancel,
		workersDone: make(chan struct{}),
	}
}
// Name returns the manager name (as given to New or set via setName).
func (m *Manager) Name() string {
	return m.name
}

// setName sets the manager name and resets the logger to use that name.
// Not safe for concurrent use with any other module methods.
func (m *Manager) setName(newName string) {
	m.name = newName
	m.logger = slog.Default().With(ManagerNameSLogKey, m.name)
}
// Ctx returns the manager context.
// All worker contexts are derived from it.
func (m *Manager) Ctx() context.Context {
	return m.ctx
}

// Cancel cancels the manager context.
func (m *Manager) Cancel() {
	m.cancelCtx()
}

// Done returns the context Done channel.
func (m *Manager) Done() <-chan struct{} {
	return m.ctx.Done()
}

// IsDone checks whether the manager context is done.
func (m *Manager) IsDone() bool {
	return m.ctx.Err() != nil
}
// LogEnabled reports whether the logger emits log records at the given level.
// The manager context is automatically supplied.
func (m *Manager) LogEnabled(level slog.Level) bool {
	return m.logger.Enabled(m.ctx, level)
}

// Debug logs at LevelDebug.
// The manager context is automatically supplied.
func (m *Manager) Debug(msg string, args ...any) {
	// Check the level first to avoid the cost of building the record.
	if !m.logger.Enabled(m.ctx, slog.LevelDebug) {
		return
	}
	m.writeLog(slog.LevelDebug, msg, args...)
}

// Info logs at LevelInfo.
// The manager context is automatically supplied.
func (m *Manager) Info(msg string, args ...any) {
	if !m.logger.Enabled(m.ctx, slog.LevelInfo) {
		return
	}
	m.writeLog(slog.LevelInfo, msg, args...)
}

// Warn logs at LevelWarn.
// The manager context is automatically supplied.
func (m *Manager) Warn(msg string, args ...any) {
	if !m.logger.Enabled(m.ctx, slog.LevelWarn) {
		return
	}
	m.writeLog(slog.LevelWarn, msg, args...)
}

// Error logs at LevelError.
// The manager context is automatically supplied.
func (m *Manager) Error(msg string, args ...any) {
	if !m.logger.Enabled(m.ctx, slog.LevelError) {
		return
	}
	m.writeLog(slog.LevelError, msg, args...)
}
// Log emits a log record with the current time and the given level and message.
// The manager context is automatically supplied.
func (m *Manager) Log(level slog.Level, msg string, args ...any) {
	if !m.logger.Enabled(m.ctx, level) {
		return
	}
	m.writeLog(level, msg, args...)
}

// LogAttrs is a more efficient version of Log() that accepts only Attrs.
// The manager context is automatically supplied.
func (m *Manager) LogAttrs(level slog.Level, msg string, attrs ...slog.Attr) {
	if !m.logger.Enabled(m.ctx, level) {
		return
	}

	// Record the caller of LogAttrs as the log record's source position.
	var pcs [1]uintptr
	runtime.Callers(2, pcs[:]) // skip "Callers" and "LogAttrs".
	r := slog.NewRecord(time.Now(), level, msg, pcs[0])
	r.AddAttrs(attrs...)
	_ = m.logger.Handler().Handle(m.ctx, r)
}

// writeLog builds a log record attributed to the caller of the exported log
// method and hands it directly to the logger's handler.
func (m *Manager) writeLog(level slog.Level, msg string, args ...any) {
	var pcs [1]uintptr
	runtime.Callers(3, pcs[:]) // skip "Callers", "writeLog" and the calling function.
	r := slog.NewRecord(time.Now(), level, msg, pcs[0])
	r.Add(args...)
	_ = m.logger.Handler().Handle(m.ctx, r)
}
// WaitForWorkers waits for all workers of this manager to be done.
// The default maximum waiting time is one minute.
func (m *Manager) WaitForWorkers(max time.Duration) (done bool) {
	return m.waitForWorkers(max, 0)
}

// WaitForWorkersFromStop is a special version of WaitForWorkers, meant to be called from the stop routine.
// It waits for all workers of this manager to be done, except for the Stop function.
// The default maximum waiting time is one minute.
func (m *Manager) WaitForWorkersFromStop(max time.Duration) (done bool) {
	// The limit of 1 accounts for the stop routine itself being a worker.
	return m.waitForWorkers(max, 1)
}
// waitForWorkers waits until at most limit workers remain, or until max has
// elapsed (defaulting to one minute when max <= 0). Reports whether the
// worker count reached the limit in time.
func (m *Manager) waitForWorkers(max time.Duration, limit int32) (done bool) {
	// Return immediately if there are already few enough workers.
	if m.workerCnt.Load() <= limit {
		return true
	}

	// Setup timers.
	reCheckDuration := 10 * time.Millisecond
	if max <= 0 {
		max = time.Minute // default maximum waiting time
	}
	reCheck := time.NewTimer(reCheckDuration)
	maxWait := time.NewTimer(max)
	defer reCheck.Stop()
	defer maxWait.Stop()

	// Wait for workers to finish, plus check the count in intervals.
	for {
		if m.workerCnt.Load() <= limit {
			return true
		}

		select {
		case <-m.workersDone:
			if m.workerCnt.Load() <= limit {
				return true
			}

		case <-reCheck.C:
			// Check worker count again, with exponentially growing intervals.
			// This is a dead simple and effective way to avoid all the channel race conditions.
			reCheckDuration *= 2
			reCheck.Reset(reCheckDuration)

		case <-maxWait.C:
			return m.workerCnt.Load() <= limit
		}
	}
}
// workerStart accounts for a newly started worker.
func (m *Manager) workerStart() {
	m.workerCnt.Add(1)
}

// workerDone accounts for a finished worker and wakes up waiters when at
// most one worker (possibly a stop routine, see WaitForWorkersFromStop) remains.
func (m *Manager) workerDone() {
	if m.workerCnt.Add(-1) <= 1 {
		// Notify all waiters.
		// Loop with non-blocking sends until one would block, meaning no
		// more waiters are currently listening.
		for {
			select {
			case m.workersDone <- struct{}{}:
			default:
				return
			}
		}
	}
}
// Reset resets the manager in order to be able to be used again.
// In the process, the current context is canceled.
// As part of a module (in a group), the module might be stopped and started again.
// This method is not goroutine-safe. The caller must make sure the manager is
// not being used in any way during execution.
func (m *Manager) Reset() {
	m.cancelCtx()
	m.ctx, m.cancelCtx = context.WithCancel(context.Background())
	// Drop all worker accounting; any still-running workers are no longer tracked.
	m.workerCnt.Store(0)
	m.workersDone = make(chan struct{})
}

View File

@@ -0,0 +1,58 @@
package mgr
import "time"
// SleepyTicker is wrapper over time.Ticker that respects the sleep mode of the module.
type SleepyTicker struct {
ticker time.Ticker
normalDuration time.Duration
sleepDuration time.Duration
sleepMode bool
sleepChannel chan time.Time
}
// NewSleepyTicker returns a new SleepyTicker. This is a wrapper of the standard time.Ticker but it respects modules.Module sleep mode. Check https://pkg.go.dev/time#Ticker.
// If sleepDuration is set to 0 ticker will not tick during sleep.
func NewSleepyTicker(normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker {
st := &SleepyTicker{
ticker: *time.NewTicker(normalDuration),
normalDuration: normalDuration,
sleepDuration: sleepDuration,
sleepMode: false,
}
return st
}
// Wait waits until the module is not in sleep mode and returns time.Ticker.C channel.
func (st *SleepyTicker) Wait() <-chan time.Time {
if st.sleepMode && st.sleepDuration == 0 {
return st.sleepChannel
}
return st.ticker.C
}
// Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine reading from the channel from seeing an erroneous "tick".
func (st *SleepyTicker) Stop() {
st.ticker.Stop()
}
// SetSleep sets the sleep mode of the ticker. If enabled is true, the ticker will tick with sleepDuration. If enabled is false, the ticker will tick with normalDuration.
func (st *SleepyTicker) SetSleep(enabled bool) {
st.sleepMode = enabled
if enabled {
if st.sleepDuration > 0 {
st.ticker.Reset(st.sleepDuration)
} else {
// Next call to Wait will wait until SetSleep is called with enabled == false
st.sleepChannel = make(chan time.Time)
}
} else {
st.ticker.Reset(st.normalDuration)
if st.sleepDuration > 0 {
// Notify that we are not sleeping anymore.
close(st.sleepChannel)
}
}
}

188
service/mgr/states.go Normal file
View File

@@ -0,0 +1,188 @@
package mgr
import (
"slices"
"sync"
"time"
)
// StateMgr is a simple state manager.
// It keeps a list of states and publishes a StateUpdate event whenever the
// list changes.
type StateMgr struct {
	states     []State
	statesLock sync.Mutex

	statesEventMgr *EventMgr[StateUpdate]

	mgr *Manager // optional; provides the module name for exported updates
}
// State describes the state of a manager or module.
// States are identified by ID; StateMgr.Add replaces a state with the same ID.
type State struct {
	// ID is a program-unique ID.
	// It must not only be unique within the StateMgr, but for the whole program,
	// as it may be re-used with related systems.
	// Required.
	ID string

	// Name is the name of the state.
	// This may also serve as a notification title.
	// Required.
	Name string

	// Message is a more detailed message about the state.
	// Optional.
	Message string

	// Type defines the type of the state.
	// Optional.
	Type StateType

	// Time is the time when the state was created or the originating incident occurred.
	// Optional, will be set to current time if not set.
	Time time.Time

	// Data can hold any additional data necessary for further processing of connected systems.
	// Optional.
	Data any
}
// StateType defines commonly used states.
type StateType string

// State Types.
const (
	StateTypeUndefined = ""
	StateTypeHint      = "hint"
	StateTypeWarning   = "warning"
	StateTypeError     = "error"
)

// Severity returns a number representing the gravity of the state for ordering.
// Undefined and unknown types rank lowest at zero.
func (st StateType) Severity() int {
	severities := map[StateType]int{
		StateTypeHint:    1,
		StateTypeWarning: 2,
		StateTypeError:   3,
	}
	// Missing keys (undefined/unknown) yield the zero value.
	return severities[st]
}
// StateUpdate is used to update others about a state change.
type StateUpdate struct {
	Module string  // name of the module the states belong to
	States []State // snapshot of all current states of that module
}

// StatefulModule is used for interface checks on modules.
type StatefulModule interface {
	States() *StateMgr
}
// NewStateMgr returns a new state manager.
// The mgr may be nil; it supplies the module name for updates and is used
// for running event callbacks.
func NewStateMgr(mgr *Manager) *StateMgr {
	return &StateMgr{
		statesEventMgr: NewEventMgr[StateUpdate]("state update", mgr),
		mgr:            mgr,
	}
}

// NewStateMgr returns a new state manager that is attached to this manager.
func (m *Manager) NewStateMgr() *StateMgr {
	return NewStateMgr(m)
}
// Add adds a state.
// If a state with the same ID already exists, it is replaced.
// An empty Time is set to the current time.
// A state update event is submitted afterwards.
func (m *StateMgr) Add(s State) {
	m.statesLock.Lock()
	defer m.statesLock.Unlock()

	if s.Time.IsZero() {
		s.Time = time.Now()
	}

	// Update or add state.
	index := slices.IndexFunc(m.states, func(es State) bool {
		return es.ID == s.ID
	})
	if index >= 0 {
		m.states[index] = s
	} else {
		m.states = append(m.states, s)
	}

	m.statesEventMgr.Submit(m.export())
}
// Remove removes the state with the given ID.
// A state update event is only submitted if a state was actually removed.
func (m *StateMgr) Remove(id string) {
	m.statesLock.Lock()
	defer m.statesLock.Unlock()

	// Quick check if slice is empty.
	// It is a common pattern to remove a state when no error was encountered at
	// a critical operation. This means that StateMgr.Remove will be called often.
	if len(m.states) == 0 {
		return
	}

	var entryRemoved bool
	m.states = slices.DeleteFunc(m.states, func(s State) bool {
		if s.ID == id {
			entryRemoved = true
			return true
		}
		return false
	})

	// Only notify subscribers when something changed.
	if entryRemoved {
		m.statesEventMgr.Submit(m.export())
	}
}
// Clear removes all states and submits a state update event.
func (m *StateMgr) Clear() {
	m.statesLock.Lock()
	defer m.statesLock.Unlock()

	m.states = nil
	m.statesEventMgr.Submit(m.export())
}
// Export returns the current states.
func (m *StateMgr) Export() StateUpdate {
	m.statesLock.Lock()
	defer m.statesLock.Unlock()

	return m.export()
}

// export returns the current states.
// Callers must hold statesLock.
func (m *StateMgr) export() StateUpdate {
	// Fall back to an empty module name if no manager is attached.
	name := ""
	if m.mgr != nil {
		name = m.mgr.name
	}

	return StateUpdate{
		Module: name,
		// Clone so callers cannot mutate the internal slice.
		States: slices.Clone(m.states),
	}
}
// Subscribe subscribes to state update events.
// The chanSize is the buffer size of the subscription channel.
func (m *StateMgr) Subscribe(subscriberName string, chanSize int) *EventSubscription[StateUpdate] {
	return m.statesEventMgr.Subscribe(subscriberName, chanSize)
}

// AddCallback adds a callback to state update events.
func (m *StateMgr) AddCallback(callbackName string, callback EventCallbackFunc[StateUpdate]) {
	m.statesEventMgr.AddCallback(callbackName, callback)
}

351
service/mgr/worker.go Normal file
View File

@@ -0,0 +1,351 @@
package mgr
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"runtime"
"runtime/debug"
"strings"
"time"
)
// workerContextKey is a key used for the context key/value storage.
type workerContextKey struct{}

// WorkerCtxContextKey is the key used to add the WorkerCtx to a context.
// Prefer WorkerCtx.AddToCtx and WorkerFromCtx over using the key directly.
var WorkerCtxContextKey = workerContextKey{}
// WorkerCtx provides workers with the necessary environment for flow control
// and logging.
type WorkerCtx struct {
	ctx       context.Context
	cancelCtx context.CancelFunc

	// workerMgr is only set for workers started via a WorkerMgr.
	workerMgr *WorkerMgr // TODO: Attach to context instead?

	logger *slog.Logger
}
// AddToCtx adds the WorkerCtx to the given context.
func (w *WorkerCtx) AddToCtx(ctx context.Context) context.Context {
	return context.WithValue(ctx, WorkerCtxContextKey, w)
}

// WorkerFromCtx returns the WorkerCtx from the given context.
// Returns nil if the context does not carry one.
func WorkerFromCtx(ctx context.Context) *WorkerCtx {
	v := ctx.Value(WorkerCtxContextKey)
	if w, ok := v.(*WorkerCtx); ok {
		return w
	}
	return nil
}
// Ctx returns the worker context.
// Is automatically canceled after the worker stops/returns, regardless of error.
func (w *WorkerCtx) Ctx() context.Context {
	return w.ctx
}

// Cancel cancels the worker context.
// Is automatically called after the worker stops/returns, regardless of error.
func (w *WorkerCtx) Cancel() {
	w.cancelCtx()
}

// WorkerMgr returns the worker manager the worker was started from.
// Returns nil if the worker is not associated with a scheduler.
func (w *WorkerCtx) WorkerMgr() *WorkerMgr {
	return w.workerMgr
}

// Done returns the context Done channel.
func (w *WorkerCtx) Done() <-chan struct{} {
	return w.ctx.Done()
}

// IsDone checks whether the worker context is done.
func (w *WorkerCtx) IsDone() bool {
	return w.ctx.Err() != nil
}

// Logger returns the logger used by the worker context.
func (w *WorkerCtx) Logger() *slog.Logger {
	return w.logger
}

// LogEnabled reports whether the logger emits log records at the given level.
// The worker context is automatically supplied.
func (w *WorkerCtx) LogEnabled(level slog.Level) bool {
	return w.logger.Enabled(w.ctx, level)
}
// Debug logs at LevelDebug.
// The worker context is automatically supplied.
func (w *WorkerCtx) Debug(msg string, args ...any) {
	// Check the level first to avoid the cost of building the record.
	if !w.logger.Enabled(w.ctx, slog.LevelDebug) {
		return
	}
	w.writeLog(slog.LevelDebug, msg, args...)
}

// Info logs at LevelInfo.
// The worker context is automatically supplied.
func (w *WorkerCtx) Info(msg string, args ...any) {
	if !w.logger.Enabled(w.ctx, slog.LevelInfo) {
		return
	}
	w.writeLog(slog.LevelInfo, msg, args...)
}

// Warn logs at LevelWarn.
// The worker context is automatically supplied.
func (w *WorkerCtx) Warn(msg string, args ...any) {
	if !w.logger.Enabled(w.ctx, slog.LevelWarn) {
		return
	}
	w.writeLog(slog.LevelWarn, msg, args...)
}

// Error logs at LevelError.
// The worker context is automatically supplied.
func (w *WorkerCtx) Error(msg string, args ...any) {
	if !w.logger.Enabled(w.ctx, slog.LevelError) {
		return
	}
	w.writeLog(slog.LevelError, msg, args...)
}
// Log emits a log record with the current time and the given level and message.
// The worker context is automatically supplied.
func (w *WorkerCtx) Log(level slog.Level, msg string, args ...any) {
	if !w.logger.Enabled(w.ctx, level) {
		return
	}
	w.writeLog(level, msg, args...)
}

// LogAttrs is a more efficient version of Log() that accepts only Attrs.
// The worker context is automatically supplied.
func (w *WorkerCtx) LogAttrs(level slog.Level, msg string, attrs ...slog.Attr) {
	if !w.logger.Enabled(w.ctx, level) {
		return
	}

	// Record the caller of LogAttrs as the log record's source position.
	var pcs [1]uintptr
	runtime.Callers(2, pcs[:]) // skip "Callers" and "LogAttrs".
	r := slog.NewRecord(time.Now(), level, msg, pcs[0])
	r.AddAttrs(attrs...)
	_ = w.logger.Handler().Handle(w.ctx, r)
}

// writeLog builds a log record attributed to the caller of the exported log
// method and hands it directly to the logger's handler.
func (w *WorkerCtx) writeLog(level slog.Level, msg string, args ...any) {
	var pcs [1]uintptr
	runtime.Callers(3, pcs[:]) // skip "Callers", "writeLog" and the calling function.
	r := slog.NewRecord(time.Now(), level, msg, pcs[0])
	r.Add(args...)
	_ = w.logger.Handler().Handle(w.ctx, r)
}
// Go starts the given function in a goroutine (as a "worker").
// The worker context has
//   - A separate context which is canceled when the functions returns.
//   - Access to named structure logging.
//   - Given function is re-run after failure (with backoff).
//   - Panic catching.
//   - Flow control helpers.
func (m *Manager) Go(name string, fn func(w *WorkerCtx) error) {
	// m.logger.Log(m.ctx, slog.LevelInfo, "worker started", "name", name)
	go m.manageWorker(name, fn)
}
// manageWorker runs fn as a worker and restarts it with exponential backoff
// (capped at one minute) on any error other than context cancellation.
func (m *Manager) manageWorker(name string, fn func(w *WorkerCtx) error) {
	m.workerStart()
	defer m.workerDone()

	w := &WorkerCtx{
		logger: m.logger.With("worker", name),
	}
	// Start from the manager context; runWorker derives a cancelable
	// sub-context from it on every (re)start.
	w.ctx = m.ctx

	backoff := time.Second
	failCnt := 0

	for {
		panicInfo, err := m.runWorker(w, fn)
		switch {
		case err == nil:
			// No error means that the worker is finished.
			return

		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
			// A canceled context or exceeded deadline also means that the worker is finished.
			return

		default:
			// Any other errors triggers a restart with backoff.

			// If manager is stopping, just log error and return.
			if m.IsDone() {
				if panicInfo != "" {
					w.Error(
						"worker failed",
						"err", err,
						"file", panicInfo,
					)
				} else {
					w.Error(
						"worker failed",
						"err", err,
					)
				}
				return
			}

			// Count failure and increase backoff (up to limit),
			failCnt++
			backoff *= 2
			if backoff > time.Minute {
				backoff = time.Minute
			}

			// Log error and retry after backoff duration.
			if panicInfo != "" {
				w.Error(
					"worker failed",
					"failCnt", failCnt,
					"backoff", backoff,
					"err", err,
					"file", panicInfo,
				)
			} else {
				w.Error(
					"worker failed",
					"failCnt", failCnt,
					"backoff", backoff,
					"err", err,
				)
			}
			select {
			case <-time.After(backoff):
			case <-m.ctx.Done():
				return
			}
		}
	}
}
// Do directly executes the given function (as a "worker").
// The worker context has
// - A separate context which is canceled when the functions returns.
// - Access to named structure logging.
// - Given function is re-run after failure (with backoff).
// - Panic catching.
// - Flow control helpers.
func (m *Manager) Do(name string, fn func(w *WorkerCtx) error) error {
	m.workerStart()
	defer m.workerDone()

	// Create the worker context.
	w := &WorkerCtx{
		ctx:    m.Ctx(),
		logger: m.logger.With("worker", name),
	}

	// Run the worker synchronously.
	panicInfo, err := m.runWorker(w, fn)

	// A clean return means the worker is finished.
	if err == nil {
		return nil
	}
	// A canceled context or exceeded deadline also means that the worker is
	// finished; pass the error through without logging.
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return err
	}

	// Any other failure is logged before being returned.
	if panicInfo != "" {
		w.Error("worker failed", "err", err, "file", panicInfo)
	} else {
		w.Error("worker failed", "err", err)
	}
	return err
}
// runWorker executes fn with the given worker context, catching panics.
// A panic is converted into a returned error and, additionally, the stack
// trace is printed to stderr and a short "file:line" hint for the panic
// location is returned as panicInfo.
// The worker context is given a child context that is canceled when fn returns.
func (m *Manager) runWorker(w *WorkerCtx, fn func(w *WorkerCtx) error) (panicInfo string, err error) {
	// Create worker context that is canceled when worker finished or dies.
	w.ctx, w.cancelCtx = context.WithCancel(w.ctx)
	defer w.Cancel()

	// Recover from panic.
	defer func() {
		panicVal := recover()
		if panicVal != nil {
			// Surface the panic as the worker's error (err is a named return,
			// so this overrides whatever fn would have returned).
			err = fmt.Errorf("panic: %s", panicVal)

			// Print panic to stderr.
			stackTrace := string(debug.Stack())
			fmt.Fprintf(
				os.Stderr,
				"===== PANIC =====\n%s\n\n%s===== END =====\n",
				panicVal,
				stackTrace,
			)

			// Find the line in the stack trace that refers to where the panic occurred.
			stackLines := strings.Split(stackTrace, "\n")
			foundPanic := false
			for i, line := range stackLines {
				if !foundPanic {
					// First locate the runtime's panic frame in the trace.
					if strings.Contains(line, "panic(") {
						foundPanic = true
					}
				} else {
					// Then take the first project frame after it; the line
					// following a function frame holds its file:line location.
					// NOTE(review): matching on "portmaster" ties this to the
					// module path — verify it still matches after renames.
					if strings.Contains(line, "portmaster") {
						if i+1 < len(stackLines) {
							panicInfo = strings.SplitN(strings.TrimSpace(stackLines[i+1]), " ", 2)[0]
						}
						break
					}
				}
			}
		}
	}()

	err = fn(w)
	return //nolint
}
// Repeat executes the given function periodically in a goroutine (as a "worker").
// The worker context has
// - A separate context which is canceled when the functions returns.
// - Access to named structure logging.
// - By default error/panic will be logged. For custom behavior supply errorFn, the argument is optional.
// - Flow control helpers.
// - Repeat is intended for long running tasks that are mostly idle.
func (m *Manager) Repeat(name string, period time.Duration, fn func(w *WorkerCtx) error) *WorkerMgr {
	// Convenience wrapper: create a scheduler and arm its repeat interval.
	return m.NewWorkerMgr(name, fn, nil).Repeat(period)
}
// Delay starts the given function delayed in a goroutine (as a "worker").
// The worker context has
// - A separate context which is canceled when the functions returns.
// - Access to named structure logging.
// - By default error/panic will be logged. For custom behavior supply errorFn, the argument is optional.
// - Panic catching.
// - Flow control helpers.
func (m *Manager) Delay(name string, period time.Duration, fn func(w *WorkerCtx) error) *WorkerMgr {
	// Convenience wrapper: create a scheduler and arm its one-shot delay.
	return m.NewWorkerMgr(name, fn, nil).Delay(period)
}

319
service/mgr/workermgr.go Normal file
View File

@@ -0,0 +1,319 @@
package mgr
import (
"context"
"errors"
"sync"
"time"
)
// WorkerMgr schedules a worker.
type WorkerMgr struct {
	mgr *Manager
	// ctx is the scheduler's own long-lived worker context;
	// canceling it (see Stop) ends taskMgr and all derived workers.
	ctx *WorkerCtx

	// Definition.
	name string
	fn   func(w *WorkerCtx) error
	// errorFn, if set, is called after a worker run fails with an
	// unexpected error; otherwise failures are only logged.
	errorFn func(c *WorkerCtx, err error, panicInfo string)

	// Manual trigger.
	// run is buffered (size 1) so a trigger during execution queues
	// exactly one follow-up run.
	run chan struct{}

	// Actions.
	// actionLock guards delay, repeat and keepAlive.
	actionLock sync.Mutex
	// selectAction is buffered (size 1) and tells taskMgr to re-select
	// its active action after one of the fields below changed.
	selectAction chan struct{}
	delay        *workerMgrDelay
	repeat       *workerMgrRepeat
	keepAlive    *workerMgrNoop
}
// taskAction is the scheduling primitive taskMgr waits on:
// Wait returns a channel that fires when the worker should run,
// and Ack is called after the channel fired, before the worker runs.
type taskAction interface {
	Wait() <-chan time.Time
	Ack()
}
// Delay.

// workerMgrDelay is a one-shot taskAction backed by a timer.
type workerMgrDelay struct {
	s     *WorkerMgr // back-reference, needed by Ack to update the scheduler
	timer *time.Timer
}
// newDelay returns a delay action whose timer fires once after duration.
func (s *WorkerMgr) newDelay(duration time.Duration) *workerMgrDelay {
	delay := &workerMgrDelay{
		s:     s,
		timer: time.NewTimer(duration),
	}
	return delay
}
// Wait returns the channel of the underlying timer.
func (sd *workerMgrDelay) Wait() <-chan time.Time {
	return sd.timer.C
}
// Ack is called after the delay fired: it removes the one-shot delay from
// the scheduler, restarts any repeat interval and releases the timer.
func (sd *workerMgrDelay) Ack() {
	owner := sd.s
	owner.actionLock.Lock()
	defer owner.actionLock.Unlock()

	// Release timer resources; the delay has already fired.
	sd.timer.Stop()
	// A delay only fires once, so drop it from the scheduler.
	owner.delay = nil
	// Restart the repeat interval from now (nil-safe).
	owner.repeat.Reset()
}
// Stop halts the underlying timer. It is safe to call on a nil delay.
func (sd *workerMgrDelay) Stop() {
	if sd != nil {
		sd.timer.Stop()
	}
}
// Repeat.

// workerMgrRepeat is a recurring taskAction backed by a ticker.
type workerMgrRepeat struct {
	ticker *time.Ticker
	// interval is kept so Reset can restart the ticker with the same period.
	interval time.Duration
}
// newRepeat returns a repeat action whose ticker fires every interval.
func (s *WorkerMgr) newRepeat(interval time.Duration) *workerMgrRepeat {
	repeat := &workerMgrRepeat{
		ticker:   time.NewTicker(interval),
		interval: interval,
	}
	return repeat
}
// Wait returns the channel of the underlying ticker.
func (sr *workerMgrRepeat) Wait() <-chan time.Time {
	return sr.ticker.C
}
// Ack is a no-op: tickers fire repeatedly without re-arming.
func (sr *workerMgrRepeat) Ack() {}
// Reset restarts the ticker with its original interval.
// It is safe to call on a nil repeat.
func (sr *workerMgrRepeat) Reset() {
	if sr != nil {
		sr.ticker.Reset(sr.interval)
	}
}
// Stop halts the underlying ticker. It is safe to call on a nil repeat.
func (sr *workerMgrRepeat) Stop() {
	if sr != nil {
		sr.ticker.Stop()
	}
}
// Noop.

// workerMgrNoop is a taskAction that never fires on its own; it keeps the
// scheduler alive so it only reacts to manual triggers (keep-alive mode).
type workerMgrNoop struct{}
// Wait returns a nil channel, which blocks forever in a select.
func (sn *workerMgrNoop) Wait() <-chan time.Time {
	return nil
}
// Ack is a no-op, as a noop action never fires.
func (sn *workerMgrNoop) Ack() {}
// NewWorkerMgr creates a new scheduler for the given worker function.
// Errors and panic will only be logged by default.
// If custom behavior is required, supply an errorFn.
// If all scheduling has ended, the scheduler will end itself,
// including all related workers, except if keep-alive is enabled.
func (m *Manager) NewWorkerMgr(name string, fn func(w *WorkerCtx) error, errorFn func(c *WorkerCtx, err error, panicInfo string)) *WorkerMgr {
	// Create the scheduler's own long-lived worker context,
	// derived from the manager context.
	schedulerCtx := &WorkerCtx{
		logger: m.logger.With("worker", name),
	}
	schedulerCtx.ctx, schedulerCtx.cancelCtx = context.WithCancel(m.Ctx())

	wm := &WorkerMgr{
		mgr:          m,
		ctx:          schedulerCtx,
		name:         name,
		fn:           fn,
		errorFn:      errorFn,
		run:          make(chan struct{}, 1),
		selectAction: make(chan struct{}, 1),
	}

	// Run the scheduling loop in the background.
	go wm.taskMgr()
	return wm
}
// taskMgr is the scheduling loop of a WorkerMgr. It waits for the currently
// active action (delay has priority over repeat, repeat over keep-alive) or a
// manual trigger and then runs the worker function. It ends itself - and all
// derived workers - when no action is defined anymore or the context is done.
func (s *WorkerMgr) taskMgr() {
	s.mgr.workerStart()
	defer s.mgr.workerDone()

	// If the task manager ends, end all descendants too.
	defer s.ctx.cancelCtx()

	// Stop any timers and tickers when done.
	// Hold the action lock while doing so: Delay(), Repeat(), Go() and
	// workerMgrDelay.Ack() mutate these fields concurrently under the same
	// lock, so reading them unlocked here would be a data race.
	defer func() {
		s.actionLock.Lock()
		defer s.actionLock.Unlock()
		s.delay.Stop()
		s.repeat.Stop()
	}()

	// Wait for the first action.
	select {
	case <-s.selectAction:
	case <-s.ctx.Done():
		return
	}

	var action taskAction
manage:
	for {
		// Select action.
		func() {
			s.actionLock.Lock()
			defer s.actionLock.Unlock()

			switch {
			case s.delay != nil:
				action = s.delay
			case s.repeat != nil:
				action = s.repeat
			case s.keepAlive != nil:
				action = s.keepAlive
			default:
				action = nil
			}
		}()

		// No action means all scheduling has ended: self-destruct.
		if action == nil {
			return
		}

		// Wait for trigger or action.
		select {
		case <-action.Wait():
			action.Ack()
			// Time-triggered execution.
		case <-s.run:
			// Manually triggered execution.
		case <-s.selectAction:
			// Re-select action.
			continue manage
		case <-s.ctx.Done():
			// Abort!
			return
		}

		// Run worker with a fresh context derived from the scheduler context.
		wCtx := &WorkerCtx{
			workerMgr: s,
			logger:    s.ctx.logger,
		}
		wCtx.ctx, wCtx.cancelCtx = context.WithCancel(s.ctx.ctx)
		panicInfo, err := s.mgr.runWorker(wCtx, s.fn)
		switch {
		case err == nil:
			// Continue with scheduling.
		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
			// Worker was canceled, continue with scheduling.
			// A canceled context or exceeded deadline also means that the worker is finished.
		default:
			// Log error.
			if panicInfo != "" {
				wCtx.Error(
					"worker failed",
					"err", err,
					"file", panicInfo,
				)
			} else {
				wCtx.Error(
					"worker failed",
					"err", err,
				)
			}
			// Delegate error handling to the error function, otherwise just continue the scheduler.
			// The error handler can stop the scheduler if it wants to.
			if s.errorFn != nil {
				s.errorFn(s.ctx, err, panicInfo)
			}
		}
	}
}
// Go executes the worker immediately.
// If the worker is currently being executed,
// the next execution will commence afterwards.
// Calling Go() implies KeepAlive() if nothing else was specified yet.
func (s *WorkerMgr) Go() {
	s.actionLock.Lock()
	defer s.actionLock.Unlock()

	// If no action is defined yet, enable keep-alive so the scheduler
	// does not end itself after this run.
	if s.delay == nil && s.repeat == nil && s.keepAlive == nil {
		s.keepAlive = &workerMgrNoop{}
		s.check()
	}

	// A manual run restarts the repeat interval and supersedes any
	// pending delay (both calls are nil-safe).
	s.repeat.Reset()
	s.delay.Stop()
	s.delay = nil

	// Trigger a run, unless one is already queued.
	select {
	case s.run <- struct{}{}:
	default:
	}
}
// Stop immediately stops the scheduler and all related workers.
func (s *WorkerMgr) Stop() {
	// Canceling the scheduler context ends taskMgr and all derived workers.
	s.ctx.cancelCtx()
}
// Delay will schedule the worker to run after the given duration.
// If set, the repeat schedule will continue afterwards.
// Disable the delay by passing 0.
func (s *WorkerMgr) Delay(duration time.Duration) *WorkerMgr {
	s.actionLock.Lock()
	defer s.actionLock.Unlock()

	// Replace any previous delay (Stop is nil-safe).
	s.delay.Stop()
	if duration > 0 {
		s.delay = s.newDelay(duration)
	} else {
		s.delay = nil
	}

	// Let taskMgr pick up the new action.
	s.check()
	return s
}
// Repeat will repeatedly execute the worker using the given interval.
// Disable repeating by passing 0.
func (s *WorkerMgr) Repeat(interval time.Duration) *WorkerMgr {
	s.actionLock.Lock()
	defer s.actionLock.Unlock()

	// Replace any previous repeat schedule (Stop is nil-safe).
	s.repeat.Stop()
	if interval > 0 {
		s.repeat = s.newRepeat(interval)
	} else {
		s.repeat = nil
	}

	// Let taskMgr pick up the new action.
	s.check()
	return s
}
// KeepAlive instructs the scheduler to not self-destruct,
// even if all scheduled work is complete.
func (s *WorkerMgr) KeepAlive() *WorkerMgr {
	s.actionLock.Lock()
	s.keepAlive = &workerMgrNoop{}
	// Let taskMgr pick up the keep-alive action.
	s.check()
	s.actionLock.Unlock()
	return s
}
// check notifies taskMgr to re-select its active action.
// It never blocks: a single pending notification is sufficient.
func (s *WorkerMgr) check() {
	select {
	case s.selectAction <- struct{}{}:
	default:
	}
}

View File

@@ -0,0 +1,157 @@
package mgr
import (
"sync/atomic"
"testing"
"time"
)
// TestWorkerMgrDelay checks that a worker scheduled with a 1 second delay
// runs neither noticeably earlier nor later than scheduled.
func TestWorkerMgrDelay(t *testing.T) {
	t.Parallel()

	m := New("DelayTest")
	value := atomic.Bool{}
	value.Store(false)

	// Create a task that will run after 1 second.
	m.NewWorkerMgr("test", func(w *WorkerCtx) error {
		value.Store(true)
		return nil
	}, nil).Delay(1 * time.Second)

	// Poll every 10ms; ~100 iterations correspond to the 1 second delay.
	iterations := 0
	for !value.Load() {
		iterations++
		time.Sleep(10 * time.Millisecond)
	}
	// 5% difference is acceptable since time.Sleep can't be perfect and it may vary on different computers.
	if iterations < 95 || iterations > 105 {
		t.Errorf("WorkerMgr did not delay for a whole second it=%d", iterations)
	}
}
// TestWorkerMgrRepeat checks that a worker with a 100ms repeat interval runs
// at roughly that cadence for 10 consecutive runs.
func TestWorkerMgrRepeat(t *testing.T) {
	t.Parallel()

	m := New("RepeatTest")
	value := atomic.Bool{}
	value.Store(false)

	// Create a task that should repeat every 100 milliseconds.
	m.NewWorkerMgr("test", func(w *WorkerCtx) error {
		value.Store(true)
		return nil
	}, nil).Repeat(100 * time.Millisecond)

	// Check 10 consecutive runs; they should be delayed for around 100 milliseconds each.
	for range 10 {
		// Poll every 10ms; ~10 iterations correspond to one interval.
		iterations := 0
		for !value.Load() {
			iterations++
			time.Sleep(10 * time.Millisecond)
		}
		// 10% difference is acceptable at this scale since time.Sleep can't be perfect and it may vary on different computers.
		if iterations < 9 || iterations > 11 {
			t.Errorf("Worker was not delayed for a 100 milliseconds it=%d", iterations)
			return
		}
		// Reset value
		value.Store(false)
	}
}
// TestWorkerMgrDelayAndRepeat checks that a delay of 1 second followed by a
// 100ms repeat interval behaves as configured: one delayed first run, then
// 10 repeated runs at the interval.
func TestWorkerMgrDelayAndRepeat(t *testing.T) { //nolint:dupl
	t.Parallel()

	m := New("DelayAndRepeatTest")
	value := atomic.Bool{}
	value.Store(false)

	// Create a task that should delay for 1 second and then repeat every 100 milliseconds.
	m.NewWorkerMgr("test", func(w *WorkerCtx) error {
		value.Store(true)
		return nil
	}, nil).Delay(1 * time.Second).Repeat(100 * time.Millisecond)

	// Poll every 10ms; ~100 iterations correspond to the 1 second delay.
	iterations := 0
	for !value.Load() {
		iterations++
		time.Sleep(10 * time.Millisecond)
	}
	// 5% difference is acceptable since time.Sleep can't be perfect and it may vary on different computers.
	if iterations < 95 || iterations > 105 {
		t.Errorf("WorkerMgr did not delay for a whole second it=%d", iterations)
	}

	// Reset value
	value.Store(false)

	// Check 10 consecutive runs; they should be delayed for around 100 milliseconds each.
	for range 10 {
		iterations = 0
		for !value.Load() {
			iterations++
			time.Sleep(10 * time.Millisecond)
		}
		// 10% difference is acceptable at this scale since time.Sleep can't be perfect and it may vary on different computers.
		if iterations < 9 || iterations > 11 {
			t.Errorf("Worker was not delayed for a 100 milliseconds it=%d", iterations)
			return
		}
		// Reset value
		value.Store(false)
	}
}
// TestWorkerMgrRepeatAndDelay is the same scenario as
// TestWorkerMgrDelayAndRepeat, but with Repeat() called before Delay(),
// verifying that the configuration order does not change behavior.
func TestWorkerMgrRepeatAndDelay(t *testing.T) { //nolint:dupl
	t.Parallel()

	m := New("RepeatAndDelayTest")
	value := atomic.Bool{}
	value.Store(false)

	// Create a task that should delay for 1 second and then repeat every 100 milliseconds but with reverse command order.
	m.NewWorkerMgr("test", func(w *WorkerCtx) error {
		value.Store(true)
		return nil
	}, nil).Repeat(100 * time.Millisecond).Delay(1 * time.Second)

	// Poll every 10ms; ~100 iterations correspond to the 1 second delay.
	iterations := 0
	for !value.Load() {
		iterations++
		time.Sleep(10 * time.Millisecond)
	}
	// 5% difference is acceptable since time.Sleep can't be perfect and it may vary on different computers.
	if iterations < 95 || iterations > 105 {
		t.Errorf("WorkerMgr did not delay for a whole second it=%d", iterations)
	}

	// Reset value
	value.Store(false)

	// Check 10 consecutive runs; they should be delayed for around 100 milliseconds each.
	for range 10 {
		iterations := 0
		for !value.Load() {
			iterations++
			time.Sleep(10 * time.Millisecond)
		}
		// 10% difference is acceptable at this scale since time.Sleep can't be perfect and it may vary on different computers.
		if iterations < 9 || iterations > 11 {
			t.Errorf("Worker was not delayed for a 100 milliseconds it=%d", iterations)
			return
		}
		// Reset value
		value.Store(false)
	}
}