From 07f4253e0b52bcf38cb4a7f2b300ee875817e36d Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 21 Jul 2023 16:05:57 +0200 Subject: [PATCH] Improve logging and make linter happy --- .../interception/ebpf/bandwidth/interface.go | 3 +++ .../ebpf/connection_listener/worker.go | 2 ++ .../interception/windowskext/bandwidth_stats.go | 5 ++++- firewall/packet_handler.go | 8 +++++--- firewall/prompt.go | 4 ++-- netquery/database.go | 17 ++++++++++------- netquery/manager.go | 12 ++++++------ netquery/module_api.go | 6 +++--- netquery/orm/query_runner.go | 2 +- netquery/orm/schema_builder.go | 13 +++++++------ netquery/query.go | 2 ++ netquery/query_handler.go | 3 ++- 12 files changed, 47 insertions(+), 30 deletions(-) diff --git a/firewall/interception/ebpf/bandwidth/interface.go b/firewall/interception/ebpf/bandwidth/interface.go index 53d2406d..f247b157 100644 --- a/firewall/interception/ebpf/bandwidth/interface.go +++ b/firewall/interception/ebpf/bandwidth/interface.go @@ -142,6 +142,9 @@ func reportBandwidth(ctx context.Context, objs bpfObjects, bandwidthUpdates chan case bandwidthUpdates <- update: case <-ctx.Done(): return + default: + log.Warning("ebpf: bandwidth update queue is full, skipping rest of batch") + return } } } diff --git a/firewall/interception/ebpf/connection_listener/worker.go b/firewall/interception/ebpf/connection_listener/worker.go index 4798e389..1dee07be 100644 --- a/firewall/interception/ebpf/connection_listener/worker.go +++ b/firewall/interception/ebpf/connection_listener/worker.go @@ -8,6 +8,7 @@ import ( "fmt" "net" "sync/atomic" + "time" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/ringbuf" @@ -112,6 +113,7 @@ func ConnectionListenerWorker(ctx context.Context, packets chan packet.Packet) e Src: convertArrayToIPv4(event.Saddr, packet.IPVersion(event.IpVersion)), Dst: convertArrayToIPv4(event.Daddr, packet.IPVersion(event.IpVersion)), PID: int(event.Pid), + SeenAt: time.Now(), }) if isEventValid(event) { // DEBUG: diff --git a/firewall/interception/windowskext/bandwidth_stats.go b/firewall/interception/windowskext/bandwidth_stats.go index 7147db97..2a1bddc0 100644 --- a/firewall/interception/windowskext/bandwidth_stats.go +++ b/firewall/interception/windowskext/bandwidth_stats.go @@ -55,7 +55,7 @@ func reportBandwidth(ctx context.Context, bandwidthUpdates chan *packet.Bandwidt } // Report all statistics. - for _, stat := range stats { + for i, stat := range stats { connID := packet.CreateConnectionID( packet.IPProtocol(stat.protocol), convertArrayToIP(stat.localIP, stat.ipV6 == 1), stat.localPort, @@ -72,6 +72,9 @@ func reportBandwidth(ctx context.Context, bandwidthUpdates chan *packet.Bandwidt case bandwidthUpdates <- update: case <-ctx.Done(): return nil + default: + log.Warningf("kext: bandwidth update queue is full, skipping rest of batch (%d entries)", len(stats)-i) + return nil } } diff --git a/firewall/packet_handler.go b/firewall/packet_handler.go index 0e70bcb9..97df6eff 100644 --- a/firewall/packet_handler.go +++ b/firewall/packet_handler.go @@ -511,7 +511,7 @@ func issueVerdict(conn *network.Connection, pkt packet.Packet, verdict network.V atomic.AddUint64(packetsFailed, 1) err = pkt.Drop() case network.VerdictUndecided, network.VerdictUndeterminable: - log.Warningf("filter: tried to apply verdict %s to pkt %s: dropping instead", verdict, pkt) + log.Tracer(pkt.Ctx()).Warningf("filter: tried to apply verdict %s to pkt %s: dropping instead", verdict, pkt) fallthrough default: atomic.AddUint64(packetsDropped, 1) @@ -519,7 +519,7 @@ func issueVerdict(conn *network.Connection, pkt packet.Packet, verdict network.V } if err != nil { - log.Warningf("filter: failed to apply verdict to pkt %s: %s", pkt, err) + log.Tracer(pkt.Ctx()).Warningf("filter: failed to apply verdict to pkt %s: %s", pkt, err) } } @@ -656,8 +656,10 @@ func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) { conn.BytesSent += bwUpdate.BytesSent default: log.Warningf("filter: unsupported bandwidth update method: %d", bwUpdate.Method) + return } + // Update bandwidth in the netquery module. if netquery.DefaultModule != nil && conn.BandwidthEnabled { if err := netquery.DefaultModule.Store.UpdateBandwidth( ctx, @@ -667,7 +669,7 @@ func updateBandwidth(ctx context.Context, bwUpdate *packet.BandwidthUpdate) { conn.BytesReceived, conn.BytesSent, ); err != nil { - log.Errorf("firewall: failed to persist bandwidth data: %s", err) + log.Errorf("filter: failed to persist bandwidth data: %s", err) } } } diff --git a/firewall/prompt.go b/firewall/prompt.go index e1a380d9..e3582ba0 100644 --- a/firewall/prompt.go +++ b/firewall/prompt.go @@ -91,12 +91,12 @@ func createPrompt(ctx context.Context, conn *network.Connection) (n *notificatio layeredProfile := conn.Process().Profile() if layeredProfile == nil { log.Tracer(ctx).Warningf("filter: tried creating prompt for connection without profile") - return + return nil } localProfile := layeredProfile.LocalProfile() if localProfile == nil { log.Tracer(ctx).Warningf("filter: tried creating prompt for connection without local profile") - return + return nil } // first check if there is an existing notification for this. diff --git a/netquery/database.go b/netquery/database.go index 27207680..f1abc633 100644 --- a/netquery/database.go +++ b/netquery/database.go @@ -112,7 +112,7 @@ type ( } ) -// New opens a new in-memory database named path. +// New opens a new in-memory database named path and attaches a persistent history database. // // The returned Database used connection pooling for read-only connections // (see Execute). To perform database writes use either Save() or ExecuteWrite(). @@ -131,7 +131,6 @@ func New(dbPath string) (*Database, error) { dbPath, sqlite.OpenReadOnly, sqlite.OpenSharedCache, - //sqlite.OpenMemory, sqlite.OpenURI, ) if err != nil { @@ -171,7 +170,6 @@ func New(dbPath string) (*Database, error) { sqlite.OpenReadWrite, sqlite.OpenWAL, sqlite.OpenSharedCache, - //sqlite.OpenMemory, sqlite.OpenURI, ) if err != nil { @@ -337,11 +335,14 @@ func (db *Database) Cleanup(ctx context.Context, threshold time.Time) (int, erro return result[0].Count, nil } +// RemoveAllHistoryData removes all connections from the history database. func (db *Database) RemoveAllHistoryData(ctx context.Context) error { query := fmt.Sprintf("DELETE FROM %s.connections", HistoryDatabase) return db.ExecuteWrite(ctx, query) } +// RemoveHistoryForProfile removes all connections from the history database +// for a given profile ID (source/id). func (db *Database) RemoveHistoryForProfile(ctx context.Context, profileID string) error { query := fmt.Sprintf("DELETE FROM %s.connections WHERE profile = :profile", HistoryDatabase) return db.ExecuteWrite(ctx, query, orm.WithNamedArgs(map[string]any{ @@ -389,13 +390,15 @@ func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error { return nil } -func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, incoming *uint64, outgoing *uint64) error { +// UpdateBandwidth updates bandwidth data for the connection and optionally also writes +// the bandwidth data to the history database. +func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error { data := connID + "-" + processKey hash := sha256.Sum256([]byte(data)) - dbConnId := hex.EncodeToString(hash[:]) + dbConnID := hex.EncodeToString(hash[:]) params := map[string]any{ - ":id": dbConnId, + ":id": dbConnID, } parts := []string{} @@ -439,7 +442,7 @@ func (db *Database) UpdateBandwidth(ctx context.Context, enableHistory bool, pro func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) error { // convert the connection to a param map where each key is already translated // to the sql column name. We also skip bytes_received and bytes_sent since those - // will be updated independenly from the connection object. + // will be updated independently from the connection object. connMap, err := orm.ToParamMap(ctx, conn, "", orm.DefaultEncodeConfig, []string{ "bytes_received", "bytes_sent", diff --git a/netquery/manager.go b/netquery/manager.go index b6be97be..c49aa5c2 100644 --- a/netquery/manager.go +++ b/netquery/manager.go @@ -31,16 +31,16 @@ type ( // database as ended NOW. MarkAllHistoryConnectionsEnded(context.Context) error - // RemoveHistoryForProfile removes all connections from the history database - // for a given profile ID (source/id) - RemoveHistoryForProfile(context.Context, string) error - // RemoveAllHistoryData removes all connections from the history database. RemoveAllHistoryData(context.Context) error - // UpdateBandwidth updates bandwith data for the connection and optionally also writes + // RemoveHistoryForProfile removes all connections from the history database. + // for a given profile ID (source/id) + RemoveHistoryForProfile(context.Context, string) error + + // UpdateBandwidth updates bandwidth data for the connection and optionally also writes // the bandwidth data to the history database. - UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, incoming *uint64, outgoing *uint64) error + UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error } // Manager handles new and updated network.Connections feeds and persists them diff --git a/netquery/module_api.go b/netquery/module_api.go index f102ec65..344f9391 100644 --- a/netquery/module_api.go +++ b/netquery/module_api.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/safing/portbase/api" "github.com/safing/portbase/config" "github.com/safing/portbase/database" @@ -19,6 +20,7 @@ import ( "github.com/safing/portmaster/network" ) +// DefaultModule is the default netquery module. var DefaultModule *module type module struct { @@ -120,8 +122,6 @@ func (m *module) prepare() error { ProfileIDs []string `json:"profileIDs"` } - defer r.Body.Close() - dec := json.NewDecoder(r.Body) dec.DisallowUnknownFields() @@ -240,7 +240,7 @@ func (m *module) stop() error { if err := m.mng.store.MarkAllHistoryConnectionsEnded(ctx); err != nil { // handle the error by just logging it. There's not much we can do here // and returning an error to the module system doesn't help much as well... - log.Errorf("failed to mark connections in history database as eded: %w", err) + log.Errorf("netquery: failed to mark connections in history database as ended: %s", err) } return nil diff --git a/netquery/orm/query_runner.go b/netquery/orm/query_runner.go index f59cca79..135a29f6 100644 --- a/netquery/orm/query_runner.go +++ b/netquery/orm/query_runner.go @@ -148,7 +148,7 @@ func RunQuery(ctx context.Context, conn *sqlite.Conn, sql string, modifiers ...Q for colIdx := 0; colIdx < stmt.ColumnCount(); colIdx++ { name := stmt.ColumnName(colIdx) - switch stmt.ColumnType(colIdx) { + switch stmt.ColumnType(colIdx) { //nolint:exhaustive // TODO: handle type BLOB? case sqlite.TypeText: resultDump[name] = stmt.ColumnText(colIdx) case sqlite.TypeFloat: diff --git a/netquery/orm/schema_builder.go b/netquery/orm/schema_builder.go index 080c5003..6aba2a1f 100644 --- a/netquery/orm/schema_builder.go +++ b/netquery/orm/schema_builder.go @@ -7,8 +7,9 @@ import ( "strconv" "strings" - "github.com/safing/portbase/log" "zombiezen.com/go/sqlite" + + "github.com/safing/portbase/log" ) var errSkipStructField = errors.New("struct field should be skipped") @@ -110,13 +111,13 @@ func (def ColumnDef) AsSQL() string { } if def.Default != nil { sql += " DEFAULT " - switch def.Type { + switch def.Type { //nolint:exhaustive // TODO: handle types BLOB, NULL? case sqlite.TypeFloat: - sql += strconv.FormatFloat(def.Default.(float64), 'b', 0, 64) + sql += strconv.FormatFloat(def.Default.(float64), 'b', 0, 64) //nolint:forcetypeassert case sqlite.TypeInteger: - sql += strconv.FormatInt(def.Default.(int64), 10) + sql += strconv.FormatInt(def.Default.(int64), 10) //nolint:forcetypeassert case sqlite.TypeText: - sql += fmt.Sprintf("%q", def.Default.(string)) + sql += fmt.Sprintf("%q", def.Default.(string)) //nolint:forcetypeassert default: log.Errorf("unsupported default value: %q %q", def.Type, def.Default) sql = strings.TrimSuffix(sql, " DEFAULT ") @@ -257,7 +258,7 @@ func applyStructFieldTag(fieldType reflect.StructField, def *ColumnDef) error { if strings.HasPrefix(k, TagTypePrefixDefault) { defaultValue := strings.TrimPrefix(k, TagTypePrefixDefault) - switch def.Type { + switch def.Type { //nolint:exhaustive case sqlite.TypeFloat: fv, err := strconv.ParseFloat(defaultValue, 64) if err != nil { diff --git a/netquery/query.go b/netquery/query.go index 264f0bd7..06b766f6 100644 --- a/netquery/query.go +++ b/netquery/query.go @@ -14,8 +14,10 @@ import ( "github.com/safing/portmaster/netquery/orm" ) +// DatabaseName is a database name constant. type DatabaseName string +// Databases. const ( LiveDatabase = DatabaseName("main") HistoryDatabase = DatabaseName("history") diff --git a/netquery/query_handler.go b/netquery/query_handler.go index 3c6bb453..e555965d 100644 --- a/netquery/query_handler.go +++ b/netquery/query_handler.go @@ -12,9 +12,10 @@ import ( "strings" "time" + "golang.org/x/exp/slices" + "github.com/safing/portbase/log" "github.com/safing/portmaster/netquery/orm" - "golang.org/x/exp/slices" ) var charOnlyRegexp = regexp.MustCompile("[a-zA-Z]+")