Revamp connection handling flow to fix race condition and support info-only packets

This commit is contained in:
Daniel
2023-06-21 15:31:45 +02:00
parent 83b084959e
commit 8a09ba6045
22 changed files with 527 additions and 349 deletions

View File

@@ -13,6 +13,12 @@ type infoPacket struct {
pmpacket.Base
}
// InfoOnly returns whether the packet is informational only and does not
// represent an actual packet.
func (pkt *infoPacket) InfoOnly() bool {
return true
}
// LoadPacketData does nothing on Linux, as data is always fully parsed.
func (pkt *infoPacket) LoadPacketData() error {
return fmt.Errorf("can't load data in info only packet")

View File

@@ -5,11 +5,13 @@ import (
"encoding/binary"
"errors"
"net"
"time"
"unsafe"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/safing/portbase/log"
"github.com/safing/portmaster/network/packet"
)
@@ -17,6 +19,7 @@ import (
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc clang -cflags "-O2 -g -Wall -Werror" -type Event bpf program/monitor.c
var stopper chan struct{}
// StartEBPFWorker starts the ebpf worker.
func StartEBPFWorker(ch chan packet.Packet) {
stopper = make(chan struct{})
go func() {
@@ -32,7 +35,7 @@ func StartEBPFWorker(ch chan packet.Packet) {
log.Errorf("ebpf: failed to load ebpf object: %s", err)
return
}
defer objs.Close()
defer objs.Close() //nolint:errcheck
// Create a link to the tcp_connect program.
linkTCPConnect, err := link.AttachTracing(link.TracingOptions{
@@ -42,7 +45,7 @@ func StartEBPFWorker(ch chan packet.Packet) {
log.Errorf("ebpf: failed to attach to tcp_v4_connect: %s ", err)
return
}
defer linkTCPConnect.Close()
defer linkTCPConnect.Close() //nolint:errcheck
// Create a link to the udp_v4_connect program.
linkUDPV4, err := link.AttachTracing(link.TracingOptions{
@@ -52,7 +55,7 @@ func StartEBPFWorker(ch chan packet.Packet) {
log.Errorf("ebpf: failed to attach to udp_v4_connect: %s ", err)
return
}
defer linkUDPV4.Close()
defer linkUDPV4.Close() //nolint:errcheck
// Create a link to the udp_v6_connect program.
linkUDPV6, err := link.AttachTracing(link.TracingOptions{
@@ -62,14 +65,14 @@ func StartEBPFWorker(ch chan packet.Packet) {
log.Errorf("ebpf: failed to attach to udp_v6_connect: %s ", err)
return
}
defer linkUDPV6.Close()
defer linkUDPV6.Close() //nolint:errcheck
rd, err := ringbuf.NewReader(objs.bpfMaps.Events)
if err != nil {
log.Errorf("ebpf: failed to open ring buffer: %s", err)
return
}
defer rd.Close()
defer rd.Close() //nolint:errcheck
go func() {
<-stopper
@@ -107,7 +110,8 @@ func StartEBPFWorker(ch chan packet.Packet) {
DstPort: event.Dport,
Src: arrayToIP(event.Saddr, packet.IPVersion(event.IpVersion)),
Dst: arrayToIP(event.Daddr, packet.IPVersion(event.IpVersion)),
PID: event.Pid,
PID: int(event.Pid),
SeenAt: time.Now(),
}
if isEventValid(event) {
log.Debugf("ebpf: PID: %d conn: %s:%d -> %s:%d %s %s", info.PID, info.LocalIP(), info.LocalPort(), info.RemoteIP(), info.RemotePort(), info.Version.String(), info.Protocol.String())
@@ -123,6 +127,7 @@ func StartEBPFWorker(ch chan packet.Packet) {
}()
}
// StopEBPFWorker stops the ebpf worker.
func StopEBPFWorker() {
close(stopper)
}
@@ -148,11 +153,12 @@ func isEventValid(event bpfEvent) bool {
return true
}
// arrayToIP converts IP number array to net.IP
// arrayToIP converts IP number array to net.IP.
func arrayToIP(ipNum [4]uint32, ipVersion packet.IPVersion) net.IP {
if ipVersion == packet.IPv4 {
// FIXME: maybe convertIPv4 from windowskext package
return unsafe.Slice((*byte)(unsafe.Pointer(&ipNum)), 4)
} else {
return unsafe.Slice((*byte)(unsafe.Pointer(&ipNum)), 16)
}
// FIXME: maybe use convertIPv6 from windowskext package
return unsafe.Slice((*byte)(unsafe.Pointer(&ipNum)), 16)
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/safing/portbase/log"
pmpacket "github.com/safing/portmaster/network/packet"
"github.com/safing/portmaster/process"
)
// Queue wraps a nfqueue.
@@ -175,10 +176,11 @@ func (q *Queue) packetHandler(ctx context.Context) func(nfqueue.Attribute) int {
pkt := &packet{
pktID: *attrs.PacketID,
queue: q,
received: time.Now(),
verdictSet: make(chan struct{}),
verdictPending: abool.New(),
}
pkt.Info().PID = process.UndefinedProcessID
pkt.Info().SeenAt = time.Now()
if attrs.Payload == nil {
// There is not payload.
@@ -194,11 +196,11 @@ func (q *Queue) packetHandler(ctx context.Context) func(nfqueue.Attribute) int {
select {
case q.packets <- pkt:
log.Tracef("nfqueue: queued packet %s (%s -> %s) after %s", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, time.Since(pkt.received))
log.Tracef("nfqueue: queued packet %s (%s -> %s) after %s", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, time.Since(pkt.Info().SeenAt))
case <-ctx.Done():
return 0
case <-time.After(time.Second):
log.Warningf("nfqueue: failed to queue packet (%s since it was handed over by the kernel)", time.Since(pkt.received))
log.Warningf("nfqueue: failed to queue packet (%s since it was handed over by the kernel)", time.Since(pkt.Info().SeenAt))
}
go func() {
@@ -206,7 +208,7 @@ func (q *Queue) packetHandler(ctx context.Context) func(nfqueue.Attribute) int {
case <-pkt.verdictSet:
case <-time.After(20 * time.Second):
log.Warningf("nfqueue: no verdict set for packet %s (%s -> %s) after %s, dropping", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, time.Since(pkt.received))
log.Warningf("nfqueue: no verdict set for packet %s (%s -> %s) after %s, dropping", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, time.Since(pkt.Info().SeenAt))
if err := pkt.Drop(); err != nil {
log.Warningf("nfqueue: failed to apply default-drop to unveridcted packet %s (%s -> %s)", pkt.ID(), pkt.Info().Src, pkt.Info().Dst)
}

View File

@@ -55,7 +55,6 @@ func markToString(mark int) string {
type packet struct {
pmpacket.Base
pktID uint32
received time.Time
queue *Queue
verdictSet chan struct{}
verdictPending *abool.AtomicBool
@@ -118,7 +117,7 @@ func (pkt *packet) setMark(mark int) error {
}
break
}
log.Tracer(pkt.Ctx()).Tracef("nfqueue: marking packet %s (%s -> %s) on queue %d with %s after %s", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, pkt.queue.id, markToString(mark), time.Since(pkt.received))
log.Tracer(pkt.Ctx()).Tracef("nfqueue: marking packet %s (%s -> %s) on queue %d with %s after %s", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, pkt.queue.id, markToString(mark), time.Since(pkt.Info().SeenAt))
return nil
}

View File

@@ -8,8 +8,11 @@ import (
"errors"
"fmt"
"net"
"time"
"unsafe"
"github.com/safing/portmaster/process"
"github.com/tevino/abool"
"github.com/safing/portbase/log"
@@ -103,21 +106,28 @@ func Handler(packets chan packet.Packet) {
verdictRequest: packetInfo,
verdictSet: abool.NewBool(false),
}
info := new.Info()
info.Inbound = packetInfo.direction > 0
info.InTunnel = false
info.Protocol = packet.IPProtocol(packetInfo.protocol)
info.PID = packetInfo.pid
info.PID = int(packetInfo.pid)
info.SeenAt = time.Now()
// IP version
// Check PID
if info.PID == 0 {
// Windows does not have zero PIDs.
// Set to UndefinedProcessID.
info.PID = process.UndefinedProcessID
}
// Set IP version
if packetInfo.ipV6 == 1 {
info.Version = packet.IPv6
} else {
info.Version = packet.IPv4
}
// IPs
// Set IPs
if info.Version == packet.IPv4 {
// IPv4
if info.Inbound {
@@ -142,7 +152,7 @@ func Handler(packets chan packet.Packet) {
}
}
// Ports
// Set Ports
if info.Inbound {
// Inbound
info.SrcPort = packetInfo.remotePort

View File

@@ -1,3 +1,4 @@
//go:build windows
// +build windows
package windowskext
@@ -23,6 +24,12 @@ type Packet struct {
lock sync.Mutex
}
// InfoOnly returns whether the packet is informational only and does not
// represent an actual packet.
func (pkt *Packet) InfoOnly() bool {
return pkt.verdictRequest.flags&VerdictRequestFlagSocketAuth > 0
}
// FastTrackedByIntegration returns whether the packet has been fast-track
// accepted by the OS integration.
func (pkt *Packet) FastTrackedByIntegration() bool {