Merge pull request #1276 from safing/feature/netquery-sum

Add support for SUM in netquery
This commit is contained in:
Daniel Hovie
2023-08-09 15:07:41 +02:00
committed by GitHub
13 changed files with 254 additions and 65 deletions

2
go.mod
View File

@@ -20,7 +20,7 @@ require (
github.com/safing/jess v0.3.1
github.com/safing/portbase v0.17.1
github.com/safing/portmaster-android/go v0.0.0-20230605085256-6abf4c495626
github.com/safing/spn v0.6.13
github.com/safing/spn v0.6.14
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/cobra v1.7.0
github.com/spkg/zipfs v0.7.1

2
go.sum
View File

@@ -220,6 +220,8 @@ github.com/safing/spn v0.6.12 h1:LdQODfwzsNBipaMV3GH1REEzjJp48i38mYuHv+GyGAk=
github.com/safing/spn v0.6.12/go.mod h1:Mh9bmkqFhO/dHNi9RWXzoXjQij893I4Lj8Wn4tQ0KZA=
github.com/safing/spn v0.6.13 h1:aqFWQTPSs1RHLxpoyAt+uVG4v4Tgf96OpmLXGvQxo/I=
github.com/safing/spn v0.6.13/go.mod h1:Mh9bmkqFhO/dHNi9RWXzoXjQij893I4Lj8Wn4tQ0KZA=
github.com/safing/spn v0.6.14 h1:2cXiAxL3zPQkpZ2mHQGuJfE5GmlPgBA7xKoM6gFgcQs=
github.com/safing/spn v0.6.14/go.mod h1:Mh9bmkqFhO/dHNi9RWXzoXjQij893I4Lj8Wn4tQ0KZA=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/seehuhn/fortuna v1.0.1 h1:lu9+CHsmR0bZnx5Ay646XvCSRJ8PJTi5UYJwDBX68H0=

View File

@@ -16,12 +16,14 @@ import (
"zombiezen.com/go/sqlite"
"zombiezen.com/go/sqlite/sqlitex"
"github.com/safing/portbase/config"
"github.com/safing/portbase/dataroot"
"github.com/safing/portbase/log"
"github.com/safing/portmaster/netquery/orm"
"github.com/safing/portmaster/network"
"github.com/safing/portmaster/network/netutils"
"github.com/safing/portmaster/network/packet"
"github.com/safing/portmaster/profile"
)
// InMemory is the "file path" to open a new in-memory database.
@@ -202,6 +204,49 @@ func NewInMemory() (*Database, error) {
return db, nil
}
// Close closes the database, including pools and connections.
func (db *Database) Close() error {
db.readConnPool.Close()
if err := db.writeConn.Close(); err != nil {
return err
}
return nil
}
// VacuumHistory rewrites the history database in order to purge deleted records.
func VacuumHistory(ctx context.Context) (err error) {
historyParentDir := dataroot.Root().ChildDir("databases", 0o700)
if err := historyParentDir.Ensure(); err != nil {
return fmt.Errorf("failed to ensure database directory exists: %w", err)
}
// Get file location of history database.
historyFile := filepath.Join(historyParentDir.Path, "history.db")
// Convert to SQLite URI path.
historyURI := "file:///" + strings.TrimPrefix(filepath.ToSlash(historyFile), "/")
writeConn, err := sqlite.OpenConn(
historyURI,
sqlite.OpenCreate,
sqlite.OpenReadWrite,
sqlite.OpenWAL,
sqlite.OpenSharedCache,
sqlite.OpenURI,
)
if err != nil {
return err
}
defer func() {
if closeErr := writeConn.Close(); closeErr != nil && err == nil {
err = closeErr
}
}()
return orm.RunQuery(ctx, writeConn, "VACUUM")
}
// ApplyMigrations applies any table and data migrations that are needed
// to bring db up-to-date with the built-in schema.
// TODO(ppacher): right now this only applies the current schema and ignores
@@ -377,6 +422,78 @@ func (db *Database) dumpTo(ctx context.Context, w io.Writer) error { //nolint:un
return enc.Encode(conns)
}
// PurgeOldHistory deletes history data outside of the (per-app) retention time frame.
func (db *Database) PurgeOldHistory(ctx context.Context) error {
// Setup tracer for the clean up process.
ctx, tracer := log.AddTracer(ctx)
defer tracer.Submit()
defer tracer.Info("history: deleted connections outside of retention from %d profiles")
// Get list of profiles in history.
query := "SELECT DISTINCT profile FROM history.connections"
var result []struct {
Profile string `sqlite:"profile"`
}
if err := db.Execute(ctx, query, orm.WithResult(&result)); err != nil {
return fmt.Errorf("failed to get a list of profiles from the history database: %w", err)
}
var (
// Get global retention days - do not delete in case of error.
globalRetentionDays = config.GetAsInt(profile.CfgOptionKeepHistoryKey, 0)()
profileName string
retentionDays int64
profileCnt int
merr = new(multierror.Error)
)
for _, row := range result {
// Get profile and retention days.
id := strings.TrimPrefix(row.Profile, string(profile.SourceLocal)+"/")
p, err := profile.GetLocalProfile(id, nil, nil)
if err == nil {
profileName = p.String()
retentionDays = p.LayeredProfile().KeepHistory()
} else {
// Getting profile failed, fallback to global setting.
tracer.Errorf("history: failed to load profile for id %s: %s", id, err)
profileName = row.Profile
retentionDays = globalRetentionDays
}
// Skip deleting if history should be kept forever.
if retentionDays == 0 {
tracer.Tracef("history: retention is disabled for %s, skipping", profileName)
continue
}
// Count profiles where connections were deleted.
profileCnt++
// TODO: count cleared connections
threshold := time.Now().Add(-1 * time.Duration(retentionDays) * time.Hour * 24)
if err := db.ExecuteWrite(ctx,
"DELETE FROM history.connections WHERE profile = :profile AND active = FALSE AND datetime(started) < datetime(:threshold)",
orm.WithNamedArgs(map[string]any{
":profile": row.Profile,
":threshold": threshold.Format(orm.SqliteTimeFormat),
}),
); err != nil {
tracer.Warningf("history: failed to delete connections of %s: %s", profileName, err)
merr.Errors = append(merr.Errors, fmt.Errorf("profile %s: %w", row.Profile, err))
} else {
tracer.Debugf(
"history: deleted connections older than %d days (before %s) of %s",
retentionDays,
threshold,
profileName,
)
}
}
return merr.ErrorOrNil()
}
// MarkAllHistoryConnectionsEnded marks all connections in the history database as ended.
func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error {
query := fmt.Sprintf("UPDATE %s.connections SET active = FALSE, ended = :ended WHERE active = TRUE", HistoryDatabase)
@@ -512,9 +629,3 @@ func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) err
return nil
}
// Close closes the underlying database connection. db should and cannot be
// used after Close() has returned.
func (db *Database) Close() error {
return db.writeConn.Close()
}

View File

@@ -39,6 +39,12 @@ type (
// UpdateBandwidth updates bandwidth data for the connection and optionally also writes
// the bandwidth data to the history database.
UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error
// PurgeOldHistory deletes data outside of the retention time frame from the history database.
PurgeOldHistory(ctx context.Context) error
// Close closes the connection store. It must not be used afterwards.
Close() error
}
// Manager handles new and updated network.Connections feeds and persists them

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/hashicorp/go-multierror"
@@ -87,58 +86,58 @@ func (m *module) prepare() error {
}
if err := api.RegisterEndpoint(api.Endpoint{
Name: "Query Connections",
Description: "Query the in-memory sqlite connection database.",
Path: "netquery/query",
MimeType: "application/json",
Read: api.PermitUser, // Needs read+write as the query is sent using POST data.
Write: api.PermitUser, // Needs read+write as the query is sent using POST data.
BelongsTo: m.Module,
HandlerFunc: queryHander.ServeHTTP,
Name: "Query Connections",
Description: "Query the in-memory sqlite connection database.",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
if err := api.RegisterEndpoint(api.Endpoint{
Name: "Active Connections Chart",
Description: "Query the in-memory sqlite connection database and return a chart of active connections.",
Path: "netquery/charts/connection-active",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
HandlerFunc: chartHandler.ServeHTTP,
Name: "Active Connections Chart",
Description: "Query the in-memory sqlite connection database and return a chart of active connections.",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
if err := api.RegisterEndpoint(api.Endpoint{
Path: "netquery/history/clear",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
HandlerFunc: func(w http.ResponseWriter, r *http.Request) {
Name: "Remove connections from profile history",
Description: "Remove all connections from the history database for one or more profiles",
Path: "netquery/history/clear",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
ActionFunc: func(ar *api.Request) (msg string, err error) {
// TODO: Use query parameters instead.
var body struct {
ProfileIDs []string `json:"profileIDs"`
}
dec := json.NewDecoder(r.Body)
dec := json.NewDecoder(ar.Body)
dec.DisallowUnknownFields()
if err := dec.Decode(&body); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return "", err
}
if len(body.ProfileIDs) == 0 {
if err := m.mng.store.RemoveAllHistoryData(r.Context()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
if err := m.mng.store.RemoveAllHistoryData(ar.Context()); err != nil {
return "", err
}
} else {
merr := new(multierror.Error)
for _, profileID := range body.ProfileIDs {
if err := m.mng.store.RemoveHistoryForProfile(r.Context(), profileID); err != nil {
if err := m.mng.store.RemoveHistoryForProfile(ar.Context(), profileID); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("failed to clear history for %q: %w", profileID, err))
} else {
log.Infof("netquery: successfully cleared history for %s", profileID)
@@ -146,16 +145,27 @@ func (m *module) prepare() error {
}
if err := merr.ErrorOrNil(); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return "", err
}
}
w.WriteHeader(http.StatusNoContent)
return "Successfully cleared history.", nil
},
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
if err := api.RegisterEndpoint(api.Endpoint{
Name: "Apply connection history retention threshold",
Path: "netquery/history/cleanup",
Write: api.PermitUser,
BelongsTo: m.Module,
ActionFunc: func(ar *api.Request) (msg string, err error) {
if err := m.Store.PurgeOldHistory(ar.Context()); err != nil {
return "", err
}
return "Deleted outdated connections.", nil
},
Name: "Remove connections from profile history",
Description: "Remove all connections from the history database for one or more profiles",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
@@ -164,7 +174,7 @@ func (m *module) prepare() error {
}
func (m *module) start() error {
m.StartServiceWorker("netquery-feeder", time.Second, func(ctx context.Context) error {
m.StartServiceWorker("netquery connection feed listener", 0, func(ctx context.Context) error {
sub, err := m.db.Subscribe(query.New("network:"))
if err != nil {
return fmt.Errorf("failed to subscribe to network tree: %w", err)
@@ -195,12 +205,12 @@ func (m *module) start() error {
}
})
m.StartServiceWorker("netquery-persister", time.Second, func(ctx context.Context) error {
m.StartServiceWorker("netquery connection feed handler", 0, func(ctx context.Context) error {
m.mng.HandleFeed(ctx, m.feed)
return nil
})
m.StartServiceWorker("netquery-row-cleaner", time.Second, func(ctx context.Context) error {
m.StartServiceWorker("netquery live db cleaner", 0, func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
@@ -209,14 +219,18 @@ func (m *module) start() error {
threshold := time.Now().Add(-network.DeleteConnsAfterEndedThreshold)
count, err := m.Store.Cleanup(ctx, threshold)
if err != nil {
log.Errorf("netquery: failed to count number of rows in memory: %s", err)
log.Errorf("netquery: failed to removed old connections from live db: %s", err)
} else {
log.Tracef("netquery: successfully removed %d old rows that ended before %s", count, threshold)
log.Tracef("netquery: successfully removed %d old connections from live db that ended before %s", count, threshold)
}
}
}
})
m.NewTask("network history cleaner", func(ctx context.Context, _ *modules.Task) error {
return m.Store.PurgeOldHistory(ctx)
}).Repeat(time.Hour).Schedule(time.Now().Add(10 * time.Minute))
// For debugging, provide a simple direct SQL query interface using
// the runtime database.
// Only expose in development mode.
@@ -242,5 +256,14 @@ func (m *module) stop() error {
log.Errorf("netquery: failed to mark connections in history database as ended: %s", err)
}
if err := m.mng.store.Close(); err != nil {
log.Errorf("netquery: failed to close sqlite database: %s", err)
} else {
// Clear deleted connections from database.
if err := VacuumHistory(ctx); err != nil {
log.Errorf("netquery: failed to execute VACUUM in history database: %s", err)
}
}
return nil
}

View File

@@ -53,6 +53,7 @@ type (
Sum struct {
Condition Query `json:"condition"`
Field string `json:"field"`
As string `json:"as"`
Distinct bool `json:"distinct"`
}

View File

@@ -218,8 +218,11 @@ func (req *QueryRequestPayload) prepareSelectedFields(ctx context.Context, schem
case s.Distinct != nil:
field = *s.Distinct
case s.Sum != nil:
// field is not used in case of $sum
field = "*"
if s.Sum.Field != "" {
field = s.Sum.Field
} else {
field = "*"
}
case s.Min != nil:
if s.Min.Field != "" {
field = s.Min.Field
@@ -261,9 +264,19 @@ func (req *QueryRequestPayload) prepareSelectedFields(ctx context.Context, schem
return fmt.Errorf("missing 'as' for $sum")
}
clause, params, err := s.Sum.Condition.toSQLWhereClause(ctx, fmt.Sprintf("sel%d", idx), schema, orm.DefaultEncodeConfig)
if err != nil {
return fmt.Errorf("in $sum: %w", err)
var (
clause string
params map[string]any
)
if s.Sum.Field != "" {
clause = s.Sum.Field
} else {
var err error
clause, params, err = s.Sum.Condition.toSQLWhereClause(ctx, fmt.Sprintf("sel%d", idx), schema, orm.DefaultEncodeConfig)
if err != nil {
return fmt.Errorf("in $sum: %w", err)
}
}
req.mergeParams(params)

View File

@@ -594,7 +594,7 @@ func (conn *Connection) UpdateFeatures() error {
if user.MayUse(account.FeatureHistory) {
lProfile := conn.Process().Profile()
if lProfile != nil {
conn.HistoryEnabled = lProfile.HistoryEnabled()
conn.HistoryEnabled = lProfile.EnableHistory()
}
}

View File

@@ -112,6 +112,10 @@ var (
cfgOptionEnableHistory config.BoolOption
cfgOptionEnableHistoryOrder = 96
CfgOptionKeepHistoryKey = "history/keep"
cfgOptionKeepHistory config.IntOption
cfgOptionKeepHistoryOrder = 97
// Setting "Enable SPN" at order 128.
CfgOptionUseSPNKey = "spn/use"
@@ -248,7 +252,7 @@ func registerConfiguration() error { //nolint:maintidx
// Enable History
err = config.Register(&config.Option{
Name: "Enable Connection History",
Name: "Enable Network History",
Key: CfgOptionEnableHistoryKey,
Description: "Save connections in a database (on disk) in order to view and search them later. Changes might take a couple minutes to apply to all connections.",
OptType: config.OptTypeBool,
@@ -257,7 +261,7 @@ func registerConfiguration() error { //nolint:maintidx
DefaultValue: false,
Annotations: config.Annotations{
config.DisplayOrderAnnotation: cfgOptionEnableHistoryOrder,
config.CategoryAnnotation: "History",
config.CategoryAnnotation: "General",
config.RequiresFeatureID: account.FeatureHistory,
},
})
@@ -267,6 +271,31 @@ func registerConfiguration() error { //nolint:maintidx
cfgOptionEnableHistory = config.Concurrent.GetAsBool(CfgOptionEnableHistoryKey, false)
cfgBoolOptions[CfgOptionEnableHistoryKey] = cfgOptionEnableHistory
err = config.Register(&config.Option{
Name: "Keep Network History",
Key: CfgOptionKeepHistoryKey,
Description: `Specify how many days the network history data should be kept. Please keep in mind that more available history data makes reports (coming soon) a lot more useful.
Older data is deleted in intervals and cleared from the database continually. If in a hurry, shutdown or restart Portmaster to clear deleted entries immediately.
Set to 0 days to keep network history forever. Depending on your device, this might affect performance.`,
OptType: config.OptTypeInt,
ReleaseLevel: config.ReleaseLevelStable,
ExpertiseLevel: config.ExpertiseLevelUser,
DefaultValue: 30,
Annotations: config.Annotations{
config.UnitAnnotation: "Days",
config.DisplayOrderAnnotation: cfgOptionKeepHistoryOrder,
config.CategoryAnnotation: "General",
config.RequiresFeatureID: account.FeatureHistory,
},
})
if err != nil {
return err
}
cfgOptionKeepHistory = config.Concurrent.GetAsInt(CfgOptionKeepHistoryKey, 30)
cfgIntOptions[CfgOptionKeepHistoryKey] = cfgOptionKeepHistory
rulesHelp := strings.ReplaceAll(`Rules are checked from top to bottom, stopping after the first match. They can match:
- By address: "192.168.0.1"

View File

@@ -127,10 +127,12 @@ func GetLocalProfile(id string, md MatchingData, createProfileCallback func() *P
// Update metadata.
var changed bool
if special {
changed = updateSpecialProfileMetadata(profile, md.Path())
} else {
changed = profile.updateMetadata(md.Path())
if md != nil {
if special {
changed = updateSpecialProfileMetadata(profile, md.Path())
} else {
changed = profile.updateMetadata(md.Path())
}
}
// Save if created or changed.

View File

@@ -49,7 +49,8 @@ type LayeredProfile struct {
DomainHeuristics config.BoolOption `json:"-"`
UseSPN config.BoolOption `json:"-"`
SPNRoutingAlgorithm config.StringOption `json:"-"`
HistoryEnabled config.BoolOption `json:"-"`
EnableHistory config.BoolOption `json:"-"`
KeepHistory config.IntOption `json:"-"`
}
// NewLayeredProfile returns a new layered profile based on the given local profile.
@@ -121,10 +122,14 @@ func NewLayeredProfile(localProfile *Profile) *LayeredProfile {
CfgOptionRoutingAlgorithmKey,
cfgOptionRoutingAlgorithm,
)
lp.HistoryEnabled = lp.wrapBoolOption(
lp.EnableHistory = lp.wrapBoolOption(
CfgOptionEnableHistoryKey,
cfgOptionEnableHistory,
)
lp.KeepHistory = lp.wrapIntOption(
CfgOptionKeepHistoryKey,
cfgOptionKeepHistory,
)
lp.LayerIDs = append(lp.LayerIDs, localProfile.ScopedID())
lp.layers = append(lp.layers, localProfile)

View File

@@ -136,7 +136,6 @@ type Profile struct { //nolint:maligned // not worth the effort
filterListIDs []string
spnUsagePolicy endpoints.Endpoints
spnExitHubPolicy endpoints.Endpoints
enableHistory bool
// Lifecycle Management
outdated *abool.AtomicBool
@@ -234,11 +233,6 @@ func (profile *Profile) parseConfig() error {
}
}
enableHistory, ok := profile.configPerspective.GetAsBool(CfgOptionEnableHistoryKey)
if ok {
profile.enableHistory = enableHistory
}
return lastErr
}
@@ -321,11 +315,6 @@ func (profile *Profile) IsOutdated() bool {
return profile.outdated.IsSet()
}
// HistoryEnabled returns true if connection history is enabled for the profile.
func (profile *Profile) HistoryEnabled() bool {
return profile.enableHistory
}
// GetEndpoints returns the endpoint list of the profile. This functions
// requires the profile to be read locked.
func (profile *Profile) GetEndpoints() endpoints.Endpoints {

View File

@@ -125,7 +125,7 @@ The format is: "protocol://ip:port?parameter=value&parameter=value"
config.CategoryAnnotation: "Servers",
config.QuickSettingsAnnotation: []config.QuickSetting{
{
Name: "Cloudflare (with Malware Filter)",
Name: "Set Cloudflare (with Malware Filter)",
Action: config.QuickReplace,
Value: []string{
"dot://cloudflare-dns.com?ip=1.1.1.2&name=Cloudflare&blockedif=zeroip",
@@ -133,7 +133,7 @@ The format is: "protocol://ip:port?parameter=value&parameter=value"
},
},
{
Name: "Quad9",
Name: "Set Quad9",
Action: config.QuickReplace,
Value: []string{
"dot://dns.quad9.net?ip=9.9.9.9&name=Quad9&blockedif=empty",
@@ -141,7 +141,7 @@ The format is: "protocol://ip:port?parameter=value&parameter=value"
},
},
{
Name: "AdGuard",
Name: "Set AdGuard",
Action: config.QuickReplace,
Value: []string{
"dot://dns.adguard.com?ip=94.140.14.14&name=AdGuard&blockedif=zeroip",
@@ -149,12 +149,20 @@ The format is: "protocol://ip:port?parameter=value&parameter=value"
},
},
{
Name: "Foundation for Applied Privacy",
Name: "Set Foundation for Applied Privacy",
Action: config.QuickReplace,
Value: []string{
"dot://dot1.applied-privacy.net?ip=146.255.56.98&name=AppliedPrivacy",
},
},
{
Name: "Add Cloudflare (as fallback)",
Action: config.QuickMergeBottom,
Value: []string{
"dot://cloudflare-dns.com?ip=1.1.1.1&name=Cloudflare&blockedif=zeroip",
"dot://cloudflare-dns.com?ip=1.0.0.1&name=Cloudflare&blockedif=zeroip",
},
},
},
"self:detail:internalSpecialUseDomains": internalSpecialUseDomains,
"self:detail:connectivityDomains": netenv.ConnectivityDomains,