Replace dataroot module with BinDir and DataDir on instance, adapt modules

This commit is contained in:
Daniel
2024-11-06 10:48:02 +01:00
parent 0f3f3c360f
commit 7bc1c3b764
39 changed files with 819 additions and 482 deletions

View File

@@ -39,9 +39,9 @@ var (
filterListLock sync.RWMutex
// Updater files for tracking upgrades.
baseFile *updates.File
intermediateFile *updates.File
urgentFile *updates.File
baseFile *updates.Artifact
intermediateFile *updates.Artifact
urgentFile *updates.Artifact
filterListsLoaded chan struct{}
)
@@ -77,7 +77,7 @@ func isLoaded() bool {
// processListFile opens the latest version of file and decodes it's DSDL
// content. It calls processEntry for each decoded filterlists entry.
func processListFile(ctx context.Context, filter *scopedBloom, file *updates.File) error {
func processListFile(ctx context.Context, filter *scopedBloom, file *updates.Artifact) error {
f, err := os.Open(file.Path())
if err != nil {
return err

View File

@@ -162,7 +162,7 @@ func getListIndexFromCache() (*ListIndexFile, error) {
var (
// listIndexUpdate must only be used by updateListIndex.
listIndexUpdate *updates.File
listIndexUpdate *updates.Artifact
listIndexUpdateLock sync.Mutex
)

View File

@@ -63,7 +63,7 @@ func performUpdate(ctx context.Context) error {
// First, update the list index.
err := updateListIndex()
if err != nil {
log.Errorf("intel/filterlists: failed update list index: %s", err)
log.Warningf("intel/filterlists: failed update list index: %s", err)
}
upgradables, err := getUpgradableFiles()
@@ -83,7 +83,7 @@ func performUpdate(ctx context.Context) error {
// perform the actual upgrade by processing each file
// in the returned order.
for idx, file := range upgradables {
log.Debugf("intel/filterlists: applying update (%d) %s version %s", idx, file.Identifier(), file.Version())
log.Debugf("intel/filterlists: applying update (%d) %s version %s", idx, file.Filename, file.Version)
if file == baseFile {
if idx != 0 {
@@ -101,7 +101,7 @@ func performUpdate(ctx context.Context) error {
}
if err := processListFile(ctx, filterToUpdate, file); err != nil {
return fmt.Errorf("failed to process upgrade %s: %w", file.Identifier(), err)
return fmt.Errorf("failed to process upgrade %s version %s: %w", file.Filename, file.Version, err)
}
}
@@ -145,10 +145,10 @@ func performUpdate(ctx context.Context) error {
// try to save the highest version of our files.
highestVersion := upgradables[len(upgradables)-1]
if err := setCacheDatabaseVersion(highestVersion.Version()); err != nil {
if err := setCacheDatabaseVersion(highestVersion.Version); err != nil {
log.Errorf("intel/filterlists: failed to save cache database version: %s", err)
} else {
log.Infof("intel/filterlists: successfully migrated cache database to %s", highestVersion.Version())
log.Infof("intel/filterlists: successfully migrated cache database to %s", highestVersion.Version)
}
// The list update succeeded, resolve any states.
@@ -174,51 +174,51 @@ func removeAllObsoleteFilterEntries(wc *mgr.WorkerCtx) error {
// getUpgradableFiles returns a slice of filterlists files
// that should be updated. The files MUST be updated and
// processed in the returned order!
func getUpgradableFiles() ([]*updates.File, error) {
var updateOrder []*updates.File
func getUpgradableFiles() ([]*updates.Artifact, error) {
var updateOrder []*updates.Artifact
// cacheDBInUse := isLoaded()
cacheDBInUse := isLoaded()
// if baseFile == nil || !cacheDBInUse { // TODO(vladimir): || baseFile.UpgradeAvailable()
// var err error
// baseFile, err = module.instance.Updates().GetFile(baseListFilePath)
// if err != nil {
// return nil, err
// }
// log.Tracef("intel/filterlists: base file needs update, selected version %s", baseFile.Version())
// updateOrder = append(updateOrder, baseFile)
// }
newBaseFile, err := module.instance.IntelUpdates().GetFile(baseListFilePath)
if err != nil {
log.Warningf("intel/filterlists: failed to get base update: %s", err)
} else if newer, _ := newBaseFile.IsNewerThan(baseFile); newer || !cacheDBInUse {
log.Tracef("intel/filterlists: base file needs update to version %s", newBaseFile.Version)
if newBaseFile.SemVer() == nil {
log.Warningf("intel/filterlists: base file needs update to version %s, but semver is invalid", newBaseFile.Version)
} else {
updateOrder = append(updateOrder, newBaseFile)
}
}
// if intermediateFile == nil || intermediateFile.UpgradeAvailable() || !cacheDBInUse {
// var err error
// intermediateFile, err = getFile(intermediateListFilePath)
// if err != nil && !errors.Is(err, updater.ErrNotFound) {
// return nil, err
// }
newIntermediateFile, err := module.instance.IntelUpdates().GetFile(intermediateListFilePath)
if err != nil {
log.Warningf("intel/filterlists: failed to get intermediate update: %s", err)
} else if newer, _ := newIntermediateFile.IsNewerThan(intermediateFile); newer || !cacheDBInUse {
log.Tracef("intel/filterlists: intermediate file needs update to version %s", newIntermediateFile.Version)
if newIntermediateFile.SemVer() == nil {
log.Warningf("intel/filterlists: intermediate file needs update to version %s, but semver is invalid", newIntermediateFile.Version)
} else {
updateOrder = append(updateOrder, newIntermediateFile)
}
}
// if err == nil {
// log.Tracef("intel/filterlists: intermediate file needs update, selected version %s", intermediateFile.Version())
// updateOrder = append(updateOrder, intermediateFile)
// }
// }
// if urgentFile == nil || urgentFile.UpgradeAvailable() || !cacheDBInUse {
// var err error
// urgentFile, err = getFile(urgentListFilePath)
// if err != nil && !errors.Is(err, updater.ErrNotFound) {
// return nil, err
// }
// if err == nil {
// log.Tracef("intel/filterlists: urgent file needs update, selected version %s", urgentFile.Version())
// updateOrder = append(updateOrder, urgentFile)
// }
// }
newUrgentFile, err := module.instance.IntelUpdates().GetFile(urgentListFilePath)
if err != nil {
log.Warningf("intel/filterlists: failed to get urgent update: %s", err)
} else if newer, _ := newUrgentFile.IsNewerThan(urgentFile); newer || !cacheDBInUse {
log.Tracef("intel/filterlists: urgent file needs update to version %s", newUrgentFile.Version)
if newUrgentFile.SemVer() == nil {
log.Warningf("intel/filterlists: urgent file needs update to version %s, but semver is invalid", newUrgentFile.Version)
} else {
updateOrder = append(updateOrder, newUrgentFile)
}
}
return resolveUpdateOrder(updateOrder)
}
func resolveUpdateOrder(updateOrder []*updates.File) ([]*updates.File, error) {
func resolveUpdateOrder(updateOrder []*updates.Artifact) ([]*updates.Artifact, error) {
// sort the update order by ascending version
sort.Sort(byAscVersion(updateOrder))
log.Tracef("intel/filterlists: order of updates: %v", updateOrder)
@@ -239,9 +239,8 @@ func resolveUpdateOrder(updateOrder []*updates.File) ([]*updates.File, error) {
startAtIdx := -1
for idx, file := range updateOrder {
ver, _ := version.NewSemver(file.Version())
log.Tracef("intel/filterlists: checking file with version %s against %s", ver, cacheDBVersion)
if ver.GreaterThan(cacheDBVersion) && (startAtIdx == -1 || file == baseFile) {
log.Tracef("intel/filterlists: checking file with version %s against %s", file.SemVer(), cacheDBVersion)
if file.SemVer().GreaterThan(cacheDBVersion) && (startAtIdx == -1 || file == baseFile) {
startAtIdx = idx
}
}
@@ -258,15 +257,12 @@ func resolveUpdateOrder(updateOrder []*updates.File) ([]*updates.File, error) {
return updateOrder[startAtIdx:], nil
}
type byAscVersion []*updates.File
type byAscVersion []*updates.Artifact
func (fs byAscVersion) Len() int { return len(fs) }
func (fs byAscVersion) Less(i, j int) bool {
vi, _ := version.NewSemver(fs[i].Version())
vj, _ := version.NewSemver(fs[j].Version())
return vi.LessThan(vj)
return fs[i].SemVer().LessThan(fs[j].SemVer())
}
func (fs byAscVersion) Swap(i, j int) {

View File

@@ -17,6 +17,12 @@ var worker *updateWorker
func init() {
worker = &updateWorker{
trigger: make(chan struct{}),
v4: updateBroadcaster{
dbName: v4MMDBResource,
},
v6: updateBroadcaster{
dbName: v6MMDBResource,
},
}
}
@@ -27,26 +33,50 @@ const (
type geoIPDB struct {
*maxminddb.Reader
file *updates.File
update *updates.Artifact
}
// updateBroadcaster stores a geoIPDB and provides synchronized
// access to the MMDB reader. It also supports broadcasting to
// multiple waiters when a new database becomes available.
type updateBroadcaster struct {
rw sync.RWMutex
db *geoIPDB
rw sync.RWMutex
db *geoIPDB
dbName string
waiter chan struct{}
}
// NeedsUpdate returns true if the current broadcaster needs a
// database update.
func (ub *updateBroadcaster) NeedsUpdate() bool {
// AvailableUpdate returns a new update artifact if the current broadcaster
// needs a database update.
func (ub *updateBroadcaster) AvailableUpdate() *updates.Artifact {
ub.rw.RLock()
defer ub.rw.RUnlock()
return ub.db == nil // TODO(vladimir) is this needed: || ub.db.file.UpgradeAvailable()
// Get artifact.
artifact, err := module.instance.IntelUpdates().GetFile(ub.dbName)
if err != nil {
// Check if the geoip database is included in the binary index instead.
// TODO: Remove when intelhub builds the geoip database.
if artifact2, err2 := module.instance.BinaryUpdates().GetFile(ub.dbName); err2 == nil {
artifact = artifact2
err = nil
} else {
log.Warningf("geoip: failed to get geoip update: %s", err)
return nil
}
}
// Return artifact if not yet initialized.
if ub.db == nil {
return artifact
}
// Compare and return artifact only when confirmed newer.
if newer, _ := artifact.IsNewerThan(ub.db.update); newer {
return artifact
}
return nil
}
// ReplaceDatabase replaces (or initially sets) the mmdb database.
@@ -153,16 +183,18 @@ func (upd *updateWorker) start() {
func (upd *updateWorker) run(ctx *mgr.WorkerCtx) error {
for {
if upd.v4.NeedsUpdate() {
if v4, err := getGeoIPDB(v4MMDBResource); err == nil {
update := upd.v4.AvailableUpdate()
if update != nil {
if v4, err := getGeoIPDB(update); err == nil {
upd.v4.ReplaceDatabase(v4)
} else {
log.Warningf("geoip: failed to get v4 database: %s", err)
}
}
if upd.v6.NeedsUpdate() {
if v6, err := getGeoIPDB(v6MMDBResource); err == nil {
update = upd.v6.AvailableUpdate()
if update != nil {
if v6, err := getGeoIPDB(update); err == nil {
upd.v6.ReplaceDatabase(v6)
} else {
log.Warningf("geoip: failed to get v6 database: %s", err)
@@ -177,36 +209,17 @@ func (upd *updateWorker) run(ctx *mgr.WorkerCtx) error {
}
}
func getGeoIPDB(resource string) (*geoIPDB, error) {
log.Debugf("geoip: opening database %s", resource)
func getGeoIPDB(update *updates.Artifact) (*geoIPDB, error) {
log.Debugf("geoip: opening database %s", update.Path())
file, err := open(resource)
if err != nil {
return nil, err
}
reader, err := maxminddb.Open(file.Path())
reader, err := maxminddb.Open(update.Path())
if err != nil {
return nil, fmt.Errorf("failed to open: %w", err)
}
log.Debugf("geoip: successfully opened database %s", resource)
log.Debugf("geoip: successfully opened database %s", update.Filename)
return &geoIPDB{
Reader: reader,
file: file,
update: update,
}, nil
}
func open(resource string) (*updates.File, error) {
f, err := module.instance.IntelUpdates().GetFile(resource)
if err != nil {
return nil, fmt.Errorf("getting file: %w", err)
}
// unpacked, err := f.Unpack(".gz", updater.UnpackGZIP)
// if err != nil {
// return nil, "", fmt.Errorf("unpacking file: %w", err)
// }
return f, nil
}

View File

@@ -66,5 +66,6 @@ func New(instance instance) (*GeoIP, error) {
}
type instance interface {
BinaryUpdates() *updates.Updater
IntelUpdates() *updates.Updater
}