Implement review suggestions
This commit is contained in:
@@ -38,7 +38,7 @@ type updateBroadcaster struct {
|
|||||||
rw sync.RWMutex
|
rw sync.RWMutex
|
||||||
db *geoIPDB
|
db *geoIPDB
|
||||||
|
|
||||||
waiters []chan struct{}
|
waiter chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NeedsUpdate returns true if the current broadcaster needs a
|
// NeedsUpdate returns true if the current broadcaster needs a
|
||||||
@@ -67,20 +67,21 @@ func (ub *updateBroadcaster) ReplaceDatabase(db *geoIPDB) {
|
|||||||
// notifyWaiters notifies and removes all waiters. Must be called
|
// notifyWaiters notifies and removes all waiters. Must be called
|
||||||
// with ub.rw locked.
|
// with ub.rw locked.
|
||||||
func (ub *updateBroadcaster) notifyWaiters() {
|
func (ub *updateBroadcaster) notifyWaiters() {
|
||||||
waiters := ub.waiters
|
waiter := ub.waiter
|
||||||
ub.waiters = nil
|
ub.waiter = nil
|
||||||
for _, c := range waiters {
|
close(waiter)
|
||||||
close(c)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getWaiter appends and returns a new waiter channel that gets closed
|
// getWaiter appends and returns a new waiter channel that gets closed
|
||||||
// when a new database version is available. Must be called with
|
// when a new database version is available. Must be called with
|
||||||
// ub.rw locked.
|
// ub.rw locked.
|
||||||
func (ub *updateBroadcaster) getWaiter() chan struct{} {
|
func (ub *updateBroadcaster) getWaiter() chan struct{} {
|
||||||
ch := make(chan struct{})
|
if ub.waiter != nil {
|
||||||
ub.waiters = append(ub.waiters, ch)
|
return ub.waiter
|
||||||
return ch
|
}
|
||||||
|
|
||||||
|
ub.waiter = make(chan struct{})
|
||||||
|
return ub.waiter
|
||||||
}
|
}
|
||||||
|
|
||||||
type updateWorker struct {
|
type updateWorker struct {
|
||||||
@@ -149,21 +150,7 @@ func (upd *updateWorker) start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (upd *updateWorker) run(ctx context.Context) error {
|
func (upd *updateWorker) run(ctx context.Context) error {
|
||||||
firstTime := true
|
|
||||||
for {
|
for {
|
||||||
// update immediately if we're just got started (that happens in
|
|
||||||
// triggerUpdate() and we might get started a bit late for the
|
|
||||||
// channel notification to be sent). If not, wait for the next
|
|
||||||
// trigger our our ctx to be cancelled during shutdown.
|
|
||||||
if !firstTime {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
case <-upd.trigger:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
firstTime = false
|
|
||||||
|
|
||||||
if upd.v4.NeedsUpdate() {
|
if upd.v4.NeedsUpdate() {
|
||||||
if v4, err := getGeoIPDB(v4MMDBResource); err == nil {
|
if v4, err := getGeoIPDB(v4MMDBResource); err == nil {
|
||||||
upd.v4.ReplaceDatabase(v4)
|
upd.v4.ReplaceDatabase(v4)
|
||||||
@@ -179,6 +166,12 @@ func (upd *updateWorker) run(ctx context.Context) error {
|
|||||||
log.Warningf("geoip: failed to get v6 database: %s", err)
|
log.Warningf("geoip: failed to get v6 database: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case <-upd.trigger:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user