diff --git a/intel/filterlists/bloom.go b/intel/filterlists/bloom.go index 4df4f70a..93cde1ed 100644 --- a/intel/filterlists/bloom.go +++ b/intel/filterlists/bloom.go @@ -76,7 +76,7 @@ func (bf *scopedBloom) add(scope, value string) { // not have support for it. We just drop the value // as a call to Test() for that scope will always // return "true" - log.Warningf("failed to add unknown entity type %q", scope) + log.Warningf("failed to add unknown entity type %q with value %q", scope, value) return } diff --git a/intel/filterlists/database.go b/intel/filterlists/database.go index cee940f2..46c54673 100644 --- a/intel/filterlists/database.go +++ b/intel/filterlists/database.go @@ -34,7 +34,6 @@ const ( ) const bfFalsePositiveRate = 0.001 -const filterlistsDisabled = "filterlists:disabled" var ( filterListLock sync.RWMutex @@ -128,12 +127,19 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil return nil }) + persistRecords(startSafe, records) + + return g.Wait() +} + +func persistRecords(startJob func(func() error), records <-chan record.Record) { var cnt int start := time.Now() batch := database.NewInterface(&database.Options{Local: true, Internal: true}) - var startBatch func() - processBatch := func() error { + var processBatch func() error + + processBatch = func() error { batchPut := batch.PutMany("cache") for r := range records { if err := batchPut(r); err != nil { @@ -152,7 +158,7 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil return err } - startBatch() + startJob(processBatch) return nil } @@ -160,13 +166,8 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil return batchPut(nil) } - startBatch = func() { - startSafe(processBatch) - } - startBatch() - - return g.Wait() + startJob(processBatch) } func normalizeEntry(entry *listEntry) { diff --git a/intel/filterlists/module.go b/intel/filterlists/module.go index 14805f69..9b74c5c4 100644 --- a/intel/filterlists/module.go +++ b/intel/filterlists/module.go @@ -15,6 +15,14 @@ var ( module *modules.Module ) +const ( + filterlistsDisabled = "filterlists:disabled" + filterlistsStaleDataSurvived = "filterlists:staledata" + filterlistsStaleDataDescr = "Removing stale filter list records failed. Some connections may be overblocked." + filterlistsUpdateInProgress = "filterlists:update-in-progress" + filterlistsUpdateInProgressDescr = "Performance slightly degraded during list update." +) + // booleans mainly used to decouple the module // during testing. var ( @@ -23,6 +31,8 @@ var ( ) func init() { + ignoreNetEnvEvents.Set() + module = modules.Register("filterlists", prep, start, nil, "core", "netenv") } diff --git a/intel/filterlists/updater.go b/intel/filterlists/updater.go index a9c0322b..5a4bfffc 100644 --- a/intel/filterlists/updater.go +++ b/intel/filterlists/updater.go @@ -28,8 +28,10 @@ func tryListUpdate(ctx context.Context) error { return err } - // if the module is in an error state resolve that right now. + // if the module is in an error, warning or hint state resolve that right now. module.Resolve(filterlistsDisabled) + module.Resolve(filterlistsStaleDataSurvived) + module.Resolve(filterlistsUpdateInProgress) return nil } @@ -38,11 +40,15 @@ func performUpdate(ctx context.Context) error { log.Debugf("intel/filterlists: upgrade already in progress") return nil } + defer updateInProgress.UnSet() + + module.Hint(filterlistsUpdateInProgress, filterlistsUpdateInProgressDescr) upgradables, err := getUpgradableFiles() if err != nil { return err } + log.Debugf("intel/filterlists: resources to update: %v", upgradables) if len(upgradables) == 0 { log.Debugf("intel/filterlists: ignoring update, latest version is already used") @@ -55,7 +61,7 @@ func performUpdate(ctx context.Context) error { // perform the actual upgrade by processing each file // in the returned order. for idx, file := range upgradables { - log.Debugf("applying update %s version %s", file.Identifier(), file.Version()) + log.Debugf("intel/filterlists: applying update (%d) %s version %s", idx, file.Identifier(), file.Version()) if file == baseFile { if idx != 0 { @@ -101,7 +107,13 @@ func performUpdate(ctx context.Context) error { // been updated now. Once we are done, start a worker // for that purpose. if cleanupRequired { - defer module.StartWorker("filterlists:cleanup", removeAllObsoleteFilterEntries) + if err := module.RunWorker("filterlists:cleanup", removeAllObsoleteFilterEntries); err != nil { + // if we failed to remove all stale cache entries + // we abort now WITHOUT updating the database version. This means + // we'll try again during the next update. + module.Warning(filterlistsStaleDataSurvived, filterlistsStaleDataDescr) + return fmt.Errorf("failed to cleanup stale cache records: %w", err) + } } // try to save the highest version of our files. @@ -114,8 +126,9 @@ func performUpdate(ctx context.Context) error { } func removeAllObsoleteFilterEntries(_ context.Context) error { + log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") for { - done, err := removeObsoleteFilterEntries(1000) + done, err := removeObsoleteFilterEntries(10000) if err != nil { return err } @@ -127,7 +140,6 @@ func removeAllObsoleteFilterEntries(_ context.Context) error { } func removeObsoleteFilterEntries(batchSize int) (bool, error) { - log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") iter, err := cache.Query( query.New(filterListKeyPrefix).Where( diff --git a/updates/main.go b/updates/main.go index f2c9b78c..f8741bf7 100644 --- a/updates/main.go +++ b/updates/main.go @@ -157,9 +157,13 @@ func DisableUpdateSchedule() error { } func checkForUpdates(ctx context.Context) error { + if err := registry.UpdateIndexes(); err != nil { + return fmt.Errorf("updates: failed to update indexes: %w", err) + } + err := registry.DownloadUpdates(ctx) if err != nil { - return fmt.Errorf("updates: failed to update: %s", err) + return fmt.Errorf("updates: failed to update: %w", err) } module.TriggerEvent(ResourceUpdateEvent, nil) return nil