Update netquery to support history module
This commit is contained in:
@@ -2,18 +2,23 @@ package netquery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/jackc/puddle/v2"
|
||||
"zombiezen.com/go/sqlite"
|
||||
"zombiezen.com/go/sqlite/sqlitex"
|
||||
|
||||
"github.com/safing/portbase/dataroot"
|
||||
"github.com/safing/portbase/log"
|
||||
"github.com/safing/portmaster/netquery/orm"
|
||||
"github.com/safing/portmaster/network"
|
||||
@@ -46,6 +51,7 @@ type (
|
||||
Schema *orm.TableSchema
|
||||
|
||||
readConnPool *puddle.Pool[*sqlite.Conn]
|
||||
historyPath string
|
||||
|
||||
l sync.Mutex
|
||||
writeConn *sqlite.Conn
|
||||
@@ -82,7 +88,9 @@ type (
|
||||
Latitude float64 `sqlite:"latitude"`
|
||||
Longitude float64 `sqlite:"longitude"`
|
||||
Scope netutils.IPScope `sqlite:"scope"`
|
||||
Verdict network.Verdict `sqlite:"verdict"`
|
||||
WorstVerdict network.Verdict `sqlite:"worst_verdict"`
|
||||
ActiveVerdict network.Verdict `sqlite:"verdict"`
|
||||
FirewallVerdict network.Verdict `sqlite:"firewall_verdict"`
|
||||
Started time.Time `sqlite:"started,text,time"`
|
||||
Ended *time.Time `sqlite:"ended,text,time"`
|
||||
Tunneled bool `sqlite:"tunneled"`
|
||||
@@ -93,6 +101,8 @@ type (
|
||||
Allowed *bool `sqlite:"allowed"`
|
||||
ProfileRevision int `sqlite:"profile_revision"`
|
||||
ExitNode *string `sqlite:"exit_node"`
|
||||
BWIncoming uint64 `sqlite:"bw_incoming,default=0"`
|
||||
BWOutgoing uint64 `sqlite:"bw_outgoing,default=0"`
|
||||
|
||||
// TODO(ppacher): support "NOT" in search query to get rid of the following helper fields
|
||||
Active bool `sqlite:"active"` // could use "ended IS NOT NULL" or "ended IS NULL"
|
||||
@@ -108,21 +118,27 @@ type (
|
||||
// (see Execute). To perform database writes use either Save() or ExecuteWrite().
|
||||
// Note that write connections are serialized by the Database object before being
|
||||
// handed over to SQLite.
|
||||
func New(path string) (*Database, error) {
|
||||
func New(dbPath string) (*Database, error) {
|
||||
historyParentDir := dataroot.Root().ChildDir("databases", 0o700)
|
||||
if err := historyParentDir.Ensure(); err != nil {
|
||||
return nil, fmt.Errorf("failed to ensure database directory exists: %w", err)
|
||||
}
|
||||
|
||||
historyPath := "file://" + path.Join(historyParentDir.Path, "history.db")
|
||||
|
||||
constructor := func(ctx context.Context) (*sqlite.Conn, error) {
|
||||
c, err := sqlite.OpenConn(
|
||||
path,
|
||||
dbPath,
|
||||
sqlite.OpenReadOnly,
|
||||
sqlite.OpenNoMutex, //nolint:staticcheck // We like to be explicit.
|
||||
sqlite.OpenSharedCache,
|
||||
//sqlite.OpenMemory,
|
||||
sqlite.OpenURI,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open read-only sqlite connection at %s: %w", path, err)
|
||||
return nil, fmt.Errorf("failed to open read-only sqlite connection at %s: %w", dbPath, err)
|
||||
}
|
||||
|
||||
if err := sqlitex.ExecuteTransient(c, "ATTACH DATABASE 'file:///tmp/history.db?mode=ro' AS history", nil); err != nil {
|
||||
if err := sqlitex.ExecuteTransient(c, "ATTACH DATABASE '"+historyPath+"?mode=ro' AS history", nil); err != nil {
|
||||
return nil, fmt.Errorf("failed to attach history database: %w", err)
|
||||
}
|
||||
|
||||
@@ -150,23 +166,23 @@ func New(path string) (*Database, error) {
|
||||
}
|
||||
|
||||
writeConn, err := sqlite.OpenConn(
|
||||
path,
|
||||
dbPath,
|
||||
sqlite.OpenCreate,
|
||||
sqlite.OpenReadWrite,
|
||||
sqlite.OpenNoMutex, //nolint:staticcheck // We like to be explicit.
|
||||
sqlite.OpenWAL,
|
||||
sqlite.OpenSharedCache,
|
||||
//sqlite.OpenMemory,
|
||||
sqlite.OpenURI,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open sqlite at %s: %w", path, err)
|
||||
return nil, fmt.Errorf("failed to open sqlite at %s: %w", dbPath, err)
|
||||
}
|
||||
|
||||
return &Database{
|
||||
readConnPool: pool,
|
||||
Schema: schema,
|
||||
writeConn: writeConn,
|
||||
historyPath: historyPath,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -197,9 +213,7 @@ func (db *Database) ApplyMigrations() error {
|
||||
db.l.Lock()
|
||||
defer db.l.Unlock()
|
||||
|
||||
// Attach the history database
|
||||
log.Errorf("attaching database history")
|
||||
if err := sqlitex.ExecuteTransient(db.writeConn, "ATTACH DATABASE 'file:///tmp/history.db?mode=rwc' AS 'history';", nil); err != nil {
|
||||
if err := sqlitex.ExecuteTransient(db.writeConn, "ATTACH DATABASE '"+db.historyPath+"?mode=rwc' AS 'history';", nil); err != nil {
|
||||
return fmt.Errorf("failed to attach history database: %w", err)
|
||||
}
|
||||
|
||||
@@ -207,7 +221,7 @@ func (db *Database) ApplyMigrations() error {
|
||||
for _, dbName := range dbNames {
|
||||
// get the create-table SQL statement from the inferred schema
|
||||
sql := db.Schema.CreateStatement(dbName, true)
|
||||
log.Errorf("creating table schema for database %q", dbName)
|
||||
log.Debugf("creating table schema for database %q", dbName)
|
||||
|
||||
// execute the SQL
|
||||
if err := sqlitex.ExecuteTransient(db.writeConn, sql, nil); err != nil {
|
||||
@@ -285,7 +299,7 @@ func (db *Database) CountRows(ctx context.Context) (int, error) {
|
||||
return result[0].Count, nil
|
||||
}
|
||||
|
||||
// Cleanup removes all connections that have ended before threshold.
|
||||
// Cleanup removes all connections that have ended before threshold from the live database.
|
||||
//
|
||||
// NOTE(ppacher): there is no easy way to get the number of removed
|
||||
// rows other than counting them in a first step. Though, that's
|
||||
@@ -323,6 +337,18 @@ func (db *Database) Cleanup(ctx context.Context, threshold time.Time) (int, erro
|
||||
return result[0].Count, nil
|
||||
}
|
||||
|
||||
func (db *Database) RemoveAllHistoryData(ctx context.Context) error {
|
||||
query := fmt.Sprintf("DELETE FROM %s.connections", HistoryDatabase)
|
||||
return db.ExecuteWrite(ctx, query)
|
||||
}
|
||||
|
||||
func (db *Database) RemoveHistoryForProfile(ctx context.Context, profileID string) error {
|
||||
query := fmt.Sprintf("DELETE FROM %s.connections WHERE profile = :profile", HistoryDatabase)
|
||||
return db.ExecuteWrite(ctx, query, orm.WithNamedArgs(map[string]any{
|
||||
":profile": profileID,
|
||||
}))
|
||||
}
|
||||
|
||||
// dumpTo is a simple helper method that dumps all rows stored in the SQLite database
|
||||
// as JSON to w.
|
||||
// Any error aborts dumping rows and is returned.
|
||||
@@ -350,13 +376,74 @@ func (db *Database) dumpTo(ctx context.Context, w io.Writer) error { //nolint:un
|
||||
return enc.Encode(conns)
|
||||
}
|
||||
|
||||
// MarkAllHistoryConnectionsEnded marks all connections in the history database as ended.
|
||||
func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error {
|
||||
query := fmt.Sprintf("UPDATE %s.connections SET active = FALSE, ended = :ended WHERE active = TRUE", HistoryDatabase)
|
||||
|
||||
if err := db.ExecuteWrite(ctx, query, orm.WithNamedArgs(map[string]any{
|
||||
":ended": time.Now().Format(orm.SqliteTimeFormat),
|
||||
})); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, incoming *uint64, outgoing *uint64) error {
|
||||
data := connID + "-" + processKey
|
||||
hash := sha256.Sum256([]byte(data))
|
||||
dbConnId := hex.EncodeToString(hash[:])
|
||||
|
||||
params := map[string]any{
|
||||
":id": dbConnId,
|
||||
}
|
||||
|
||||
parts := []string{}
|
||||
if incoming != nil {
|
||||
parts = append(parts, "bw_incoming = :bw_incoming")
|
||||
params[":bw_incoming"] = *incoming
|
||||
}
|
||||
|
||||
if outgoing != nil {
|
||||
parts = append(parts, "bw_outgoing = :bw_outgoing")
|
||||
params[":bw_outgoing"] = *outgoing
|
||||
}
|
||||
|
||||
updateSet := strings.Join(parts, ", ")
|
||||
|
||||
updateStmts := []string{
|
||||
fmt.Sprintf(`UPDATE %s.connections SET %s WHERE id = :id`, LiveDatabase, updateSet),
|
||||
}
|
||||
|
||||
if enableHistory {
|
||||
updateStmts = append(updateStmts,
|
||||
fmt.Sprintf(`UPDATE %s.connections SET %s WHERE id = :id`, HistoryDatabase, updateSet),
|
||||
)
|
||||
}
|
||||
|
||||
merr := new(multierror.Error)
|
||||
for _, stmt := range updateStmts {
|
||||
if err := db.ExecuteWrite(ctx, stmt, orm.WithNamedArgs(params)); err != nil {
|
||||
merr.Errors = append(merr.Errors, err)
|
||||
}
|
||||
}
|
||||
|
||||
return merr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Save inserts the connection conn into the SQLite database. If conn
|
||||
// already exists the table row is updated instead.
|
||||
//
|
||||
// Save uses the database write connection instead of relying on the
|
||||
// connection pool.
|
||||
func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) error {
|
||||
connMap, err := orm.ToParamMap(ctx, conn, "", orm.DefaultEncodeConfig)
|
||||
// convert the connection to a param map where each key is already translated
|
||||
// to the sql column name. We also skip bw_incoming and bw_outgoing since those
|
||||
// will be updated independenly from the connection object.
|
||||
connMap, err := orm.ToParamMap(ctx, conn, "", orm.DefaultEncodeConfig, []string{
|
||||
"bw_incoming",
|
||||
"bw_outgoing",
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode connection for SQL: %w", err)
|
||||
}
|
||||
@@ -387,10 +474,10 @@ func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) err
|
||||
|
||||
// TODO(ppacher): make sure this one can be cached to speed up inserting
|
||||
// and save some CPU cycles for the user
|
||||
dbNames := []string{"main"}
|
||||
dbNames := []DatabaseName{LiveDatabase}
|
||||
|
||||
if enableHistory {
|
||||
dbNames = append(dbNames, "history")
|
||||
dbNames = append(dbNames, HistoryDatabase)
|
||||
}
|
||||
|
||||
for _, dbName := range dbNames {
|
||||
|
||||
Reference in New Issue
Block a user