From 4d2d91972b952225da34f2d8d73940c3281d6bf4 Mon Sep 17 00:00:00 2001 From: Alexandr Stelnykovych Date: Thu, 6 Nov 2025 17:28:38 +0200 Subject: [PATCH] feat: refactor interception modules into pausable group - Add GroupModule to wrap interception, dnsmonitor, and compat modules - Simplify pause/resume operations by grouping related modules - Update worker info collection to handle nested module groups - Remove deprecated flags and improve module lifecycle management - Add proper atomic state tracking for nfqueue interception https://github.com/safing/portmaster/issues/2050 --- service/compat/module.go | 22 +++++-------- service/control/module.go | 5 +-- service/control/pause.go | 21 +++--------- service/debug.go | 16 +++++++++ .../interception/interception_linux.go | 5 --- .../firewall/interception/nfqueue_linux.go | 23 ++++++------- service/instance.go | 23 ++++++++++--- service/mgr/group_module.go | 33 +++++++++++++++++++ 8 files changed, 93 insertions(+), 55 deletions(-) create mode 100644 service/mgr/group_module.go diff --git a/service/compat/module.go b/service/compat/module.go index a6fbfeb8..28d67d48 100644 --- a/service/compat/module.go +++ b/service/compat/module.go @@ -78,14 +78,20 @@ func prep() error { } func start() error { - isActive.Store(true) + // Create fresh worker managers on Start() to enable clean restarts after Stop(). + // (after module.Manager().Stop() has been called, all workers are stopped and cannot be reused) + module.selfcheckWorkerMgr = module.Manager().NewWorkerMgr("compatibility self-check", selfcheckTaskFunc, nil) + module.cleanNotifyThresholdWorkerMgr = module.Manager().NewWorkerMgr("clean notify thresholds", cleanNotifyThreshold, nil) startNotify() selfcheckNetworkChangedFlag.Refresh() + + // Schedule periodic self-checks. module.selfcheckWorkerMgr.Repeat(5 * time.Minute).Delay(selfcheckTaskRetryAfter) module.cleanNotifyThresholdWorkerMgr.Repeat(1 * time.Hour) + // Add network change callback to trigger immediate self-check. module.instance.NetEnv().EventNetworkChange.AddCallback("trigger compat self-check", func(_ *mgr.WorkerCtx, _ struct{}) (bool, error) { module.selfcheckWorkerMgr.Delay(selfcheckTaskRetryAfter) return false, nil @@ -94,16 +100,11 @@ func start() error { } func stop() error { - isActive.Store(false) resetSelfCheckState() - return nil } func selfcheckTaskFunc(wc *mgr.WorkerCtx) error { - if !isActive.Load() { - return nil - } // Create tracing logger. ctx, tracer := log.AddTracer(wc.Ctx()) @@ -166,9 +167,6 @@ func SelfCheckIsFailing() bool { var ( module *Compat shimLoaded atomic.Bool - // isActive is a simple shutdown flag to prevent self-check worker from executing during stop(). - // TODO: consider correct start/stop of the workers instead. - isActive atomic.Bool ) // New returns a new Compat module. @@ -180,11 +178,7 @@ func New(instance instance) (*Compat, error) { module = &Compat{ mgr: m, instance: instance, - - selfcheckWorkerMgr: m.NewWorkerMgr("compatibility self-check", selfcheckTaskFunc, nil), - cleanNotifyThresholdWorkerMgr: m.NewWorkerMgr("clean notify thresholds", cleanNotifyThreshold, nil), - - states: mgr.NewStateMgr(m), + states: mgr.NewStateMgr(m), } if err := prep(); err != nil { return nil, err diff --git a/service/control/module.go b/service/control/module.go index 30fa8130..d04ff7e1 100644 --- a/service/control/module.go +++ b/service/control/module.go @@ -8,8 +8,6 @@ import ( "github.com/safing/portmaster/base/config" "github.com/safing/portmaster/base/notifications" - "github.com/safing/portmaster/service/compat" - "github.com/safing/portmaster/service/firewall/interception" "github.com/safing/portmaster/service/mgr" ) @@ -32,8 +30,7 @@ type Control struct { type instance interface { Config() *config.Config - Interception() *interception.Interception - Compat() *compat.Compat + InterceptionGroup() *mgr.GroupModule SPNGroup() *mgr.ExtendedGroup IsShuttingDown() bool } diff --git a/service/control/pause.go b/service/control/pause.go index c872a71f..79387d08 100644 --- a/service/control/pause.go +++ b/service/control/pause.go @@ -68,16 +68,9 @@ func (c *Control) pause(duration time.Duration, onlySPN bool) (retErr error) { } if !c.pauseInfo.Interception { - modulesToResume := []mgr.Module{ - c.instance.Compat(), - c.instance.Interception(), + if err := c.instance.InterceptionGroup().Stop(); err != nil { + return err } - for _, m := range modulesToResume { - if err := m.Stop(); err != nil { - return err - } - } - c.mgr.Info("interception paused") c.pauseInfo.Interception = true } @@ -106,14 +99,8 @@ func (c *Control) resume() (retErr error) { c.stopResumeWorker() if c.pauseInfo.Interception { - modulesToResume := []mgr.Module{ - c.instance.Interception(), - c.instance.Compat(), - } - for _, m := range modulesToResume { - if err := m.Start(); err != nil { - return err - } + if err := c.instance.InterceptionGroup().Start(); err != nil { + return err } c.mgr.Info("interception resumed") c.pauseInfo.Interception = false diff --git a/service/debug.go b/service/debug.go index 46a66421..f1a25c7b 100644 --- a/service/debug.go +++ b/service/debug.go @@ -24,10 +24,26 @@ func (i *Instance) GetWorkerInfo() (*mgr.WorkerInfo, error) { for _, m := range i.serviceGroup.Modules() { wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot. infos = append(infos, wi) + + // Check if module is a nested modules group + if gm, ok := m.(*mgr.GroupModule); ok { + for _, sm := range gm.Modules() { + wi, _ := sm.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot. + infos = append(infos, wi) + } + } } for _, m := range i.SpnGroup.Modules() { wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot. infos = append(infos, wi) + + // Check if module is a nested modules group + if gm, ok := m.(*mgr.GroupModule); ok { + for _, sm := range gm.Modules() { + wi, _ := sm.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot. + infos = append(infos, wi) + } + } } return mgr.MergeWorkerInfo(infos...), nil diff --git a/service/firewall/interception/interception_linux.go b/service/firewall/interception/interception_linux.go index 1d71eb15..24109f48 100644 --- a/service/firewall/interception/interception_linux.go +++ b/service/firewall/interception/interception_linux.go @@ -34,11 +34,6 @@ func startInterception(packets chan packet.Packet) error { // stop starts the interception. func stopInterception() error { - // TODO: stop ebpf workers gracefully - // E.g.: - // module.mgr.Cancel() - // <-m.Done() - return StopNfqueueInterception() } diff --git a/service/firewall/interception/nfqueue_linux.go b/service/firewall/interception/nfqueue_linux.go index 875509b6..d43629f7 100644 --- a/service/firewall/interception/nfqueue_linux.go +++ b/service/firewall/interception/nfqueue_linux.go @@ -1,10 +1,10 @@ package interception import ( - "flag" "fmt" "sort" "strings" + "sync/atomic" "github.com/coreos/go-iptables/iptables" "github.com/hashicorp/go-multierror" @@ -30,15 +30,10 @@ var ( out6Queue nfQueue in6Queue nfQueue + isRunning atomic.Bool shutdownSignal = make(chan struct{}) - - experimentalNfqueueBackend bool ) -func init() { - flag.BoolVar(&experimentalNfqueueBackend, "experimental-nfqueue", false, "(deprecated flag; always used)") -} - // nfQueue encapsulates nfQueue providers. type nfQueue interface { PacketChannel() <-chan packet.Packet @@ -262,12 +257,13 @@ func deactivateIPTables(protocol iptables.Protocol, rules, chains []string) erro // StartNfqueueInterception starts the nfqueue interception. func StartNfqueueInterception(packets chan<- packet.Packet) (err error) { - // @deprecated, remove in v1 - if experimentalNfqueueBackend { - log.Warningf("[DEPRECATED] --experimental-nfqueue has been deprecated as the backend is now used by default") - log.Warningf("[DEPRECATED] please remove the flag from your configuration!") + if !isRunning.CompareAndSwap(false, true) { + return nil // already running } + // Reset shutdown signal + shutdownSignal = make(chan struct{}) + err = activateNfqueueFirewall() if err != nil { return fmt.Errorf("could not initialize nfqueue: %w", err) @@ -305,6 +301,11 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) { // StopNfqueueInterception stops the nfqueue interception. func StopNfqueueInterception() error { + if !isRunning.CompareAndSwap(true, false) { + return nil // not running + } + + // Signal shutdown to packet handler defer close(shutdownSignal) if out4Queue != nil { diff --git a/service/instance.go b/service/instance.go index 34e5609e..5b3ab760 100644 --- a/service/instance.go +++ b/service/instance.go @@ -58,7 +58,8 @@ type Instance struct { shutdownCtx context.Context cancelShutdownCtx context.CancelFunc - serviceGroup *mgr.Group + serviceGroup *mgr.Group + serviceGroupInterception *mgr.GroupModule binDir string dataDir string @@ -313,6 +314,12 @@ func New(svcCfg *ServiceConfig) (*Instance, error) { //nolint:maintidx return instance, fmt.Errorf("create terminal module: %w", err) } + // Grouped interception modules that can be paused/resumed together. + instance.serviceGroupInterception = mgr.NewGroupModule("Interception Group", + instance.interception, + instance.dnsmonitor, + instance.compat) + // Add all modules to instance group. instance.serviceGroup = mgr.NewGroup( instance.base, @@ -340,10 +347,13 @@ func New(svcCfg *ServiceConfig) (*Instance, error) { //nolint:maintidx instance.resolver, instance.filterLists, instance.customlist, - instance.interception, - instance.dnsmonitor, - instance.compat, + // Grouped interception modules: + // instance.interception, + // instance.dnsmonitor, + // instance.compat + instance.serviceGroupInterception, + instance.status, instance.broadcasts, instance.sync, @@ -549,6 +559,11 @@ func (i *Instance) Interception() *interception.Interception { return i.interception } +// InterceptionGroup returns the grouped interception modules that can be paused together. +func (i *Instance) InterceptionGroup() *mgr.GroupModule { + return i.serviceGroupInterception +} + // DNSMonitor returns the dns-listener module. func (i *Instance) DNSMonitor() *dnsmonitor.DNSMonitor { return i.dnsmonitor diff --git a/service/mgr/group_module.go b/service/mgr/group_module.go new file mode 100644 index 00000000..214b9fee --- /dev/null +++ b/service/mgr/group_module.go @@ -0,0 +1,33 @@ +package mgr + +// GroupModule is a module that wraps a group of modules, +// to allow nesting groups of modules in parent group. +type GroupModule struct { + mgr *Manager + group *Group +} + +func NewGroupModule(name string, modules ...Module) *GroupModule { + return &GroupModule{ + mgr: New(name), + group: NewGroup(modules...), + } +} + +func (gm *GroupModule) Manager() *Manager { + return gm.mgr +} + +func (gm *GroupModule) Start() error { + return gm.group.Start() +} + +func (gm *GroupModule) Stop() error { + return gm.group.Stop() +} + +// Modules returns the modules in the group wrapped by this group module. +// (mimics Group.Modules()) +func (gm *GroupModule) Modules() []Module { + return gm.group.Modules() +}