From 71f6f093845f212890f2dfee7594b6fe12496626 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 28 Feb 2025 11:38:27 +0100 Subject: [PATCH] Use waitgroup instead of mutex for sqlite storage --- base/database/storage/sqlite/sqlite.go | 60 ++++++++++++-------------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/base/database/storage/sqlite/sqlite.go b/base/database/storage/sqlite/sqlite.go index 38f17ed4..325da138 100644 --- a/base/database/storage/sqlite/sqlite.go +++ b/base/database/storage/sqlite/sqlite.go @@ -33,9 +33,9 @@ import ( type SQLite struct { name string - db *sql.DB - bob bob.DB - lock sync.RWMutex + db *sql.DB + bob bob.DB + wg sync.WaitGroup ctx context.Context cancelCtx context.CancelFunc @@ -102,8 +102,8 @@ func openSQLite(name, location string, printStmts bool) (*SQLite, error) { // Get returns a database record. func (db *SQLite) Get(key string) (record.Record, error) { - db.lock.RLock() - defer db.lock.RUnlock() + db.wg.Add(1) + defer db.wg.Done() // Get record from database. r, err := models.FindRecord(db.ctx, db.bob, key) @@ -137,6 +137,9 @@ func (db *SQLite) Put(r record.Record) (record.Record, error) { } func (db *SQLite) putRecord(r record.Record, tx *bob.Tx) (record.Record, error) { + db.wg.Add(1) + defer db.wg.Done() + // Lock record if in a transaction. if tx != nil { r.Lock() @@ -170,10 +173,6 @@ func (db *SQLite) putRecord(r record.Record, tx *bob.Tx) (record.Record, error) Crownjewel: omit.From(m.IsCrownJewel()), } - // Lock for writing. - db.lock.Lock() - defer db.lock.Unlock() - // Simulate upsert with custom selection on conflict. dbQuery := models.Records.Insert( &setter, @@ -197,10 +196,8 @@ func (db *SQLite) putRecord(r record.Record, tx *bob.Tx) (record.Record, error) // PutMany stores many records in the database. func (db *SQLite) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { - db.lock.Lock() - defer db.lock.Unlock() - // we could lock for every record, but we want to have the same behaviour - // as the other storage backends, especially for testing. + db.wg.Add(1) + defer db.wg.Done() batch := make(chan record.Record, 100) errs := make(chan error, 1) @@ -245,9 +242,8 @@ func (db *SQLite) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error // Delete deletes a record from the database. func (db *SQLite) Delete(key string) error { - // Lock for writing. - db.lock.Lock() - defer db.lock.Unlock() + db.wg.Add(1) + defer db.wg.Done() toDelete := &models.Record{Key: key} return toDelete.Delete(db.ctx, db.bob) @@ -255,6 +251,9 @@ func (db *SQLite) Delete(key string) error { // Query returns a an iterator for the supplied query. func (db *SQLite) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { + db.wg.Add(1) + defer db.wg.Done() + _, err := q.Check() if err != nil { return nil, fmt.Errorf("invalid query: %w", err) @@ -267,6 +266,9 @@ func (db *SQLite) Query(q *query.Query, local, internal bool) (*iterator.Iterato } func (db *SQLite) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) { + db.wg.Add(1) + defer db.wg.Done() + // Build query. var recordQuery *sqlite.ViewQuery[*models.Record, models.RecordSlice] if q.DatabaseKeyPrefix() != "" { @@ -278,9 +280,7 @@ func (db *SQLite) queryExecutor(queryIter *iterator.Iterator, q *query.Query, lo } // Get cursor to go over all records in the query. - db.lock.RLock() cursor, err := models.RecordsQuery.Cursor(recordQuery, db.ctx, db.bob) - db.lock.RUnlock() if err != nil { queryIter.Finish(err) return @@ -352,11 +352,11 @@ recordsLoop: // Purge deletes all records that match the given query. It returns the number of successful deletes and an error. func (db *SQLite) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) { + db.wg.Add(1) + defer db.wg.Done() + // Optimize for local and internal queries without where clause and without shadow delete. if local && internal && !shadowDelete && !q.HasWhereCondition() { - db.lock.Lock() - defer db.lock.Unlock() - // First count entries (SQLite does not support affected rows) n, err := models.Records.Query( models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"), @@ -374,9 +374,6 @@ func (db *SQLite) Purge(ctx context.Context, q *query.Query, local, internal, sh // Optimize for local and internal queries without where clause, but with shadow delete. if local && internal && shadowDelete && !q.HasWhereCondition() { - db.lock.Lock() - defer db.lock.Unlock() - // First count entries (SQLite does not support affected rows) n, err := models.Records.Query( models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"), @@ -429,8 +426,8 @@ func (db *SQLite) Injected() bool { // MaintainRecordStates maintains records states in the database. func (db *SQLite) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { - db.lock.Lock() - defer db.lock.Unlock() + db.wg.Add(1) + defer db.wg.Done() now := time.Now().Unix() purgeThreshold := purgeDeletedBefore.Unix() @@ -471,8 +468,8 @@ func (db *SQLite) MaintainRecordStates(ctx context.Context, purgeDeletedBefore t } func (db *SQLite) Maintain(ctx context.Context) error { - db.lock.Lock() - defer db.lock.Unlock() + db.wg.Add(1) + defer db.wg.Done() // Remove up to about 100KB of SQLite pages from the freelist on every run. // (Assuming 4KB page size.) @@ -481,8 +478,8 @@ func (db *SQLite) Maintain(ctx context.Context) error { } func (db *SQLite) MaintainThorough(ctx context.Context) error { - db.lock.Lock() - defer db.lock.Unlock() + db.wg.Add(1) + defer db.wg.Done() // Remove all pages from the freelist. _, err := db.db.ExecContext(ctx, "PRAGMA incremental_vacuum;") @@ -491,8 +488,7 @@ func (db *SQLite) MaintainThorough(ctx context.Context) error { // Shutdown shuts down the database. func (db *SQLite) Shutdown() error { - db.lock.Lock() - defer db.lock.Unlock() + db.wg.Wait() db.cancelCtx() return db.bob.Close()