Fix resource index not updated. Minor module improvements

This commit is contained in:
Patrick Pacher
2020-04-14 14:02:05 +02:00
parent a6d768958d
commit fc9835f91a
5 changed files with 44 additions and 17 deletions

View File

@@ -76,7 +76,7 @@ func (bf *scopedBloom) add(scope, value string) {
// not have support for it. We just drop the value // not have support for it. We just drop the value
// as a call to Test() for that scope will always // as a call to Test() for that scope will always
// return "true" // return "true"
log.Warningf("failed to add unknown entity type %q", scope) log.Warningf("failed to add unknown entity type %q with value %q", scope, value)
return return
} }

View File

@@ -34,7 +34,6 @@ const (
) )
const bfFalsePositiveRate = 0.001 const bfFalsePositiveRate = 0.001
const filterlistsDisabled = "filterlists:disabled"
var ( var (
filterListLock sync.RWMutex filterListLock sync.RWMutex
@@ -128,12 +127,19 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil
return nil return nil
}) })
persistRecords(startSafe, records)
return g.Wait()
}
func persistRecords(startJob func(func() error), records <-chan record.Record) {
var cnt int var cnt int
start := time.Now() start := time.Now()
batch := database.NewInterface(&database.Options{Local: true, Internal: true}) batch := database.NewInterface(&database.Options{Local: true, Internal: true})
var startBatch func() var processBatch func() error
processBatch := func() error {
processBatch = func() error {
batchPut := batch.PutMany("cache") batchPut := batch.PutMany("cache")
for r := range records { for r := range records {
if err := batchPut(r); err != nil { if err := batchPut(r); err != nil {
@@ -152,7 +158,7 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil
return err return err
} }
startBatch() startJob(processBatch)
return nil return nil
} }
@@ -160,13 +166,8 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil
return batchPut(nil) return batchPut(nil)
} }
startBatch = func() {
startSafe(processBatch)
}
startBatch() startJob(processBatch)
return g.Wait()
} }
func normalizeEntry(entry *listEntry) { func normalizeEntry(entry *listEntry) {

View File

@@ -15,6 +15,14 @@ var (
module *modules.Module module *modules.Module
) )
const (
filterlistsDisabled = "filterlists:disabled"
filterlistsStaleDataSurvived = "filterlists:staledata"
filterlistsStaleDataDescr = "Removing stale filter list records failed. Some connections may be overblocked."
filterlistsUpdateInProgress = "filterlists:update-in-progress"
filterlistsUpdateInProgressDescr = "Performance slightly degraded during list update."
)
// booleans mainly used to decouple the module // booleans mainly used to decouple the module
// during testing. // during testing.
var ( var (
@@ -23,6 +31,8 @@ var (
) )
func init() { func init() {
ignoreNetEnvEvents.Set()
module = modules.Register("filterlists", prep, start, nil, "core", "netenv") module = modules.Register("filterlists", prep, start, nil, "core", "netenv")
} }

View File

@@ -28,8 +28,10 @@ func tryListUpdate(ctx context.Context) error {
return err return err
} }
// if the module is in an error state resolve that right now. // if the module is in an error, warning or hint state resolve that right now.
module.Resolve(filterlistsDisabled) module.Resolve(filterlistsDisabled)
module.Resolve(filterlistsStaleDataSurvived)
module.Resolve(filterlistsUpdateInProgress)
return nil return nil
} }
@@ -38,11 +40,15 @@ func performUpdate(ctx context.Context) error {
log.Debugf("intel/filterlists: upgrade already in progress") log.Debugf("intel/filterlists: upgrade already in progress")
return nil return nil
} }
defer updateInProgress.UnSet()
module.Hint(filterlistsUpdateInProgress, filterlistsUpdateInProgressDescr)
upgradables, err := getUpgradableFiles() upgradables, err := getUpgradableFiles()
if err != nil { if err != nil {
return err return err
} }
log.Debugf("intel/filterlists: resources to update: %v", upgradables)
if len(upgradables) == 0 { if len(upgradables) == 0 {
log.Debugf("intel/filterlists: ignoring update, latest version is already used") log.Debugf("intel/filterlists: ignoring update, latest version is already used")
@@ -55,7 +61,7 @@ func performUpdate(ctx context.Context) error {
// perform the actual upgrade by processing each file // perform the actual upgrade by processing each file
// in the returned order. // in the returned order.
for idx, file := range upgradables { for idx, file := range upgradables {
log.Debugf("applying update %s version %s", file.Identifier(), file.Version()) log.Debugf("intel/filterlists: applying update (%d) %s version %s", idx, file.Identifier(), file.Version())
if file == baseFile { if file == baseFile {
if idx != 0 { if idx != 0 {
@@ -101,7 +107,13 @@ func performUpdate(ctx context.Context) error {
// been updated now. Once we are done, start a worker // been updated now. Once we are done, start a worker
// for that purpose. // for that purpose.
if cleanupRequired { if cleanupRequired {
defer module.StartWorker("filterlists:cleanup", removeAllObsoleteFilterEntries) if err := module.RunWorker("filterlists:cleanup", removeAllObsoleteFilterEntries); err != nil {
// if we failed to remove all stale cache entries
// we abort now WITHOUT updating the database version. This means
// we'll try again during the next update.
module.Warning(filterlistsStaleDataSurvived, filterlistsStaleDataDescr)
return fmt.Errorf("failed to cleanup stale cache records: %w", err)
}
} }
// try to save the highest version of our files. // try to save the highest version of our files.
@@ -114,8 +126,9 @@ func performUpdate(ctx context.Context) error {
} }
func removeAllObsoleteFilterEntries(_ context.Context) error { func removeAllObsoleteFilterEntries(_ context.Context) error {
log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...")
for { for {
done, err := removeObsoleteFilterEntries(1000) done, err := removeObsoleteFilterEntries(10000)
if err != nil { if err != nil {
return err return err
} }
@@ -127,7 +140,6 @@ func removeAllObsoleteFilterEntries(_ context.Context) error {
} }
func removeObsoleteFilterEntries(batchSize int) (bool, error) { func removeObsoleteFilterEntries(batchSize int) (bool, error) {
log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...")
iter, err := cache.Query( iter, err := cache.Query(
query.New(filterListKeyPrefix).Where( query.New(filterListKeyPrefix).Where(

View File

@@ -157,9 +157,13 @@ func DisableUpdateSchedule() error {
} }
func checkForUpdates(ctx context.Context) error { func checkForUpdates(ctx context.Context) error {
if err := registry.UpdateIndexes(); err != nil {
return fmt.Errorf("updates: failed to update indexes: %w", err)
}
err := registry.DownloadUpdates(ctx) err := registry.DownloadUpdates(ctx)
if err != nil { if err != nil {
return fmt.Errorf("updates: failed to update: %s", err) return fmt.Errorf("updates: failed to update: %w", err)
} }
module.TriggerEvent(ResourceUpdateEvent, nil) module.TriggerEvent(ResourceUpdateEvent, nil)
return nil return nil