Fix and improve packet handling procedures

This commit is contained in:
Daniel
2023-07-28 23:12:50 +02:00
parent 0ca6a71ee7
commit 4fcfb424c9
3 changed files with 123 additions and 70 deletions

View File

@@ -428,42 +428,6 @@ func NewIncompleteConnection(pkt packet.Packet) *Connection {
// GatherConnectionInfo gathers information on the process and remote entity.
func (conn *Connection) GatherConnectionInfo(pkt packet.Packet) (err error) {
// Get PID if not yet available.
if conn.PID == process.UndefinedProcessID {
// Get process by looking at the system state tables.
// Apply direction as reported from the state tables.
conn.PID, conn.Inbound, _ = process.GetPidOfConnection(pkt.Ctx(), pkt.Info())
// Errors are informational and are logged to the context.
}
// Get Process and Profile.
if conn.process == nil {
// We got connection from the system.
conn.process, err = process.GetProcessWithProfile(pkt.Ctx(), conn.PID)
if err == nil {
// Add process/profile metadata for connection.
conn.ProcessContext = getProcessContext(pkt.Ctx(), conn.process)
conn.ProfileRevisionCounter = conn.process.Profile().RevisionCnt()
// Inherit internal status of profile.
if localProfile := conn.process.Profile().LocalProfile(); localProfile != nil {
conn.Internal = localProfile.Internal
if err := conn.updateFeatures(); err != nil && !errors.Is(err, access.ErrNotLoggedIn) {
log.Tracer(pkt.Ctx()).Warningf("network: failed to check for enabled features: %s", err)
}
}
} else {
conn.process = nil
if pkt.InfoOnly() {
log.Tracer(pkt.Ctx()).Debugf("network: failed to get process and profile of PID %d: %s", conn.PID, err)
} else {
log.Tracer(pkt.Ctx()).Warningf("network: failed to get process and profile of PID %d: %s", conn.PID, err)
}
}
}
// Create remote entity.
if conn.Entity == nil {
// Remote
@@ -509,6 +473,44 @@ func (conn *Connection) GatherConnectionInfo(pkt packet.Packet) (err error) {
}
}
// Get PID if not yet available.
if conn.PID == process.UndefinedProcessID {
// Get process by looking at the system state tables.
// Apply direction as reported from the state tables.
conn.PID, conn.Inbound, _ = process.GetPidOfConnection(pkt.Ctx(), pkt.Info())
// Errors are informational and are logged to the context.
}
// Get Process and Profile.
if conn.process == nil {
conn.process, err = process.GetProcessWithProfile(pkt.Ctx(), conn.PID)
// Errors are informational and are logged to the context.
if err != nil {
if pkt.InfoOnly() {
conn.process = nil // Try again with real packet.
log.Tracer(pkt.Ctx()).Debugf("network: failed to get process and profile of PID %d: %s", conn.PID, err)
} else {
log.Tracer(pkt.Ctx()).Warningf("network: failed to get process and profile of PID %d: %s", conn.PID, err)
}
}
}
// Apply process/profile info to connection.
if conn.ProfileRevisionCounter == 0 && conn.process != nil {
// Add process/profile metadata for connection.
conn.ProcessContext = getProcessContext(pkt.Ctx(), conn.process)
conn.ProfileRevisionCounter = conn.process.Profile().RevisionCnt()
// Inherit internal status of profile.
if localProfile := conn.process.Profile().LocalProfile(); localProfile != nil {
conn.Internal = localProfile.Internal
if err := conn.updateFeatures(); err != nil && !errors.Is(err, access.ErrNotLoggedIn) {
log.Tracer(pkt.Ctx()).Warningf("network: connection %s failed to check for enabled features: %s", conn, err)
}
}
}
// Find domain and DNS context of entity.
if conn.Entity.Domain == "" && conn.process.Profile() != nil {
// check if we can find a domain for that IP
@@ -539,13 +541,21 @@ func (conn *Connection) GatherConnectionInfo(pkt packet.Packet) (err error) {
}
}
// Data collection is only complete with a packet.
if pkt.InfoOnly() {
return nil
// Check if we have all required data for a complete packet.
switch {
case pkt.InfoOnly():
// We need a full packet.
case conn.process == nil:
// We need a process.
case conn.process.Profile() == nil:
// We need a profile.
case conn.Entity == nil:
// We need an entity.
default:
// Data is complete!
conn.dataComplete.Set()
}
// If we have all data and have seen an actual packet, the connection data is complete.
conn.dataComplete.Set()
return nil
}
@@ -833,6 +843,7 @@ func (conn *Connection) HandlePacket(pkt packet.Packet) {
_ = pkt.Drop()
}
} else {
// Run default handler.
defaultFirewallHandler(conn, pkt)
// Record metrics.
@@ -840,6 +851,8 @@ func (conn *Connection) HandlePacket(pkt packet.Packet) {
}
}
var infoOnlyPacketsActive = abool.New()
// packetHandlerWorker sequentially handles queued packets.
func (conn *Connection) packetHandlerWorker(ctx context.Context) error {
// Copy packet queue, so we can remove the reference from the connection
@@ -862,24 +875,42 @@ func (conn *Connection) packetHandlerWorker(ctx context.Context) error {
}
pktSeq++
// Check if we should expect an(other) info only packet.
// Only wait if this is the first packet and is not an info packet itself.
if pktSeq == 1 && pkt.ExpectInfo() && !pkt.InfoOnly() {
// Debug: FIXME
// Attempt to optimize packet handling order by handling info-only packets first.
switch {
case pktSeq > 1:
// Order correction is only for first packet.
case pkt.InfoOnly():
// Correct order only if first packet is not info-only.
// We have observed a first packet that is info-only.
// Info-only packets seem to be active and working.
infoOnlyPacketsActive.Set()
case pkt.ExpectInfo():
// Packet itself tells us that we should expect an info-only packet.
fallthrough
case infoOnlyPacketsActive.IsSet() && pkt.IsOutbound():
// Info-only packets are active and the packet is outbound.
// The probability is high that we will also get an info-only packet for this connection.
// TODO: Do not do this for forwarded packets in the future.
// DEBUG:
// log.Debugf("filter: waiting for info only packet in order to pull forward: %s", pkt)
select {
case infoPkt := <-pktQueue:
if infoPkt != nil {
// Debug: FIXME
// log.Debugf("filter: packet #%d [pulled forward] info=%v PID=%d packet: %s", pktSeq, pkt.InfoOnly(), pkt.Info().PID, pkt)
packetHandlerHandleConn(ctx, conn, pkt)
// DEBUG:
// log.Debugf("filter: packet #%d [pulled forward] info=%v PID=%d packet: %s", pktSeq, infoPkt.InfoOnly(), infoPkt.Info().PID, pkt)
packetHandlerHandleConn(ctx, conn, infoPkt)
pktSeq++
}
case <-time.After(5 * time.Millisecond):
case <-time.After(1 * time.Millisecond):
}
}
// Debug: FIXME
// DEBUG:
// switch {
// case pkt.Info().Inbound:
// log.Debugf("filter: packet #%d info=%v PID=%d packet: %s", pktSeq, pkt.InfoOnly(), pkt.Info().PID, pkt)
@@ -903,6 +934,20 @@ func packetHandlerHandleConn(ctx context.Context, conn *Connection, pkt packet.P
conn.Lock()
defer conn.Unlock()
// Check if we should use the default handler.
// The default handler is only for fully decided
// connections and just applying the verdict.
// There is no logging for these packets.
if conn.firewallHandler == nil {
// Run default handler.
defaultFirewallHandler(conn, pkt)
// Record metrics.
packetHandlingHistogram.UpdateDuration(pkt.Info().SeenAt)
return
}
// Create tracing context.
// Add context tracer and set context on packet.
traceCtx, tracer := log.AddTracer(ctx)
@@ -912,12 +957,8 @@ func packetHandlerHandleConn(ctx context.Context, conn *Connection, pkt packet.P
}
pkt.SetCtx(traceCtx)
// Handle packet with appropriate handler.
if conn.firewallHandler != nil {
conn.firewallHandler(conn, pkt)
} else {
defaultFirewallHandler(conn, pkt)
}
// Handle packet with set handler.
conn.firewallHandler(conn, pkt)
// Record metrics.
packetHandlingHistogram.UpdateDuration(pkt.Info().SeenAt)