feat: refactor interception modules into pausable group
- Add GroupModule to wrap interception, dnsmonitor, and compat modules - Simplify pause/resume operations by grouping related modules - Update worker info collection to handle nested module groups - Remove deprecated flags and improve module lifecycle management - Add proper atomic state tracking for nfqueue interception https://github.com/safing/portmaster/issues/2050
This commit is contained in:
@@ -78,14 +78,20 @@ func prep() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func start() error {
|
func start() error {
|
||||||
isActive.Store(true)
|
// Create fresh worker managers on Start() to enable clean restarts after Stop().
|
||||||
|
// (after module.Manager().Stop() has been called, all workers are stopped and cannot be reused)
|
||||||
|
module.selfcheckWorkerMgr = module.Manager().NewWorkerMgr("compatibility self-check", selfcheckTaskFunc, nil)
|
||||||
|
module.cleanNotifyThresholdWorkerMgr = module.Manager().NewWorkerMgr("clean notify thresholds", cleanNotifyThreshold, nil)
|
||||||
|
|
||||||
startNotify()
|
startNotify()
|
||||||
|
|
||||||
selfcheckNetworkChangedFlag.Refresh()
|
selfcheckNetworkChangedFlag.Refresh()
|
||||||
|
|
||||||
|
// Schedule periodic self-checks.
|
||||||
module.selfcheckWorkerMgr.Repeat(5 * time.Minute).Delay(selfcheckTaskRetryAfter)
|
module.selfcheckWorkerMgr.Repeat(5 * time.Minute).Delay(selfcheckTaskRetryAfter)
|
||||||
module.cleanNotifyThresholdWorkerMgr.Repeat(1 * time.Hour)
|
module.cleanNotifyThresholdWorkerMgr.Repeat(1 * time.Hour)
|
||||||
|
|
||||||
|
// Add network change callback to trigger immediate self-check.
|
||||||
module.instance.NetEnv().EventNetworkChange.AddCallback("trigger compat self-check", func(_ *mgr.WorkerCtx, _ struct{}) (bool, error) {
|
module.instance.NetEnv().EventNetworkChange.AddCallback("trigger compat self-check", func(_ *mgr.WorkerCtx, _ struct{}) (bool, error) {
|
||||||
module.selfcheckWorkerMgr.Delay(selfcheckTaskRetryAfter)
|
module.selfcheckWorkerMgr.Delay(selfcheckTaskRetryAfter)
|
||||||
return false, nil
|
return false, nil
|
||||||
@@ -94,16 +100,11 @@ func start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func stop() error {
|
func stop() error {
|
||||||
isActive.Store(false)
|
|
||||||
resetSelfCheckState()
|
resetSelfCheckState()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func selfcheckTaskFunc(wc *mgr.WorkerCtx) error {
|
func selfcheckTaskFunc(wc *mgr.WorkerCtx) error {
|
||||||
if !isActive.Load() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create tracing logger.
|
// Create tracing logger.
|
||||||
ctx, tracer := log.AddTracer(wc.Ctx())
|
ctx, tracer := log.AddTracer(wc.Ctx())
|
||||||
@@ -166,9 +167,6 @@ func SelfCheckIsFailing() bool {
|
|||||||
var (
|
var (
|
||||||
module *Compat
|
module *Compat
|
||||||
shimLoaded atomic.Bool
|
shimLoaded atomic.Bool
|
||||||
// isActive is a simple shutdown flag to prevent self-check worker from executing during stop().
|
|
||||||
// TODO: consider correct start/stop of the workers instead.
|
|
||||||
isActive atomic.Bool
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// New returns a new Compat module.
|
// New returns a new Compat module.
|
||||||
@@ -180,11 +178,7 @@ func New(instance instance) (*Compat, error) {
|
|||||||
module = &Compat{
|
module = &Compat{
|
||||||
mgr: m,
|
mgr: m,
|
||||||
instance: instance,
|
instance: instance,
|
||||||
|
states: mgr.NewStateMgr(m),
|
||||||
selfcheckWorkerMgr: m.NewWorkerMgr("compatibility self-check", selfcheckTaskFunc, nil),
|
|
||||||
cleanNotifyThresholdWorkerMgr: m.NewWorkerMgr("clean notify thresholds", cleanNotifyThreshold, nil),
|
|
||||||
|
|
||||||
states: mgr.NewStateMgr(m),
|
|
||||||
}
|
}
|
||||||
if err := prep(); err != nil {
|
if err := prep(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -8,8 +8,6 @@ import (
|
|||||||
|
|
||||||
"github.com/safing/portmaster/base/config"
|
"github.com/safing/portmaster/base/config"
|
||||||
"github.com/safing/portmaster/base/notifications"
|
"github.com/safing/portmaster/base/notifications"
|
||||||
"github.com/safing/portmaster/service/compat"
|
|
||||||
"github.com/safing/portmaster/service/firewall/interception"
|
|
||||||
"github.com/safing/portmaster/service/mgr"
|
"github.com/safing/portmaster/service/mgr"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -32,8 +30,7 @@ type Control struct {
|
|||||||
|
|
||||||
type instance interface {
|
type instance interface {
|
||||||
Config() *config.Config
|
Config() *config.Config
|
||||||
Interception() *interception.Interception
|
InterceptionGroup() *mgr.GroupModule
|
||||||
Compat() *compat.Compat
|
|
||||||
SPNGroup() *mgr.ExtendedGroup
|
SPNGroup() *mgr.ExtendedGroup
|
||||||
IsShuttingDown() bool
|
IsShuttingDown() bool
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,16 +68,9 @@ func (c *Control) pause(duration time.Duration, onlySPN bool) (retErr error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !c.pauseInfo.Interception {
|
if !c.pauseInfo.Interception {
|
||||||
modulesToResume := []mgr.Module{
|
if err := c.instance.InterceptionGroup().Stop(); err != nil {
|
||||||
c.instance.Compat(),
|
return err
|
||||||
c.instance.Interception(),
|
|
||||||
}
|
}
|
||||||
for _, m := range modulesToResume {
|
|
||||||
if err := m.Stop(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.mgr.Info("interception paused")
|
c.mgr.Info("interception paused")
|
||||||
c.pauseInfo.Interception = true
|
c.pauseInfo.Interception = true
|
||||||
}
|
}
|
||||||
@@ -106,14 +99,8 @@ func (c *Control) resume() (retErr error) {
|
|||||||
c.stopResumeWorker()
|
c.stopResumeWorker()
|
||||||
|
|
||||||
if c.pauseInfo.Interception {
|
if c.pauseInfo.Interception {
|
||||||
modulesToResume := []mgr.Module{
|
if err := c.instance.InterceptionGroup().Start(); err != nil {
|
||||||
c.instance.Interception(),
|
return err
|
||||||
c.instance.Compat(),
|
|
||||||
}
|
|
||||||
for _, m := range modulesToResume {
|
|
||||||
if err := m.Start(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
c.mgr.Info("interception resumed")
|
c.mgr.Info("interception resumed")
|
||||||
c.pauseInfo.Interception = false
|
c.pauseInfo.Interception = false
|
||||||
|
|||||||
@@ -24,10 +24,26 @@ func (i *Instance) GetWorkerInfo() (*mgr.WorkerInfo, error) {
|
|||||||
for _, m := range i.serviceGroup.Modules() {
|
for _, m := range i.serviceGroup.Modules() {
|
||||||
wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
|
wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
|
||||||
infos = append(infos, wi)
|
infos = append(infos, wi)
|
||||||
|
|
||||||
|
// Check if module is a nested modules group
|
||||||
|
if gm, ok := m.(*mgr.GroupModule); ok {
|
||||||
|
for _, sm := range gm.Modules() {
|
||||||
|
wi, _ := sm.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
|
||||||
|
infos = append(infos, wi)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, m := range i.SpnGroup.Modules() {
|
for _, m := range i.SpnGroup.Modules() {
|
||||||
wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
|
wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
|
||||||
infos = append(infos, wi)
|
infos = append(infos, wi)
|
||||||
|
|
||||||
|
// Check if module is a nested modules group
|
||||||
|
if gm, ok := m.(*mgr.GroupModule); ok {
|
||||||
|
for _, sm := range gm.Modules() {
|
||||||
|
wi, _ := sm.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
|
||||||
|
infos = append(infos, wi)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return mgr.MergeWorkerInfo(infos...), nil
|
return mgr.MergeWorkerInfo(infos...), nil
|
||||||
|
|||||||
@@ -34,11 +34,6 @@ func startInterception(packets chan packet.Packet) error {
|
|||||||
|
|
||||||
// stop starts the interception.
|
// stop starts the interception.
|
||||||
func stopInterception() error {
|
func stopInterception() error {
|
||||||
// TODO: stop ebpf workers gracefully
|
|
||||||
// E.g.:
|
|
||||||
// module.mgr.Cancel()
|
|
||||||
// <-m.Done()
|
|
||||||
|
|
||||||
return StopNfqueueInterception()
|
return StopNfqueueInterception()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
package interception
|
package interception
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/coreos/go-iptables/iptables"
|
"github.com/coreos/go-iptables/iptables"
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
@@ -30,15 +30,10 @@ var (
|
|||||||
out6Queue nfQueue
|
out6Queue nfQueue
|
||||||
in6Queue nfQueue
|
in6Queue nfQueue
|
||||||
|
|
||||||
|
isRunning atomic.Bool
|
||||||
shutdownSignal = make(chan struct{})
|
shutdownSignal = make(chan struct{})
|
||||||
|
|
||||||
experimentalNfqueueBackend bool
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
|
||||||
flag.BoolVar(&experimentalNfqueueBackend, "experimental-nfqueue", false, "(deprecated flag; always used)")
|
|
||||||
}
|
|
||||||
|
|
||||||
// nfQueue encapsulates nfQueue providers.
|
// nfQueue encapsulates nfQueue providers.
|
||||||
type nfQueue interface {
|
type nfQueue interface {
|
||||||
PacketChannel() <-chan packet.Packet
|
PacketChannel() <-chan packet.Packet
|
||||||
@@ -262,12 +257,13 @@ func deactivateIPTables(protocol iptables.Protocol, rules, chains []string) erro
|
|||||||
|
|
||||||
// StartNfqueueInterception starts the nfqueue interception.
|
// StartNfqueueInterception starts the nfqueue interception.
|
||||||
func StartNfqueueInterception(packets chan<- packet.Packet) (err error) {
|
func StartNfqueueInterception(packets chan<- packet.Packet) (err error) {
|
||||||
// @deprecated, remove in v1
|
if !isRunning.CompareAndSwap(false, true) {
|
||||||
if experimentalNfqueueBackend {
|
return nil // already running
|
||||||
log.Warningf("[DEPRECATED] --experimental-nfqueue has been deprecated as the backend is now used by default")
|
|
||||||
log.Warningf("[DEPRECATED] please remove the flag from your configuration!")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset shutdown signal
|
||||||
|
shutdownSignal = make(chan struct{})
|
||||||
|
|
||||||
err = activateNfqueueFirewall()
|
err = activateNfqueueFirewall()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not initialize nfqueue: %w", err)
|
return fmt.Errorf("could not initialize nfqueue: %w", err)
|
||||||
@@ -305,6 +301,11 @@ func StartNfqueueInterception(packets chan<- packet.Packet) (err error) {
|
|||||||
|
|
||||||
// StopNfqueueInterception stops the nfqueue interception.
|
// StopNfqueueInterception stops the nfqueue interception.
|
||||||
func StopNfqueueInterception() error {
|
func StopNfqueueInterception() error {
|
||||||
|
if !isRunning.CompareAndSwap(true, false) {
|
||||||
|
return nil // not running
|
||||||
|
}
|
||||||
|
|
||||||
|
// Signal shutdown to packet handler
|
||||||
defer close(shutdownSignal)
|
defer close(shutdownSignal)
|
||||||
|
|
||||||
if out4Queue != nil {
|
if out4Queue != nil {
|
||||||
|
|||||||
@@ -58,7 +58,8 @@ type Instance struct {
|
|||||||
shutdownCtx context.Context
|
shutdownCtx context.Context
|
||||||
cancelShutdownCtx context.CancelFunc
|
cancelShutdownCtx context.CancelFunc
|
||||||
|
|
||||||
serviceGroup *mgr.Group
|
serviceGroup *mgr.Group
|
||||||
|
serviceGroupInterception *mgr.GroupModule
|
||||||
|
|
||||||
binDir string
|
binDir string
|
||||||
dataDir string
|
dataDir string
|
||||||
@@ -313,6 +314,12 @@ func New(svcCfg *ServiceConfig) (*Instance, error) { //nolint:maintidx
|
|||||||
return instance, fmt.Errorf("create terminal module: %w", err)
|
return instance, fmt.Errorf("create terminal module: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Grouped interception modules that can be paused/resumed together.
|
||||||
|
instance.serviceGroupInterception = mgr.NewGroupModule("Interception Group",
|
||||||
|
instance.interception,
|
||||||
|
instance.dnsmonitor,
|
||||||
|
instance.compat)
|
||||||
|
|
||||||
// Add all modules to instance group.
|
// Add all modules to instance group.
|
||||||
instance.serviceGroup = mgr.NewGroup(
|
instance.serviceGroup = mgr.NewGroup(
|
||||||
instance.base,
|
instance.base,
|
||||||
@@ -340,10 +347,13 @@ func New(svcCfg *ServiceConfig) (*Instance, error) { //nolint:maintidx
|
|||||||
instance.resolver,
|
instance.resolver,
|
||||||
instance.filterLists,
|
instance.filterLists,
|
||||||
instance.customlist,
|
instance.customlist,
|
||||||
instance.interception,
|
|
||||||
instance.dnsmonitor,
|
|
||||||
|
|
||||||
instance.compat,
|
// Grouped interception modules:
|
||||||
|
// instance.interception,
|
||||||
|
// instance.dnsmonitor,
|
||||||
|
// instance.compat
|
||||||
|
instance.serviceGroupInterception,
|
||||||
|
|
||||||
instance.status,
|
instance.status,
|
||||||
instance.broadcasts,
|
instance.broadcasts,
|
||||||
instance.sync,
|
instance.sync,
|
||||||
@@ -549,6 +559,11 @@ func (i *Instance) Interception() *interception.Interception {
|
|||||||
return i.interception
|
return i.interception
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InterceptionGroup returns the grouped interception modules that can be paused together.
|
||||||
|
func (i *Instance) InterceptionGroup() *mgr.GroupModule {
|
||||||
|
return i.serviceGroupInterception
|
||||||
|
}
|
||||||
|
|
||||||
// DNSMonitor returns the dns-listener module.
|
// DNSMonitor returns the dns-listener module.
|
||||||
func (i *Instance) DNSMonitor() *dnsmonitor.DNSMonitor {
|
func (i *Instance) DNSMonitor() *dnsmonitor.DNSMonitor {
|
||||||
return i.dnsmonitor
|
return i.dnsmonitor
|
||||||
|
|||||||
33
service/mgr/group_module.go
Normal file
33
service/mgr/group_module.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package mgr
|
||||||
|
|
||||||
|
// GroupModule is a module that wraps a group of modules,
|
||||||
|
// to allow nesting groups of modules in parent group.
|
||||||
|
type GroupModule struct {
|
||||||
|
mgr *Manager
|
||||||
|
group *Group
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGroupModule(name string, modules ...Module) *GroupModule {
|
||||||
|
return &GroupModule{
|
||||||
|
mgr: New(name),
|
||||||
|
group: NewGroup(modules...),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gm *GroupModule) Manager() *Manager {
|
||||||
|
return gm.mgr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gm *GroupModule) Start() error {
|
||||||
|
return gm.group.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gm *GroupModule) Stop() error {
|
||||||
|
return gm.group.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Modules returns the modules in the group wrapped by this group module.
|
||||||
|
// (mimics Group.Modules())
|
||||||
|
func (gm *GroupModule) Modules() []Module {
|
||||||
|
return gm.group.Modules()
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user