Merge branch 'v2.0' into task/refactor-spn

This commit is contained in:
Natanael Rodriguez Ramos
2025-05-28 11:41:10 +01:00
53 changed files with 2417 additions and 141 deletions

View File

@@ -264,6 +264,20 @@ func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal
return 0, ErrNotImplemented
}
// PurgeOlderThan deletes all records last updated before the given time.
// It returns the number of successful deletes and an error.
func (c *Controller) PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time, local, internal bool) (int, error) {
if shuttingDown.IsSet() {
return 0, ErrShuttingDown
}
if purger, ok := c.storage.(storage.PurgeOlderThan); ok {
return purger.PurgeOlderThan(ctx, prefix, purgeBefore, local, internal, c.shadowDelete)
}
return 0, ErrNotImplemented
}
// Shutdown shuts down the storage.
func (c *Controller) Shutdown() error {
return c.storage.Shutdown()

View File

@@ -562,6 +562,27 @@ func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) {
return db.Purge(ctx, q, i.options.Local, i.options.Internal)
}
// PurgeOlderThan deletes all records last updated before the given time.
// It returns the number of successful deletes and an error.
func (i *Interface) PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time) (int, error) {
dbName, dbKeyPrefix := record.ParseKey(prefix)
if dbName == "" {
return 0, errors.New("unknown database")
}
db, err := getController(dbName)
if err != nil {
return 0, err
}
// Check if database is read only before we add to the cache.
if db.ReadOnly() {
return 0, ErrReadOnly
}
return db.PurgeOlderThan(ctx, dbKeyPrefix, purgeBefore, i.options.Local, i.options.Internal)
}
// Subscribe subscribes to updates matching the given query.
func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) {
_, err := q.Check()

View File

@@ -99,6 +99,11 @@ func (q *Query) MatchesKey(dbKey string) bool {
return strings.HasPrefix(dbKey, q.dbKeyPrefix)
}
// HasWhereCondition returns whether the query has a "where" condition set.
func (q *Query) HasWhereCondition() bool {
return q.where != nil
}
// MatchesRecord checks whether the query matches the supplied database record (value only).
func (q *Query) MatchesRecord(r record.Record) bool {
if q.where == nil {

View File

@@ -102,7 +102,7 @@ func (b *Base) SetMeta(meta *Meta) {
b.meta = meta
}
// Marshal marshals the object, without the database key or metadata. It returns nil if the record is deleted.
// Marshal marshals the format and data.
func (b *Base) Marshal(self Record, format uint8) ([]byte, error) {
if b.Meta() == nil {
return nil, errors.New("missing meta")
@@ -119,7 +119,20 @@ func (b *Base) Marshal(self Record, format uint8) ([]byte, error) {
return dumped, nil
}
// MarshalRecord packs the object, including metadata, into a byte array for saving in a database.
// MarshalDataOnly marshals the data only.
func (b *Base) MarshalDataOnly(self Record, format uint8) ([]byte, error) {
if b.Meta() == nil {
return nil, errors.New("missing meta")
}
if b.Meta().Deleted > 0 {
return nil, nil
}
return dsd.DumpWithoutIdentifier(self, format, "")
}
// MarshalRecord marshals the data, format and metadata.
func (b *Base) MarshalRecord(self Record) ([]byte, error) {
if b.Meta() == nil {
return nil, errors.New("missing meta")

View File

@@ -49,11 +49,21 @@ func (m *Meta) MakeCrownJewel() {
m.cronjewel = true
}
// IsCrownJewel returns whether the database record is marked as a crownjewel.
func (m *Meta) IsCrownJewel() bool {
return m.cronjewel
}
// MakeSecret sets the database record as secret, meaning that it may only be used internally, and not by interfacing processes, such as the UI.
func (m *Meta) MakeSecret() {
m.secret = true
}
// IsSecret returns whether the database record is marked as a secret.
func (m *Meta) IsSecret() bool {
return m.secret
}
// Update updates the internal meta states and should be called before writing the record to the database.
func (m *Meta) Update() {
now := time.Now().Unix()

View File

@@ -20,6 +20,7 @@ type Record interface {
// Serialization.
Marshal(self Record, format uint8) ([]byte, error)
MarshalDataOnly(self Record, format uint8) ([]byte, error)
MarshalRecord(self Record) ([]byte, error)
GetAccessor(self Record) accessor.Accessor

View File

@@ -79,7 +79,21 @@ func NewWrapper(key string, meta *Meta, format uint8, data []byte) (*Wrapper, er
}, nil
}
// Marshal marshals the object, without the database key or metadata.
// NewWrapperFromDatabase returns a new record wrapper for the given data.
func NewWrapperFromDatabase(dbName, dbKey string, meta *Meta, format uint8, data []byte) (*Wrapper, error) {
return &Wrapper{
Base{
dbName: dbName,
dbKey: dbKey,
meta: meta,
},
sync.Mutex{},
format,
data,
}, nil
}
// Marshal marshals the format and data.
func (w *Wrapper) Marshal(r Record, format uint8) ([]byte, error) {
if w.Meta() == nil {
return nil, errors.New("missing meta")
@@ -100,7 +114,24 @@ func (w *Wrapper) Marshal(r Record, format uint8) ([]byte, error) {
return data, nil
}
// MarshalRecord packs the object, including metadata, into a byte array for saving in a database.
// MarshalDataOnly marshals the data only.
func (w *Wrapper) MarshalDataOnly(self Record, format uint8) ([]byte, error) {
if w.Meta() == nil {
return nil, errors.New("missing meta")
}
if w.Meta().Deleted > 0 {
return nil, nil
}
if format != dsd.AUTO && format != w.Format {
return nil, errors.New("could not dump model, wrapped object format mismatch")
}
return w.Data, nil
}
// MarshalRecord marshals the data, format and metadata.
func (w *Wrapper) MarshalRecord(r Record) ([]byte, error) {
// Duplication necessary, as the version from Base would call Base.Marshal instead of Wrapper.Marshal

View File

@@ -46,3 +46,8 @@ type Batcher interface {
type Purger interface {
Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error)
}
// PurgeOlderThan defines the database storage API for backends that support the PurgeOlderThan operation.
type PurgeOlderThan interface {
PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time, local, internal, shadowDelete bool) (int, error)
}

View File

@@ -0,0 +1,6 @@
sqlite:
dsn: "testdata/schema.db"
except:
migrations:
no_factory: true

View File

@@ -0,0 +1,7 @@
-- +migrate Up
-- SQL in section 'Up' is executed when this migration is applied
PRAGMA auto_vacuum = INCREMENTAL; -- https://sqlite.org/pragma.html#pragma_auto_vacuum
-- +migrate Down
-- SQL section 'Down' is executed when this migration is rolled back
PRAGMA auto_vacuum = NONE; -- https://sqlite.org/pragma.html#pragma_auto_vacuum

View File

@@ -0,0 +1,19 @@
-- +migrate Up
-- SQL in section 'Up' is executed when this migration is applied
CREATE TABLE records (
key TEXT PRIMARY KEY,
format SMALLINT,
value BLOB,
created BIGINT NOT NULL,
modified BIGINT NOT NULL,
expires BIGINT DEFAULT 0 NOT NULL,
deleted BIGINT DEFAULT 0 NOT NULL,
secret BOOLEAN DEFAULT FALSE NOT NULL,
crownjewel BOOLEAN DEFAULT FALSE NOT NULL
);
-- +migrate Down
-- SQL section 'Down' is executed when this migration is rolled back
DROP TABLE records;

View File

@@ -0,0 +1,5 @@
development:
dialect: sqlite3
datasource: testdata/schema.db
dir: migrations
table: migrations

View File

@@ -0,0 +1,131 @@
// Code generated by BobGen sqlite v0.30.0. DO NOT EDIT.
// This file is meant to be re-generated in place and/or deleted at any time.
package models
import (
"hash/maphash"
"strings"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/clause"
"github.com/stephenafamo/bob/dialect/sqlite"
"github.com/stephenafamo/bob/dialect/sqlite/dialect"
sqliteDriver "modernc.org/sqlite"
)
var TableNames = struct {
Records string
}{
Records: "records",
}
var ColumnNames = struct {
Records recordColumnNames
}{
Records: recordColumnNames{
Key: "key",
Format: "format",
Value: "value",
Created: "created",
Modified: "modified",
Expires: "expires",
Deleted: "deleted",
Secret: "secret",
Crownjewel: "crownjewel",
},
}
var (
SelectWhere = Where[*dialect.SelectQuery]()
InsertWhere = Where[*dialect.InsertQuery]()
UpdateWhere = Where[*dialect.UpdateQuery]()
DeleteWhere = Where[*dialect.DeleteQuery]()
)
func Where[Q sqlite.Filterable]() struct {
Records recordWhere[Q]
} {
return struct {
Records recordWhere[Q]
}{
Records: buildRecordWhere[Q](RecordColumns),
}
}
var (
SelectJoins = getJoins[*dialect.SelectQuery]
UpdateJoins = getJoins[*dialect.UpdateQuery]
)
type joinSet[Q interface{ aliasedAs(string) Q }] struct {
InnerJoin Q
LeftJoin Q
RightJoin Q
}
func (j joinSet[Q]) AliasedAs(alias string) joinSet[Q] {
return joinSet[Q]{
InnerJoin: j.InnerJoin.aliasedAs(alias),
LeftJoin: j.LeftJoin.aliasedAs(alias),
RightJoin: j.RightJoin.aliasedAs(alias),
}
}
type joins[Q dialect.Joinable] struct{}
func buildJoinSet[Q interface{ aliasedAs(string) Q }, C any, F func(C, string) Q](c C, f F) joinSet[Q] {
return joinSet[Q]{
InnerJoin: f(c, clause.InnerJoin),
LeftJoin: f(c, clause.LeftJoin),
RightJoin: f(c, clause.RightJoin),
}
}
func getJoins[Q dialect.Joinable]() joins[Q] {
return joins[Q]{}
}
type modAs[Q any, C interface{ AliasedAs(string) C }] struct {
c C
f func(C) bob.Mod[Q]
}
func (m modAs[Q, C]) Apply(q Q) {
m.f(m.c).Apply(q)
}
func (m modAs[Q, C]) AliasedAs(alias string) bob.Mod[Q] {
m.c = m.c.AliasedAs(alias)
return m
}
func randInt() int64 {
out := int64(new(maphash.Hash).Sum64())
if out < 0 {
return -out % 10000
}
return out % 10000
}
// ErrUniqueConstraint captures all unique constraint errors by explicitly leaving `s` empty.
var ErrUniqueConstraint = &UniqueConstraintError{s: ""}
type UniqueConstraintError struct {
// s is a string uniquely identifying the constraint in the raw error message returned from the database.
s string
}
func (e *UniqueConstraintError) Error() string {
return e.s
}
func (e *UniqueConstraintError) Is(target error) bool {
err, ok := target.(*sqliteDriver.Error)
if !ok {
return false
}
return err.Code() == 2067 && strings.Contains(err.Error(), e.s)
}

View File

@@ -0,0 +1,9 @@
// Code generated by BobGen sqlite v0.30.0. DO NOT EDIT.
// This file is meant to be re-generated in place and/or deleted at any time.
package models
import "github.com/stephenafamo/bob"
// Make sure the type Record runs hooks after queries
var _ bob.HookableType = &Record{}

View File

@@ -0,0 +1,555 @@
// Code generated by BobGen sqlite v0.30.0. DO NOT EDIT.
// This file is meant to be re-generated in place and/or deleted at any time.
package models
import (
"context"
"io"
"github.com/aarondl/opt/null"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/sqlite"
"github.com/stephenafamo/bob/dialect/sqlite/dialect"
"github.com/stephenafamo/bob/dialect/sqlite/dm"
"github.com/stephenafamo/bob/dialect/sqlite/sm"
"github.com/stephenafamo/bob/dialect/sqlite/um"
"github.com/stephenafamo/bob/expr"
)
// Record is an object representing the database table.
type Record struct {
Key string `db:"key,pk" `
Format null.Val[int16] `db:"format" `
Value null.Val[[]byte] `db:"value" `
Created int64 `db:"created" `
Modified int64 `db:"modified" `
Expires int64 `db:"expires" `
Deleted int64 `db:"deleted" `
Secret bool `db:"secret" `
Crownjewel bool `db:"crownjewel" `
}
// RecordSlice is an alias for a slice of pointers to Record.
// This should almost always be used instead of []*Record.
type RecordSlice []*Record
// Records contains methods to work with the records table
var Records = sqlite.NewTablex[*Record, RecordSlice, *RecordSetter]("", "records")
// RecordsQuery is a query on the records table
type RecordsQuery = *sqlite.ViewQuery[*Record, RecordSlice]
type recordColumnNames struct {
Key string
Format string
Value string
Created string
Modified string
Expires string
Deleted string
Secret string
Crownjewel string
}
var RecordColumns = buildRecordColumns("records")
type recordColumns struct {
tableAlias string
Key sqlite.Expression
Format sqlite.Expression
Value sqlite.Expression
Created sqlite.Expression
Modified sqlite.Expression
Expires sqlite.Expression
Deleted sqlite.Expression
Secret sqlite.Expression
Crownjewel sqlite.Expression
}
func (c recordColumns) Alias() string {
return c.tableAlias
}
func (recordColumns) AliasedAs(alias string) recordColumns {
return buildRecordColumns(alias)
}
func buildRecordColumns(alias string) recordColumns {
return recordColumns{
tableAlias: alias,
Key: sqlite.Quote(alias, "key"),
Format: sqlite.Quote(alias, "format"),
Value: sqlite.Quote(alias, "value"),
Created: sqlite.Quote(alias, "created"),
Modified: sqlite.Quote(alias, "modified"),
Expires: sqlite.Quote(alias, "expires"),
Deleted: sqlite.Quote(alias, "deleted"),
Secret: sqlite.Quote(alias, "secret"),
Crownjewel: sqlite.Quote(alias, "crownjewel"),
}
}
type recordWhere[Q sqlite.Filterable] struct {
Key sqlite.WhereMod[Q, string]
Format sqlite.WhereNullMod[Q, int16]
Value sqlite.WhereNullMod[Q, []byte]
Created sqlite.WhereMod[Q, int64]
Modified sqlite.WhereMod[Q, int64]
Expires sqlite.WhereMod[Q, int64]
Deleted sqlite.WhereMod[Q, int64]
Secret sqlite.WhereMod[Q, bool]
Crownjewel sqlite.WhereMod[Q, bool]
}
func (recordWhere[Q]) AliasedAs(alias string) recordWhere[Q] {
return buildRecordWhere[Q](buildRecordColumns(alias))
}
func buildRecordWhere[Q sqlite.Filterable](cols recordColumns) recordWhere[Q] {
return recordWhere[Q]{
Key: sqlite.Where[Q, string](cols.Key),
Format: sqlite.WhereNull[Q, int16](cols.Format),
Value: sqlite.WhereNull[Q, []byte](cols.Value),
Created: sqlite.Where[Q, int64](cols.Created),
Modified: sqlite.Where[Q, int64](cols.Modified),
Expires: sqlite.Where[Q, int64](cols.Expires),
Deleted: sqlite.Where[Q, int64](cols.Deleted),
Secret: sqlite.Where[Q, bool](cols.Secret),
Crownjewel: sqlite.Where[Q, bool](cols.Crownjewel),
}
}
// RecordSetter is used for insert/upsert/update operations
// All values are optional, and do not have to be set
// Generated columns are not included
type RecordSetter struct {
Key omit.Val[string] `db:"key,pk" `
Format omitnull.Val[int16] `db:"format" `
Value omitnull.Val[[]byte] `db:"value" `
Created omit.Val[int64] `db:"created" `
Modified omit.Val[int64] `db:"modified" `
Expires omit.Val[int64] `db:"expires" `
Deleted omit.Val[int64] `db:"deleted" `
Secret omit.Val[bool] `db:"secret" `
Crownjewel omit.Val[bool] `db:"crownjewel" `
}
func (s RecordSetter) SetColumns() []string {
vals := make([]string, 0, 9)
if !s.Key.IsUnset() {
vals = append(vals, "key")
}
if !s.Format.IsUnset() {
vals = append(vals, "format")
}
if !s.Value.IsUnset() {
vals = append(vals, "value")
}
if !s.Created.IsUnset() {
vals = append(vals, "created")
}
if !s.Modified.IsUnset() {
vals = append(vals, "modified")
}
if !s.Expires.IsUnset() {
vals = append(vals, "expires")
}
if !s.Deleted.IsUnset() {
vals = append(vals, "deleted")
}
if !s.Secret.IsUnset() {
vals = append(vals, "secret")
}
if !s.Crownjewel.IsUnset() {
vals = append(vals, "crownjewel")
}
return vals
}
func (s RecordSetter) Overwrite(t *Record) {
if !s.Key.IsUnset() {
t.Key, _ = s.Key.Get()
}
if !s.Format.IsUnset() {
t.Format, _ = s.Format.GetNull()
}
if !s.Value.IsUnset() {
t.Value, _ = s.Value.GetNull()
}
if !s.Created.IsUnset() {
t.Created, _ = s.Created.Get()
}
if !s.Modified.IsUnset() {
t.Modified, _ = s.Modified.Get()
}
if !s.Expires.IsUnset() {
t.Expires, _ = s.Expires.Get()
}
if !s.Deleted.IsUnset() {
t.Deleted, _ = s.Deleted.Get()
}
if !s.Secret.IsUnset() {
t.Secret, _ = s.Secret.Get()
}
if !s.Crownjewel.IsUnset() {
t.Crownjewel, _ = s.Crownjewel.Get()
}
}
func (s *RecordSetter) Apply(q *dialect.InsertQuery) {
q.AppendHooks(func(ctx context.Context, exec bob.Executor) (context.Context, error) {
return Records.BeforeInsertHooks.RunHooks(ctx, exec, s)
})
if len(q.Table.Columns) == 0 {
q.Table.Columns = s.SetColumns()
}
q.AppendValues(bob.ExpressionFunc(func(ctx context.Context, w io.Writer, d bob.Dialect, start int) ([]any, error) {
vals := make([]bob.Expression, 0, 9)
if !s.Key.IsUnset() {
vals = append(vals, sqlite.Arg(s.Key))
}
if !s.Format.IsUnset() {
vals = append(vals, sqlite.Arg(s.Format))
}
if !s.Value.IsUnset() {
vals = append(vals, sqlite.Arg(s.Value))
}
if !s.Created.IsUnset() {
vals = append(vals, sqlite.Arg(s.Created))
}
if !s.Modified.IsUnset() {
vals = append(vals, sqlite.Arg(s.Modified))
}
if !s.Expires.IsUnset() {
vals = append(vals, sqlite.Arg(s.Expires))
}
if !s.Deleted.IsUnset() {
vals = append(vals, sqlite.Arg(s.Deleted))
}
if !s.Secret.IsUnset() {
vals = append(vals, sqlite.Arg(s.Secret))
}
if !s.Crownjewel.IsUnset() {
vals = append(vals, sqlite.Arg(s.Crownjewel))
}
return bob.ExpressSlice(ctx, w, d, start, vals, "", ", ", "")
}))
}
func (s RecordSetter) UpdateMod() bob.Mod[*dialect.UpdateQuery] {
return um.Set(s.Expressions()...)
}
func (s RecordSetter) Expressions(prefix ...string) []bob.Expression {
exprs := make([]bob.Expression, 0, 9)
if !s.Key.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "key")...),
sqlite.Arg(s.Key),
}})
}
if !s.Format.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "format")...),
sqlite.Arg(s.Format),
}})
}
if !s.Value.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "value")...),
sqlite.Arg(s.Value),
}})
}
if !s.Created.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "created")...),
sqlite.Arg(s.Created),
}})
}
if !s.Modified.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "modified")...),
sqlite.Arg(s.Modified),
}})
}
if !s.Expires.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "expires")...),
sqlite.Arg(s.Expires),
}})
}
if !s.Deleted.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "deleted")...),
sqlite.Arg(s.Deleted),
}})
}
if !s.Secret.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "secret")...),
sqlite.Arg(s.Secret),
}})
}
if !s.Crownjewel.IsUnset() {
exprs = append(exprs, expr.Join{Sep: " = ", Exprs: []bob.Expression{
sqlite.Quote(append(prefix, "crownjewel")...),
sqlite.Arg(s.Crownjewel),
}})
}
return exprs
}
// FindRecord retrieves a single record by primary key
// If cols is empty Find will return all columns.
func FindRecord(ctx context.Context, exec bob.Executor, KeyPK string, cols ...string) (*Record, error) {
if len(cols) == 0 {
return Records.Query(
SelectWhere.Records.Key.EQ(KeyPK),
).One(ctx, exec)
}
return Records.Query(
SelectWhere.Records.Key.EQ(KeyPK),
sm.Columns(Records.Columns().Only(cols...)),
).One(ctx, exec)
}
// RecordExists checks the presence of a single record by primary key
func RecordExists(ctx context.Context, exec bob.Executor, KeyPK string) (bool, error) {
return Records.Query(
SelectWhere.Records.Key.EQ(KeyPK),
).Exists(ctx, exec)
}
// AfterQueryHook is called after Record is retrieved from the database
func (o *Record) AfterQueryHook(ctx context.Context, exec bob.Executor, queryType bob.QueryType) error {
var err error
switch queryType {
case bob.QueryTypeSelect:
ctx, err = Records.AfterSelectHooks.RunHooks(ctx, exec, RecordSlice{o})
case bob.QueryTypeInsert:
ctx, err = Records.AfterInsertHooks.RunHooks(ctx, exec, RecordSlice{o})
case bob.QueryTypeUpdate:
ctx, err = Records.AfterUpdateHooks.RunHooks(ctx, exec, RecordSlice{o})
case bob.QueryTypeDelete:
ctx, err = Records.AfterDeleteHooks.RunHooks(ctx, exec, RecordSlice{o})
}
return err
}
// PrimaryKeyVals returns the primary key values of the Record
func (o *Record) PrimaryKeyVals() bob.Expression {
return sqlite.Arg(o.Key)
}
func (o *Record) pkEQ() dialect.Expression {
return sqlite.Quote("records", "key").EQ(bob.ExpressionFunc(func(ctx context.Context, w io.Writer, d bob.Dialect, start int) ([]any, error) {
return o.PrimaryKeyVals().WriteSQL(ctx, w, d, start)
}))
}
// Update uses an executor to update the Record
func (o *Record) Update(ctx context.Context, exec bob.Executor, s *RecordSetter) error {
v, err := Records.Update(s.UpdateMod(), um.Where(o.pkEQ())).One(ctx, exec)
if err != nil {
return err
}
*o = *v
return nil
}
// Delete deletes a single Record record with an executor
func (o *Record) Delete(ctx context.Context, exec bob.Executor) error {
_, err := Records.Delete(dm.Where(o.pkEQ())).Exec(ctx, exec)
return err
}
// Reload refreshes the Record using the executor
func (o *Record) Reload(ctx context.Context, exec bob.Executor) error {
o2, err := Records.Query(
SelectWhere.Records.Key.EQ(o.Key),
).One(ctx, exec)
if err != nil {
return err
}
*o = *o2
return nil
}
// AfterQueryHook is called after RecordSlice is retrieved from the database
func (o RecordSlice) AfterQueryHook(ctx context.Context, exec bob.Executor, queryType bob.QueryType) error {
var err error
switch queryType {
case bob.QueryTypeSelect:
ctx, err = Records.AfterSelectHooks.RunHooks(ctx, exec, o)
case bob.QueryTypeInsert:
ctx, err = Records.AfterInsertHooks.RunHooks(ctx, exec, o)
case bob.QueryTypeUpdate:
ctx, err = Records.AfterUpdateHooks.RunHooks(ctx, exec, o)
case bob.QueryTypeDelete:
ctx, err = Records.AfterDeleteHooks.RunHooks(ctx, exec, o)
}
return err
}
func (o RecordSlice) pkIN() dialect.Expression {
if len(o) == 0 {
return sqlite.Raw("NULL")
}
return sqlite.Quote("records", "key").In(bob.ExpressionFunc(func(ctx context.Context, w io.Writer, d bob.Dialect, start int) ([]any, error) {
pkPairs := make([]bob.Expression, len(o))
for i, row := range o {
pkPairs[i] = row.PrimaryKeyVals()
}
return bob.ExpressSlice(ctx, w, d, start, pkPairs, "", ", ", "")
}))
}
// copyMatchingRows finds models in the given slice that have the same primary key
// then it first copies the existing relationships from the old model to the new model
// and then replaces the old model in the slice with the new model
func (o RecordSlice) copyMatchingRows(from ...*Record) {
for i, old := range o {
for _, new := range from {
if new.Key != old.Key {
continue
}
o[i] = new
break
}
}
}
// UpdateMod modifies an update query with "WHERE primary_key IN (o...)"
func (o RecordSlice) UpdateMod() bob.Mod[*dialect.UpdateQuery] {
return bob.ModFunc[*dialect.UpdateQuery](func(q *dialect.UpdateQuery) {
q.AppendHooks(func(ctx context.Context, exec bob.Executor) (context.Context, error) {
return Records.BeforeUpdateHooks.RunHooks(ctx, exec, o)
})
q.AppendLoader(bob.LoaderFunc(func(ctx context.Context, exec bob.Executor, retrieved any) error {
var err error
switch retrieved := retrieved.(type) {
case *Record:
o.copyMatchingRows(retrieved)
case []*Record:
o.copyMatchingRows(retrieved...)
case RecordSlice:
o.copyMatchingRows(retrieved...)
default:
// If the retrieved value is not a Record or a slice of Record
// then run the AfterUpdateHooks on the slice
_, err = Records.AfterUpdateHooks.RunHooks(ctx, exec, o)
}
return err
}))
q.AppendWhere(o.pkIN())
})
}
// DeleteMod modifies an delete query with "WHERE primary_key IN (o...)"
func (o RecordSlice) DeleteMod() bob.Mod[*dialect.DeleteQuery] {
return bob.ModFunc[*dialect.DeleteQuery](func(q *dialect.DeleteQuery) {
q.AppendHooks(func(ctx context.Context, exec bob.Executor) (context.Context, error) {
return Records.BeforeDeleteHooks.RunHooks(ctx, exec, o)
})
q.AppendLoader(bob.LoaderFunc(func(ctx context.Context, exec bob.Executor, retrieved any) error {
var err error
switch retrieved := retrieved.(type) {
case *Record:
o.copyMatchingRows(retrieved)
case []*Record:
o.copyMatchingRows(retrieved...)
case RecordSlice:
o.copyMatchingRows(retrieved...)
default:
// If the retrieved value is not a Record or a slice of Record
// then run the AfterDeleteHooks on the slice
_, err = Records.AfterDeleteHooks.RunHooks(ctx, exec, o)
}
return err
}))
q.AppendWhere(o.pkIN())
})
}
func (o RecordSlice) UpdateAll(ctx context.Context, exec bob.Executor, vals RecordSetter) error {
if len(o) == 0 {
return nil
}
_, err := Records.Update(vals.UpdateMod(), o.UpdateMod()).All(ctx, exec)
return err
}
func (o RecordSlice) DeleteAll(ctx context.Context, exec bob.Executor) error {
if len(o) == 0 {
return nil
}
_, err := Records.Delete(o.DeleteMod()).Exec(ctx, exec)
return err
}
func (o RecordSlice) ReloadAll(ctx context.Context, exec bob.Executor) error {
if len(o) == 0 {
return nil
}
o2, err := Records.Query(sm.Where(o.pkIN())).All(ctx, exec)
if err != nil {
return err
}
o.copyMatchingRows(o2...)
return nil
}

View File

@@ -0,0 +1,125 @@
package sqlite
import (
"context"
"strconv"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/sqlite/im"
"github.com/stephenafamo/bob/expr"
"github.com/safing/portmaster/base/database/record"
"github.com/safing/portmaster/base/database/storage/sqlite/models"
"github.com/safing/structures/dsd"
)
var UsePreparedStatements bool = true
// PutMany stores many records in the database.
func (db *SQLite) putManyWithPreparedStmts(shadowDelete bool) (chan<- record.Record, <-chan error) {
batch := make(chan record.Record, 100)
errs := make(chan error, 1)
// Simulate upsert with custom selection on conflict.
rawQuery, _, err := models.Records.Insert(
im.Into("records", "key", "format", "value", "created", "modified", "expires", "deleted", "secret", "crownjewel"),
im.Values(expr.Arg("key"), expr.Arg("format"), expr.Arg("value"), expr.Arg("created"), expr.Arg("modified"), expr.Arg("expires"), expr.Arg("deleted"), expr.Arg("secret"), expr.Arg("crownjewel")),
im.OnConflict("key").DoUpdate(
im.SetExcluded("format", "value", "created", "modified", "expires", "deleted", "secret", "crownjewel"),
),
).Build(db.ctx)
if err != nil {
errs <- err
return batch, errs
}
// Start transaction.
tx, err := db.bob.BeginTx(db.ctx, nil)
if err != nil {
errs <- err
return batch, errs
}
// Create prepared statement WITHIN TRANSACTION.
preparedStmt, err := tx.PrepareContext(db.ctx, rawQuery)
if err != nil {
errs <- err
return batch, errs
}
// start handler
go func() {
// Read all put records.
writeBatch:
for {
select {
case r := <-batch:
if r != nil {
// Write record.
err := writeWithPreparedStatement(db.ctx, &preparedStmt, r)
if err != nil {
errs <- err
break writeBatch
}
} else {
// Finalize transcation.
errs <- tx.Commit()
return
}
case <-db.ctx.Done():
break writeBatch
}
}
// Rollback transaction.
errs <- tx.Rollback()
}()
return batch, errs
}
func writeWithPreparedStatement(ctx context.Context, pStmt *bob.StdPrepared, r record.Record) error {
r.Lock()
defer r.Unlock()
// Serialize to JSON.
data, err := r.MarshalDataOnly(r, dsd.JSON)
if err != nil {
return err
}
// Get Meta.
m := r.Meta()
// Insert.
if len(data) > 0 {
format := strconv.Itoa(dsd.JSON)
_, err = pStmt.ExecContext(
ctx,
r.DatabaseKey(),
format,
data,
m.Created,
m.Modified,
m.Expires,
m.Deleted,
m.IsSecret(),
m.IsCrownJewel(),
)
} else {
_, err = pStmt.ExecContext(
ctx,
r.DatabaseKey(),
nil,
nil,
m.Created,
m.Modified,
m.Expires,
m.Deleted,
m.IsSecret(),
m.IsCrownJewel(),
)
}
return err
}

View File

@@ -0,0 +1,86 @@
package sqlite
import (
"strconv"
"testing"
)
func BenchmarkPutMany(b *testing.B) {
// Configure prepared statement usage.
origSetting := UsePreparedStatements
UsePreparedStatements = false
defer func() {
UsePreparedStatements = origSetting
}()
// Run benchmark.
benchPutMany(b)
}
func BenchmarkPutManyPreparedStmt(b *testing.B) {
// Configure prepared statement usage.
origSetting := UsePreparedStatements
UsePreparedStatements = true
defer func() {
UsePreparedStatements = origSetting
}()
// Run benchmark.
benchPutMany(b)
}
func benchPutMany(b *testing.B) { //nolint:thelper
// Start database.
testDir := b.TempDir()
db, err := openSQLite("test", testDir, false)
if err != nil {
b.Fatal(err)
}
defer func() {
// shutdown
err = db.Shutdown()
if err != nil {
b.Fatal(err)
}
}()
// Start benchmarking.
b.ResetTimer()
// Benchmark PutMany.
records, errs := db.PutMany(false)
for i := range b.N {
// Create test record.
newTestRecord := &TestRecord{
S: "banana",
I: 42,
I8: 42,
I16: 42,
I32: 42,
I64: 42,
UI: 42,
UI8: 42,
UI16: 42,
UI32: 42,
UI64: 42,
F32: 42.42,
F64: 42.42,
B: true,
}
newTestRecord.UpdateMeta()
newTestRecord.SetKey("test:" + strconv.Itoa(i))
select {
case records <- newTestRecord:
case err := <-errs:
b.Fatal(err)
}
}
// Finalize.
close(records)
err = <-errs
if err != nil {
b.Fatal(err)
}
}

View File

@@ -0,0 +1,51 @@
package sqlite
// Base command for sql-migrate:
//go:generate -command migrate go tool github.com/rubenv/sql-migrate/sql-migrate
// Run missing migrations:
//go:generate migrate up --config=migrations_config.yml
// Redo last migration:
// x go:generate migrate redo --config=migrations_config.yml
// Undo all migrations:
// x go:generate migrate down --config=migrations_config.yml
// Generate models with bob:
//go:generate go tool github.com/stephenafamo/bob/gen/bobgen-sqlite
import (
"embed"
migrate "github.com/rubenv/sql-migrate"
"github.com/safing/portmaster/base/database/record"
"github.com/safing/portmaster/base/database/storage/sqlite/models"
)
//go:embed migrations/*
var dbMigrations embed.FS
func getMigrations() migrate.EmbedFileSystemMigrationSource {
return migrate.EmbedFileSystemMigrationSource{
FileSystem: dbMigrations,
Root: "migrations",
}
}
func getMeta(r *models.Record) *record.Meta {
meta := &record.Meta{
Created: r.Created,
Modified: r.Modified,
Expires: r.Expires,
Deleted: r.Deleted,
}
if r.Secret {
meta.MakeSecret()
}
if r.Crownjewel {
meta.MakeCrownJewel()
}
return meta
}

View File

@@ -0,0 +1,566 @@
package sqlite
import (
"context"
"database/sql"
"errors"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/aarondl/opt/omit"
"github.com/aarondl/opt/omitnull"
migrate "github.com/rubenv/sql-migrate"
sqldblogger "github.com/simukti/sqldb-logger"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/sqlite"
"github.com/stephenafamo/bob/dialect/sqlite/im"
"github.com/stephenafamo/bob/dialect/sqlite/um"
_ "modernc.org/sqlite"
"github.com/safing/portmaster/base/database/accessor"
"github.com/safing/portmaster/base/database/iterator"
"github.com/safing/portmaster/base/database/query"
"github.com/safing/portmaster/base/database/record"
"github.com/safing/portmaster/base/database/storage"
"github.com/safing/portmaster/base/database/storage/sqlite/models"
"github.com/safing/portmaster/base/log"
"github.com/safing/structures/dsd"
)
// Errors.
var (
ErrQueryTimeout = errors.New("query timeout")
)
// SQLite storage.
type SQLite struct {
name string
db *sql.DB
bob bob.DB
wg sync.WaitGroup
ctx context.Context
cancelCtx context.CancelFunc
}
func init() {
_ = storage.Register("sqlite", func(name, location string) (storage.Interface, error) {
return NewSQLite(name, location)
})
}
// NewSQLite creates a sqlite database.
func NewSQLite(name, location string) (*SQLite, error) {
return openSQLite(name, location, false)
}
// openSQLite creates a sqlite database.
func openSQLite(name, location string, printStmts bool) (*SQLite, error) {
dbFile := filepath.Join(location, "db.sqlite")
// Open database file.
// Default settings:
// _time_format = YYYY-MM-DDTHH:MM:SS.SSS
// _txlock = deferred
db, err := sql.Open("sqlite", dbFile)
if err != nil {
return nil, fmt.Errorf("open sqlite: %w", err)
}
// Enable statement printing.
if printStmts {
db = sqldblogger.OpenDriver(dbFile, db.Driver(), &statementLogger{})
}
// Set other settings.
pragmas := []string{
"PRAGMA journal_mode=WAL;", // Corruption safe write ahead log for txs.
"PRAGMA synchronous=NORMAL;", // Best for WAL.
"PRAGMA cache_size=-10000;", // 10MB Cache.
"PRAGMA busy_timeout=3000;", // 3s (3000ms) timeout for locked tables.
}
for _, pragma := range pragmas {
_, err := db.Exec(pragma)
if err != nil {
return nil, fmt.Errorf("failed to init sqlite with %s: %w", pragma, err)
}
}
// Run migrations on database.
n, err := migrate.Exec(db, "sqlite3", getMigrations(), migrate.Up)
if err != nil {
return nil, fmt.Errorf("migrate sqlite: %w", err)
}
log.Debugf("database/sqlite: ran %d migrations on %s database", n, name)
// Return as bob database.
ctx, cancelCtx := context.WithCancel(context.Background())
return &SQLite{
name: name,
db: db,
bob: bob.NewDB(db),
ctx: ctx,
cancelCtx: cancelCtx,
}, nil
}
// Get returns a database record.
func (db *SQLite) Get(key string) (record.Record, error) {
db.wg.Add(1)
defer db.wg.Done()
// Get record from database.
r, err := models.FindRecord(db.ctx, db.bob, key)
if err != nil {
return nil, fmt.Errorf("%w: %w", storage.ErrNotFound, err)
}
// Return data in wrapper.
return record.NewWrapperFromDatabase(
db.name,
key,
getMeta(r),
uint8(r.Format.GetOrZero()), //nolint:gosec // Values are within uint8.
r.Value.GetOrZero(),
)
}
// GetMeta returns the metadata of a database record.
func (db *SQLite) GetMeta(key string) (*record.Meta, error) {
r, err := db.Get(key)
if err != nil {
return nil, err
}
return r.Meta(), nil
}
// Put stores a record in the database.
func (db *SQLite) Put(r record.Record) (record.Record, error) {
return db.putRecord(r, nil)
}
func (db *SQLite) putRecord(r record.Record, tx *bob.Tx) (record.Record, error) {
db.wg.Add(1)
defer db.wg.Done()
// Lock record if in a transaction.
if tx != nil {
r.Lock()
defer r.Unlock()
}
// Serialize to JSON.
data, err := r.MarshalDataOnly(r, dsd.JSON)
if err != nil {
return nil, err
}
// Prepare for setter.
setFormat := omitnull.From(int16(dsd.JSON))
setData := omitnull.From(data)
if len(data) == 0 {
setFormat.Null()
setData.Null()
}
// Create structure for insert.
m := r.Meta()
setter := models.RecordSetter{
Key: omit.From(r.DatabaseKey()),
Format: setFormat,
Value: setData,
Created: omit.From(m.Created),
Modified: omit.From(m.Modified),
Expires: omit.From(m.Expires),
Deleted: omit.From(m.Deleted),
Secret: omit.From(m.IsSecret()),
Crownjewel: omit.From(m.IsCrownJewel()),
}
// Simulate upsert with custom selection on conflict.
dbQuery := models.Records.Insert(
&setter,
im.OnConflict("key").DoUpdate(
im.SetExcluded("format", "value", "created", "modified", "expires", "deleted", "secret", "crownjewel"),
),
)
// Execute in transaction or directly.
if tx != nil {
_, err = dbQuery.Exec(db.ctx, tx)
} else {
_, err = dbQuery.Exec(db.ctx, db.bob)
}
if err != nil {
return nil, err
}
return r, nil
}
// PutMany stores many records in the database.
func (db *SQLite) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
db.wg.Add(1)
defer db.wg.Done()
// Check if we should use prepared statement optimized inserting.
if UsePreparedStatements {
return db.putManyWithPreparedStmts(shadowDelete)
}
batch := make(chan record.Record, 100)
errs := make(chan error, 1)
tx, err := db.bob.BeginTx(db.ctx, nil)
if err != nil {
errs <- err
return batch, errs
}
// start handler
go func() {
// Read all put records.
writeBatch:
for {
select {
case r := <-batch:
if r != nil {
// Write record.
_, err := db.putRecord(r, &tx)
if err != nil {
errs <- err
break writeBatch
}
} else {
// Finalize transcation.
errs <- tx.Commit()
return
}
case <-db.ctx.Done():
break writeBatch
}
}
// Rollback transaction.
errs <- tx.Rollback()
}()
return batch, errs
}
// Delete deletes a record from the database.
func (db *SQLite) Delete(key string) error {
db.wg.Add(1)
defer db.wg.Done()
toDelete := &models.Record{Key: key}
return toDelete.Delete(db.ctx, db.bob)
}
// Query returns a an iterator for the supplied query.
func (db *SQLite) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
db.wg.Add(1)
defer db.wg.Done()
_, err := q.Check()
if err != nil {
return nil, fmt.Errorf("invalid query: %w", err)
}
queryIter := iterator.New()
go db.queryExecutor(queryIter, q, local, internal)
return queryIter, nil
}
func (db *SQLite) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) {
db.wg.Add(1)
defer db.wg.Done()
// Build query.
var recordQuery *sqlite.ViewQuery[*models.Record, models.RecordSlice]
if q.DatabaseKeyPrefix() != "" {
recordQuery = models.Records.View.Query(
models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix() + "%"),
)
} else {
recordQuery = models.Records.View.Query()
}
// Get cursor to go over all records in the query.
cursor, err := models.RecordsQuery.Cursor(recordQuery, db.ctx, db.bob)
if err != nil {
queryIter.Finish(err)
return
}
defer func() {
_ = cursor.Close()
}()
recordsLoop:
for cursor.Next() {
// Get next record
r, cErr := cursor.Get()
if cErr != nil {
err = fmt.Errorf("cursor error: %w", cErr)
break recordsLoop
}
// Check if key matches.
if !q.MatchesKey(r.Key) {
continue recordsLoop
}
// Check Meta.
m := getMeta(r)
if !m.CheckValidity() ||
!m.CheckPermission(local, internal) {
continue recordsLoop
}
// Check Data.
if q.HasWhereCondition() {
if r.Format.IsNull() || r.Value.IsNull() {
continue recordsLoop
}
jsonData := string(r.Value.GetOrZero())
jsonAccess := accessor.NewJSONAccessor(&jsonData)
if !q.MatchesAccessor(jsonAccess) {
continue recordsLoop
}
}
// Build database record.
matched, _ := record.NewWrapperFromDatabase(
db.name,
r.Key,
m,
uint8(r.Format.GetOrZero()), //nolint:gosec // Values are within uint8.
r.Value.GetOrZero(),
)
select {
case <-queryIter.Done:
break recordsLoop
case queryIter.Next <- matched:
default:
select {
case <-queryIter.Done:
break recordsLoop
case queryIter.Next <- matched:
case <-time.After(1 * time.Second):
err = ErrQueryTimeout
break recordsLoop
}
}
}
queryIter.Finish(err)
}
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
func (db *SQLite) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) {
db.wg.Add(1)
defer db.wg.Done()
// Optimize for local and internal queries without where clause and without shadow delete.
if local && internal && !shadowDelete && !q.HasWhereCondition() {
// First count entries (SQLite does not support affected rows)
n, err := models.Records.Query(
models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
).Count(db.ctx, db.bob)
if err != nil || n == 0 {
return int(n), err
}
// Delete entries.
_, err = models.Records.Delete(
models.DeleteWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
).Exec(db.ctx, db.bob)
return int(n), err
}
// Optimize for local and internal queries without where clause, but with shadow delete.
if local && internal && shadowDelete && !q.HasWhereCondition() {
// First count entries (SQLite does not support affected rows)
n, err := models.Records.Query(
models.SelectWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
).Count(db.ctx, db.bob)
if err != nil || n == 0 {
return int(n), err
}
// Mark purged records as deleted.
now := time.Now().Unix()
_, err = models.Records.Update(
um.SetCol("format").ToArg(nil),
um.SetCol("value").ToArg(nil),
um.SetCol("deleted").ToArg(now),
models.UpdateWhere.Records.Key.Like(q.DatabaseKeyPrefix()+"%"),
).Exec(db.ctx, db.bob)
return int(n), err
}
// Otherwise, iterate over all entries and delete matching ones.
// TODO: Non-local, non-internal or content matching queries are not supported at the moment.
return 0, storage.ErrNotImplemented
}
// PurgeOlderThan deletes all records last updated before the given time. It returns the number of successful deletes and an error.
func (db *SQLite) PurgeOlderThan(ctx context.Context, prefix string, purgeBefore time.Time, local, internal, shadowDelete bool) (int, error) {
db.wg.Add(1)
defer db.wg.Done()
purgeBeforeInt := purgeBefore.Unix()
// Optimize for local and internal queries without where clause and without shadow delete.
if local && internal && !shadowDelete {
// First count entries (SQLite does not support affected rows)
n, err := models.Records.Query(
models.SelectWhere.Records.Key.Like(prefix+"%"),
models.SelectWhere.Records.Modified.LT(purgeBeforeInt),
).Count(db.ctx, db.bob)
if err != nil || n == 0 {
return int(n), err
}
// Delete entries.
_, err = models.Records.Delete(
models.DeleteWhere.Records.Key.Like(prefix+"%"),
models.DeleteWhere.Records.Modified.LT(purgeBeforeInt),
).Exec(db.ctx, db.bob)
return int(n), err
}
// Optimize for local and internal queries without where clause, but with shadow delete.
if local && internal && shadowDelete {
// First count entries (SQLite does not support affected rows)
n, err := models.Records.Query(
models.SelectWhere.Records.Key.Like(prefix+"%"),
models.SelectWhere.Records.Modified.LT(purgeBeforeInt),
).Count(db.ctx, db.bob)
if err != nil || n == 0 {
return int(n), err
}
// Mark purged records as deleted.
now := time.Now().Unix()
_, err = models.Records.Update(
um.SetCol("format").ToArg(nil),
um.SetCol("value").ToArg(nil),
um.SetCol("deleted").ToArg(now),
models.UpdateWhere.Records.Key.Like(prefix+"%"),
models.UpdateWhere.Records.Modified.LT(purgeBeforeInt),
).Exec(db.ctx, db.bob)
return int(n), err
}
// TODO: Non-local or non-internal queries are not supported at the moment.
return 0, storage.ErrNotImplemented
}
// ReadOnly returns whether the database is read only.
func (db *SQLite) ReadOnly() bool {
return false
}
// Injected returns whether the database is injected.
func (db *SQLite) Injected() bool {
return false
}
// MaintainRecordStates maintains records states in the database.
func (db *SQLite) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
db.wg.Add(1)
defer db.wg.Done()
now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix()
// Option 1: Using shadow delete.
if shadowDelete {
// Mark expired records as deleted.
_, err := models.Records.Update(
um.SetCol("format").ToArg(nil),
um.SetCol("value").ToArg(nil),
um.SetCol("deleted").ToArg(now),
models.UpdateWhere.Records.Deleted.EQ(0),
models.UpdateWhere.Records.Expires.GT(0),
models.UpdateWhere.Records.Expires.LT(now),
).Exec(db.ctx, db.bob)
if err != nil {
return fmt.Errorf("failed to shadow delete expired records: %w", err)
}
// Purge deleted records before threshold.
_, err = models.Records.Delete(
models.DeleteWhere.Records.Deleted.GT(0),
models.DeleteWhere.Records.Deleted.LT(purgeThreshold),
).Exec(db.ctx, db.bob)
if err != nil {
return fmt.Errorf("failed to purge deleted records (before threshold): %w", err)
}
return nil
}
// Option 2: Immediate delete.
// Delete expired record.
_, err := models.Records.Delete(
models.DeleteWhere.Records.Expires.GT(0),
models.DeleteWhere.Records.Expires.LT(now),
).Exec(db.ctx, db.bob)
if err != nil {
return fmt.Errorf("failed to delete expired records: %w", err)
}
// Delete shadow deleted records.
_, err = models.Records.Delete(
models.DeleteWhere.Records.Deleted.GT(0),
).Exec(db.ctx, db.bob)
if err != nil {
return fmt.Errorf("failed to purge deleted records: %w", err)
}
return nil
}
func (db *SQLite) Maintain(ctx context.Context) error {
db.wg.Add(1)
defer db.wg.Done()
// Remove up to about 100KB of SQLite pages from the freelist on every run.
// (Assuming 4KB page size.)
_, err := db.db.ExecContext(ctx, "PRAGMA incremental_vacuum(25);")
return err
}
func (db *SQLite) MaintainThorough(ctx context.Context) error {
db.wg.Add(1)
defer db.wg.Done()
// Remove all pages from the freelist.
_, err := db.db.ExecContext(ctx, "PRAGMA incremental_vacuum;")
return err
}
// Shutdown shuts down the database.
func (db *SQLite) Shutdown() error {
db.wg.Wait()
db.cancelCtx()
return db.bob.Close()
}
type statementLogger struct{}
func (sl statementLogger) Log(ctx context.Context, level sqldblogger.Level, msg string, data map[string]interface{}) {
fmt.Printf("SQL: %s --- %+v\n", msg, data)
}

View File

@@ -0,0 +1,216 @@
package sqlite
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/safing/portmaster/base/database/query"
"github.com/safing/portmaster/base/database/record"
"github.com/safing/portmaster/base/database/storage"
)
var (
// Compile time interface checks.
_ storage.Interface = &SQLite{}
_ storage.Batcher = &SQLite{}
_ storage.Purger = &SQLite{}
)
type TestRecord struct { //nolint:maligned
record.Base
sync.Mutex
S string
I int
I8 int8
I16 int16
I32 int32
I64 int64
UI uint
UI8 uint8
UI16 uint16
UI32 uint32
UI64 uint64
F32 float32
F64 float64
B bool
}
func TestSQLite(t *testing.T) {
t.Parallel()
// start
testDir := t.TempDir()
db, err := openSQLite("test", testDir, true)
if err != nil {
t.Fatal(err)
}
defer func() {
// shutdown
err = db.Shutdown()
if err != nil {
t.Fatal(err)
}
}()
a := &TestRecord{
S: "banana",
I: 42,
I8: 42,
I16: 42,
I32: 42,
I64: 42,
UI: 42,
UI8: 42,
UI16: 42,
UI32: 42,
UI64: 42,
F32: 42.42,
F64: 42.42,
B: true,
}
a.SetMeta(&record.Meta{})
a.Meta().Update()
a.SetKey("test:A")
// put record
_, err = db.Put(a)
if err != nil {
t.Fatal(err)
}
// get and compare
r1, err := db.Get("A")
if err != nil {
t.Fatal(err)
}
a1 := &TestRecord{}
err = record.Unwrap(r1, a1)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, a, a1, "struct must match")
// setup query test records
qA := &TestRecord{}
qA.SetKey("test:path/to/A")
qA.UpdateMeta()
qB := &TestRecord{}
qB.SetKey("test:path/to/B")
qB.UpdateMeta()
// Set creation/modification in the past.
qB.Meta().Created = time.Now().Add(-time.Hour).Unix()
qB.Meta().Modified = time.Now().Add(-time.Hour).Unix()
qC := &TestRecord{}
qC.SetKey("test:path/to/C")
qC.UpdateMeta()
// Set expiry in the past.
qC.Meta().Expires = time.Now().Add(-time.Hour).Unix()
qZ := &TestRecord{}
qZ.SetKey("test:z")
qZ.UpdateMeta()
put, errs := db.PutMany(false)
put <- qA
put <- qB
put <- qC
put <- qZ
close(put)
err = <-errs
if err != nil {
t.Fatal(err)
}
// test query
q := query.New("test:path/to/").MustBeValid()
it, err := db.Query(q, true, true)
if err != nil {
t.Fatal(err)
}
cnt := 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
if cnt != 2 {
// Note: One is expired.
t.Fatalf("unexpected query result count: %d", cnt)
}
// delete
err = db.Delete("A")
if err != nil {
t.Fatal(err)
}
// check if its gone
_, err = db.Get("A")
if err == nil {
t.Fatal("should fail")
}
// purge older than
n, err := db.PurgeOlderThan(t.Context(), "path/to/", time.Now().Add(-30*time.Minute), true, true, false)
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatalf("unexpected purge older than delete count: %d", n)
}
// maintenance
err = db.MaintainRecordStates(t.Context(), time.Now().Add(-time.Minute), true)
if err != nil {
t.Fatal(err)
}
// maintenance
err = db.MaintainRecordStates(t.Context(), time.Now(), false)
if err != nil {
t.Fatal(err)
}
// purge
n, err = db.Purge(t.Context(), query.New("test:path/to/").MustBeValid(), true, true, true)
if err != nil {
t.Fatal(err)
}
if n != 1 {
t.Fatalf("unexpected purge delete count: %d", n)
}
// Maintenance
err = db.Maintain(t.Context())
if err != nil {
t.Fatalf("Maintain: %s", err)
}
err = db.MaintainThorough(t.Context())
if err != nil {
t.Fatalf("MaintainThorough: %s", err)
}
// test query
q = query.New("test").MustBeValid()
it, err = db.Query(q, true, true)
if err != nil {
t.Fatal(err)
}
cnt = 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
if cnt != 1 {
t.Fatalf("unexpected query result count: %d", cnt)
}
}

View File