From 4e3e17fa8ca0e06d7d6e4fe036902da033e2e9d5 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Wed, 16 Mar 2022 20:38:51 +0100 Subject: [PATCH] Add initial sqlite support in netquery --- netquery/database.go | 298 +++++++++++++++++++++++++++++++ netquery/manager.go | 225 +++++++++++++++++++++++ netquery/module_api.go | 140 +++++++++++++++ netquery/runtime_query_runner.go | 77 ++++++++ 4 files changed, 740 insertions(+) create mode 100644 netquery/database.go create mode 100644 netquery/manager.go create mode 100644 netquery/module_api.go create mode 100644 netquery/runtime_query_runner.go diff --git a/netquery/database.go b/netquery/database.go new file mode 100644 index 00000000..7485f19c --- /dev/null +++ b/netquery/database.go @@ -0,0 +1,298 @@ +package netquery + +import ( + "context" + "encoding/json" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/safing/portbase/log" + "github.com/safing/portmaster/netquery/orm" + "github.com/safing/portmaster/network" + "github.com/safing/portmaster/network/netutils" + "github.com/safing/portmaster/network/packet" + "zombiezen.com/go/sqlite" + "zombiezen.com/go/sqlite/sqlitex" +) + +// InMemory is the "file path" to open a new in-memory database. +const InMemory = ":memory:" + +// Available connection types as their string representation. +const ( + ConnTypeDNS = "dns" + ConnTypeIP = "ip" +) + +// ConnectionTypeToString is a lookup map to get the string representation +// of a network.ConnectionType as used by this package. +var ConnectionTypeToString = map[network.ConnectionType]string{ + network.DNSRequest: ConnTypeDNS, + network.IPConnection: ConnTypeIP, +} + +type ( + // Database represents a SQLite3 backed connection database. + // It's use is tailored for persistance and querying of network.Connection. + // Access to the underlying SQLite database is synchronized. + // + // TODO(ppacher): somehow I'm receiving SIGBUS or SIGSEGV when no doing + // synchronization in *Database. 
Check what exactly sqlite.OpenFullMutex, etc.. + // are actually supposed to do. + // + Database struct { + l sync.Mutex + conn *sqlite.Conn + } + + // Conn is a network connection that is stored in a SQLite database and accepted + // by the *Database type of this package. This also defines, using the ./orm package, + // the table schema and the model that is exposed via the runtime database as well as + // the query API. + // + // Use ConvertConnection from this package to convert a network.Connection to this + // representation. + Conn struct { + // ID is a device-unique identifier for the connection. It is built + // from network.Connection by hashing the connection ID and the start + // time. We cannot just use the network.Connection.ID because it is only unique + // as long as the connection is still active and might be, although unlikely, + // reused afterwards. + ID string `sqlite:"id,primary"` + Type string `sqlite:"type,varchar(8)"` + External bool `sqlite:"external"` + IPVersion packet.IPVersion `sqlite:"ip_version"` + IPProtocol packet.IPProtocol `sqlite:"ip_protocol"` + LocalIP string `sqlite:"local_ip"` + LocalPort uint16 `sqlite:"local_port"` + RemoteIP string `sqlite:"remote_ip"` + RemotePort uint16 `sqlite:"remote_port"` + Domain string `sqlite:"domain"` + Country string `sqlite:"country,varchar(2)"` + ASN uint `sqlite:"asn"` + ASOwner string `sqlite:"as_owner"` + Latitude float64 `sqlite:"latitude"` + Longitude float64 `sqlite:"longitude"` + Scope netutils.IPScope `sqlite:"scope"` + Verdict network.Verdict `sqlite:"verdict"` + Started time.Time `sqlite:"started,text"` + Ended *time.Time `sqlite:"ended,text"` + Tunneled bool `sqlite:"tunneled"` + Encrypted bool `sqlite:"encrypted"` + Internal bool `sqlite:"internal"` + Inbound bool `sqlite:"inbound"` + ExtraData json.RawMessage `sqlite:"extra_data"` + } +) + +// New opens a new database at path. The database is opened with Full-Mutex, Write-Ahead-Log (WAL) +// and Shared-Cache enabled. 
+// +// TODO(ppacher): check which sqlite "open flags" provide the best performance and don't cause +// SIGBUS/SIGSEGV when used with out a dedicated mutex in *Database. +// +func New(path string) (*Database, error) { + c, err := sqlite.OpenConn( + path, + sqlite.OpenCreate, + sqlite.OpenReadWrite, + sqlite.OpenFullMutex, + sqlite.OpenWAL, + sqlite.OpenSharedCache, + ) + if err != nil { + return nil, fmt.Errorf("failed to open sqlite at %s: %w", path, err) + } + + return &Database{conn: c}, nil +} + +// NewInMemory is like New but creates a new in-memory database and +// automatically applies the connection table schema. +func NewInMemory() (*Database, error) { + db, err := New(InMemory) + if err != nil { + return nil, err + } + + // this should actually never happen because an in-memory database + // always starts empty... + if err := db.ApplyMigrations(); err != nil { + return nil, fmt.Errorf("failed to prepare database: %w", err) + } + + return db, nil +} + +// ApplyMigrations applies any table and data migrations that are needed +// to bring db up-to-date with the built-in schema. +// TODO(ppacher): right now this only applies the current schema and ignores +// any data-migrations. Once the history module is implemented this should +// become/use a full migration system -- use zombiezen.com/go/sqlite/sqlitemigration +func (db *Database) ApplyMigrations() error { + schema, err := orm.GenerateTableSchema("connections", Conn{}) + if err != nil { + return fmt.Errorf("failed to generate table schema for conncetions: %w", err) + } + + // get the create-table SQL statement from the infered schema + sql := schema.CreateStatement(false) + + // execute the SQL + if err := sqlitex.ExecuteTransient(db.conn, sql, nil); err != nil { + return fmt.Errorf("failed to create schema: %w", err) + } + + return nil +} + +// Execute executes a custom SQL query against the SQLite database used by db. 
+// It uses orm.RunQuery() under the hood so please refer to the orm package for +// more information about available options. +func (db *Database) Execute(ctx context.Context, sql string, args ...orm.QueryOption) error { + db.l.Lock() + defer db.l.Unlock() + + return orm.RunQuery(ctx, db.conn, sql, args...) +} + +// CountRows returns the number of rows stored in the database. +func (db *Database) CountRows(ctx context.Context) (int, error) { + var result []struct { + Count int `sqlite:"count"` + } + + if err := db.Execute(ctx, "SELECT COUNT(*) AS count FROM connections", orm.WithResult(&result)); err != nil { + return 0, fmt.Errorf("failed to perform query: %w", err) + } + + if len(result) != 1 { + return 0, fmt.Errorf("unexpected number of rows returned, expected 1 got %d", len(result)) + } + + return result[0].Count, nil +} + +// Cleanup removes all connections that have ended before threshold. +// +// NOTE(ppacher): there is no easy way to get the number of removed +// rows other than counting them in a first step. Though, that's +// probably not worth the cylces... +func (db *Database) Cleanup(ctx context.Context, threshold time.Time) (int, error) { + where := `WHERE ended IS NOT NULL + AND datetime(ended) < :threshold` + sql := "DELETE FROM connections " + where + ";" + + args := orm.WithNamedArgs(map[string]interface{}{ + ":threshold": threshold, + }) + + var result []struct { + Count int `sqlite:"count"` + } + if err := db.Execute( + ctx, + "SELECT COUNT(*) AS count FROM connections "+where, + args, + orm.WithTransient(), + orm.WithResult(&result), + ); err != nil { + return 0, fmt.Errorf("failed to perform query: %w", err) + } + if len(result) != 1 { + return 0, fmt.Errorf("unexpected number of rows, expected 1 got %d", len(result)) + } + + err := db.Execute(ctx, sql, args) + if err != nil { + return 0, err + } + + return result[0].Count, nil +} + +// dumpTo is a simple helper method that dumps all rows stored in the SQLite database +// as JSON to w. 
+// Any error aborts dumping rows and is returned. +func (db *Database) dumpTo(ctx context.Context, w io.Writer) error { + db.l.Lock() + defer db.l.Unlock() + + var conns []Conn + if err := sqlitex.ExecuteTransient(db.conn, "SELECT * FROM connections", &sqlitex.ExecOptions{ + ResultFunc: func(stmt *sqlite.Stmt) error { + var c Conn + if err := orm.DecodeStmt(ctx, stmt, &c, orm.DefaultDecodeConfig); err != nil { + return err + } + + conns = append(conns, c) + return nil + }, + }); err != nil { + return err + } + + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + return enc.Encode(conns) +} + +// Save inserts the connection conn into the SQLite database. If conn +// already exists the table row is updated instead. +func (db *Database) Save(ctx context.Context, conn Conn) error { + connMap, err := orm.EncodeAsMap(ctx, conn, "", orm.DefaultEncodeConfig) + if err != nil { + return fmt.Errorf("failed to encode connection for SQL: %w", err) + } + + columns := make([]string, 0, len(connMap)) + placeholders := make([]string, 0, len(connMap)) + values := make(map[string]interface{}, len(connMap)) + updateSets := make([]string, 0, len(connMap)) + + for key, value := range connMap { + columns = append(columns, key) + placeholders = append(placeholders, ":"+key) + values[":"+key] = value + updateSets = append(updateSets, fmt.Sprintf("%s = :%s", key, key)) + } + + db.l.Lock() + defer db.l.Unlock() + + // TODO(ppacher): make sure this one can be cached to speed up inserting + // and save some CPU cycles for the user + sql := fmt.Sprintf( + `INSERT INTO connections (%s) + VALUES(%s) + ON CONFLICT(id) DO UPDATE SET + %s + `, + strings.Join(columns, ", "), + strings.Join(placeholders, ", "), + strings.Join(updateSets, ", "), + ) + + if err := sqlitex.ExecuteTransient(db.conn, sql, &sqlitex.ExecOptions{ + Named: values, + ResultFunc: func(stmt *sqlite.Stmt) error { + log.Errorf("netquery: got result statement with %d columns", stmt.ColumnCount()) + return nil + }, + }); err != 
nil {
+		log.Errorf("netquery: failed to execute: %s", err)
+		return err
+	}
+
+	return nil
+}
+
+// Close closes the underlying database connection. db should not and cannot be
+// used after Close() has returned.
+func (db *Database) Close() error {
+	return db.conn.Close()
+}
diff --git a/netquery/manager.go b/netquery/manager.go
new file mode 100644
index 00000000..cd9b2335
--- /dev/null
+++ b/netquery/manager.go
@@ -0,0 +1,225 @@
+package netquery
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/safing/portbase/database/record"
+	"github.com/safing/portbase/formats/dsd"
+	"github.com/safing/portbase/log"
+	"github.com/safing/portbase/runtime"
+	"github.com/safing/portmaster/network"
+)
+
+type (
+	// ConnectionStore describes the interface that is used by Manager
+	// to save new or updated connection objects.
+	// It is implemented by the *Database type of this package.
+	ConnectionStore interface {
+		// Save is called to persist the new or updated connection. If required,
+		// It's up to the implementation to figure out if the operation is an
+		// insert or an update.
+		// The ID of Conn is unique and can be trusted to never collide with other
+		// connections of the same device.
+		Save(context.Context, Conn) error
+	}
+
+	// Manager handles new and updated network.Connections feeds and persists them
+	// at a connection store.
+	// Manager also registers itself as a runtime database and pushes updates to
+	// connections using the local format.
+	// Users should use this update feed rather than the deprecated "network:" database.
+	Manager struct {
+		store      ConnectionStore
+		push       runtime.PushFunc
+		runtimeReg *runtime.Registry
+		pushPrefix string
+	}
+)
+
+// NewManager returns a new connection manager that persists all newly created or
+// updated connections at store.
+func NewManager(store ConnectionStore, pushPrefix string, reg *runtime.Registry) (*Manager, error) { + mng := &Manager{ + store: store, + runtimeReg: reg, + pushPrefix: pushPrefix, + } + + push, err := reg.Register(pushPrefix, runtime.SimpleValueGetterFunc(mng.runtimeGet)) + if err != nil { + return nil, err + } + mng.push = push + + return mng, nil +} + +func (mng *Manager) runtimeGet(keyOrPrefix string) ([]record.Record, error) { + // TODO(ppacher): + // we don't yet support querying using the runtime database here ... + // consider exposing connection from the database at least by ID. + // + // NOTE(ppacher): + // for debugging purposes use RuntimeQueryRunner to execute plain + // SQL queries against the database using portbase/database/runtime. + return nil, nil +} + +// HandleFeed starts reading new and updated connections from feed and persists them +// in the configured ConnectionStore. HandleFeed blocks until either ctx is cancelled +// or feed is closed. +// Any errors encountered when processing new or updated connections are logged but +// otherwise ignored. +// HandleFeed handles and persists updates one after each other! Depending on the system +// load the user might want to use a buffered channel for feed. +func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connection) { + // count the number of inserted rows for logging purposes. + // + // TODO(ppacher): how to handle the, though unlikely case, of a + // overflow to 0 here? 
+ var count uint64 + + for { + select { + case <-ctx.Done(): + return + + case conn, ok := <-feed: + if !ok { + return + } + + model, err := convertConnection(conn) + if err != nil { + log.Errorf("netquery: failed to convert connection %s to sqlite model: %w", conn.ID, err) + + continue + } + + log.Infof("netquery: persisting create/update to connection %s", conn.ID) + + if err := mng.store.Save(ctx, *model); err != nil { + log.Errorf("netquery: failed to save connection %s in sqlite database: %w", conn.ID, err) + + continue + } + + // push an update for the connection + if err := mng.pushConnUpdate(ctx, *model); err != nil { + log.Errorf("netquery: failed to push update for conn %s via database system: %w", conn.ID, err) + } + + count++ + + if count%20 == 0 { + log.Debugf("netquery: persisted %d connections so far", count) + } + } + } +} + +func (mng *Manager) pushConnUpdate(ctx context.Context, conn Conn) error { + blob, err := json.Marshal(conn) + if err != nil { + return fmt.Errorf("failed to marshal connection: %w", err) + } + + key := fmt.Sprintf("%s:%s%s", mng.runtimeReg.DatabaseName(), mng.pushPrefix, conn.ID) + wrapper, err := record.NewWrapper( + key, + new(record.Meta), + dsd.JSON, + blob, + ) + if err != nil { + return fmt.Errorf("failed to create record wrapper: %w", err) + } + + // FIXME(ppacher): it may happen that started != now for NEW connections. + // In that case we would push and UPD rather than NEW even if + // the connection is new ... + // Though, that's still better than always pushing NEW for existing + // connections. + // If we would use UnixNano() here chances would be even worse. + // + // Verify if the check in portbase/api/database.go is vulnerable + // to such timing issues in general. + wrapper.SetMeta(&record.Meta{ + Created: conn.Started.Unix(), + Modified: time.Now().Unix(), + }) + + mng.push(wrapper) + return nil +} + +// convertConnection converts conn to the local representation used +// to persist the information in SQLite. 
convertConnection attempts +// to lock conn and may thus block for some time. +func convertConnection(conn *network.Connection) (*Conn, error) { + conn.Lock() + defer conn.Unlock() + + c := Conn{ + ID: genConnID(conn), + External: conn.External, + IPVersion: conn.IPVersion, + IPProtocol: conn.IPProtocol, + LocalIP: conn.LocalIP.String(), + LocalPort: conn.LocalPort, + Verdict: conn.Verdict, + Started: time.Unix(conn.Started, 0), + Tunneled: conn.Tunneled, + Encrypted: conn.Encrypted, + Internal: conn.Internal, + Inbound: conn.Inbound, + Type: ConnectionTypeToString[conn.Type], + } + + if conn.Ended > 0 { + ended := time.Unix(conn.Ended, 0) + c.Ended = &ended + } + + extraData := map[string]interface{}{} + + if conn.Entity != nil { + extraData["cname"] = conn.Entity.CNAME + extraData["blockedByLists"] = conn.Entity.BlockedByLists + extraData["blockedEntities"] = conn.Entity.BlockedEntities + extraData["reason"] = conn.Reason + + c.RemoteIP = conn.Entity.IP.String() + c.RemotePort = conn.Entity.Port // FIXME(ppacher): or do we want DstPort() here? + c.Domain = conn.Entity.Domain + c.Country = conn.Entity.Country + c.ASN = conn.Entity.ASN + c.ASOwner = conn.Entity.ASOrg + c.Scope = conn.Entity.IPScope + if conn.Entity.Coordinates != nil { + c.Latitude = conn.Entity.Coordinates.Latitude + c.Longitude = conn.Entity.Coordinates.Longitude + } + } + + // pre-compute the JSON blob for the extra data column + // and assign it. 
+ extraDataBlob, err := json.Marshal(extraData) + if err != nil { + return nil, fmt.Errorf("failed to marshal extra data: %w", err) + } + c.ExtraData = extraDataBlob + + return &c, nil +} + +func genConnID(conn *network.Connection) string { + data := conn.ID + "-" + time.Unix(conn.Started, 0).String() + hash := sha256.Sum256([]byte(data)) + return hex.EncodeToString(hash[:]) +} diff --git a/netquery/module_api.go b/netquery/module_api.go new file mode 100644 index 00000000..da4e7a1a --- /dev/null +++ b/netquery/module_api.go @@ -0,0 +1,140 @@ +package netquery + +import ( + "context" + "fmt" + "time" + + "github.com/safing/portbase/database" + "github.com/safing/portbase/database/query" + "github.com/safing/portbase/log" + "github.com/safing/portbase/modules" + "github.com/safing/portbase/runtime" + "github.com/safing/portmaster/network" +) + +func init() { + var ( + module *modules.Module + db *database.Interface + sqlStore *Database + mng *Manager + ) + + module = modules.Register( + "netquery", + /* Prepare Module */ + func() error { + var err error + + db = database.NewInterface(&database.Options{ + Local: true, + Internal: true, + CacheSize: 0, + }) + + sqlStore, err = NewInMemory() + if err != nil { + return fmt.Errorf("failed to create in-memory database: %w", err) + } + + mng, err = NewManager(sqlStore, "netquery/updates/", runtime.DefaultRegistry) + if err != nil { + return fmt.Errorf("failed to create manager: %w", err) + } + + return nil + }, + /* Start Module */ + func() error { + ch := make(chan *network.Connection, 100) + + module.StartServiceWorker("netquery-feeder", time.Second, func(ctx context.Context) error { + sub, err := db.Subscribe(query.New("network:")) + if err != nil { + return fmt.Errorf("failed to subscribe to network tree: %w", err) + } + defer sub.Cancel() + + for { + select { + case <-ctx.Done(): + return nil + case rec, ok := <-sub.Feed: + if !ok { + return nil + } + + conn, ok := rec.(*network.Connection) + if !ok { + // This is 
fine as we also receive process updates on + // this channel. + continue + } + + ch <- conn + } + } + }) + + module.StartServiceWorker("netquery-persister", time.Second, func(ctx context.Context) error { + defer close(ch) + + mng.HandleFeed(ctx, ch) + return nil + }) + + module.StartWorker("netquery-row-cleaner", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(10 * time.Second): + count, err := sqlStore.Cleanup(ctx, time.Now().Add(-5*time.Minute)) + if err != nil { + log.Errorf("netquery: failed to count number of rows in memory: %w", err) + } else { + log.Infof("netquery: successfully removed %d old rows", count) + } + } + } + }) + + module.StartWorker("netquery-row-counter", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(5 * time.Second): + count, err := sqlStore.CountRows(ctx) + if err != nil { + log.Errorf("netquery: failed to count number of rows in memory: %w", err) + } else { + log.Infof("netquery: currently holding %d rows in memory", count) + } + + /* + if err := sqlStore.dumpTo(ctx, os.Stderr); err != nil { + log.Errorf("netquery: failed to dump sqlite memory content: %w", err) + } + */ + } + } + }) + + // for debugging, we provide a simple direct SQL query interface using + // the runtime database + _, err := NewRuntimeQueryRunner(sqlStore, "netquery/query/", runtime.DefaultRegistry) + if err != nil { + return fmt.Errorf("failed to set up runtime SQL query runner: %w", err) + } + + return nil + }, + nil, + "network", + "database", + ) + + module.Enable() +} diff --git a/netquery/runtime_query_runner.go b/netquery/runtime_query_runner.go new file mode 100644 index 00000000..43f58539 --- /dev/null +++ b/netquery/runtime_query_runner.go @@ -0,0 +1,77 @@ +package netquery + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/formats/dsd" + 
"github.com/safing/portbase/log"
+	"github.com/safing/portbase/runtime"
+	"github.com/safing/portmaster/netquery/orm"
+)
+
+// RuntimeQueryRunner provides a simple interface for the runtime database
+// that allows direct SQL queries to be performed against db.
+// Each resulting row of that query is marshaled as map[string]interface{}
+// and returned as a single record to the caller.
+//
+// Using portbase/database#Query is not possible because portbase/database will
+// complain about the SQL query being invalid. To work around that issue,
+// RuntimeQueryRunner uses a 'GET key' request where the SQL query is embedded into
+// the record key.
+type RuntimeQueryRunner struct {
+	db        *Database
+	reg       *runtime.Registry
+	keyPrefix string
+}
+
+// NewRuntimeQueryRunner returns a new runtime SQL query runner that parses
+// and serves SQL queries from GET / requests.
+func NewRuntimeQueryRunner(db *Database, prefix string, reg *runtime.Registry) (*RuntimeQueryRunner, error) {
+	runner := &RuntimeQueryRunner{
+		db:        db,
+		reg:       reg,
+		keyPrefix: prefix,
+	}
+
+	if _, err := reg.Register(prefix, runtime.SimpleValueGetterFunc(runner.get)); err != nil {
+		return nil, fmt.Errorf("failed to register runtime value provider: %w", err)
+	}
+
+	return runner, nil
+}
+
+func (runner *RuntimeQueryRunner) get(keyOrPrefix string) ([]record.Record, error) {
+	query := strings.TrimPrefix(
+		keyOrPrefix,
+		runner.keyPrefix,
+	)
+
+	log.Infof("netquery: executing custom SQL query: %q", query)
+
+	var result []map[string]interface{}
+	if err := runner.db.Execute(context.Background(), query, orm.WithResult(&result)); err != nil {
+		return nil, fmt.Errorf("failed to perform query %q: %w", query, err)
+	}
+
+	// we need to wrap the result slice into a map as portbase/database attempts
+	// to inject a _meta field.
+ blob, err := json.Marshal(map[string]interface{}{ + "result": result, + }) + if err != nil { + return nil, fmt.Errorf("failed to marshal result: %w", err) + } + + key := fmt.Sprintf("%s:%s", runner.reg.DatabaseName(), keyOrPrefix) + wrapper, err := record.NewWrapper(key, nil, dsd.JSON, blob) + if err != nil { + return nil, fmt.Errorf("failed to create record wrapper: %w", err) + } + wrapper.SetMeta(new(record.Meta)) + + return []record.Record{wrapper}, nil +}