Finally switch to nfqexp as the default nfqueue backend

This commit is contained in:
Patrick Pacher
2020-09-30 08:41:15 +02:00
parent 278846a5b9
commit bdcfc8c566
10 changed files with 12 additions and 519 deletions

View File

@@ -0,0 +1,156 @@
// +build linux
// Package nfq contains a nfqueue library experiment.
package nfq
import (
"context"
"sync/atomic"
"time"
"github.com/safing/portbase/log"
pmpacket "github.com/safing/portmaster/network/packet"
"github.com/tevino/abool"
"golang.org/x/sys/unix"
"github.com/florianl/go-nfqueue"
)
// Queue wraps a nfqueue
type Queue struct {
id uint16
nf *nfqueue.Nfqueue
packets chan pmpacket.Packet
cancelSocketCallback context.CancelFunc
pendingVerdicts uint64
verdictCompleted chan struct{}
}
// New opens a new nfQueue.
func New(qid uint16, v6 bool) (*Queue, error) { //nolint:gocognit
afFamily := unix.AF_INET
if v6 {
afFamily = unix.AF_INET6
}
cfg := &nfqueue.Config{
NfQueue: qid,
MaxPacketLen: 1600, // mtu is normally around 1500, make sure to capture it.
MaxQueueLen: 0xffff,
AfFamily: uint8(afFamily),
Copymode: nfqueue.NfQnlCopyPacket,
ReadTimeout: 5 * time.Millisecond,
WriteTimeout: 100 * time.Millisecond,
}
nf, err := nfqueue.Open(cfg)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
q := &Queue{
id: qid,
nf: nf,
packets: make(chan pmpacket.Packet, 1000),
cancelSocketCallback: cancel,
verdictCompleted: make(chan struct{}, 1),
}
fn := func(attrs nfqueue.Attribute) int {
if attrs.PacketID == nil {
// we need a packet id to set a verdict,
// if we don't get an ID there's hardly anything
// we can do.
return 0
}
pkt := &packet{
pktID: *attrs.PacketID,
queue: q,
received: time.Now(),
verdictSet: make(chan struct{}),
verdictPending: abool.New(),
}
if attrs.Payload != nil {
pkt.Payload = *attrs.Payload
}
if err := pmpacket.Parse(pkt.Payload, pkt.Info()); err != nil {
log.Warningf("nfqexp: failed to parse payload: %s", err)
_ = pkt.Drop()
return 0
}
select {
case q.packets <- pkt:
log.Tracef("nfqexp: queued packet %s (%s -> %s) after %s", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, time.Since(pkt.received))
case <-ctx.Done():
return 0
case <-time.After(time.Second):
log.Warningf("nfqexp: failed to queue packet (%s since it was handed over by the kernel)", time.Since(pkt.received))
}
go func() {
select {
case <-pkt.verdictSet:
case <-time.After(20 * time.Second):
log.Warningf("nfqexp: no verdict set for packet %s (%s -> %s) after %s, dropping", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, time.Since(pkt.received))
if err := pkt.Drop(); err != nil {
log.Warningf("nfqexp: failed to apply default-drop to unveridcted packet %s (%s -> %s)", pkt.ID(), pkt.Info().Src, pkt.Info().Dst)
}
}
}()
return 0 // continue calling this fn
}
errorFunc := func(e error) int {
// embedded interface is required to work-around some
// dep-vendoring weirdness
if opError, ok := e.(interface {
Timeout() bool
Temporary() bool
}); ok {
if opError.Timeout() || opError.Temporary() {
c := atomic.LoadUint64(&q.pendingVerdicts)
if c > 0 {
log.Tracef("nfqexp: waiting for %d pending verdicts", c)
for atomic.LoadUint64(&q.pendingVerdicts) > 0 { // must NOT use c here
<-q.verdictCompleted
}
}
return 0
}
}
log.Errorf("nfqexp: encountered error while receiving packets: %s\n", e.Error())
return 1
}
if err := q.nf.RegisterWithErrorFunc(ctx, fn, errorFunc); err != nil {
defer q.nf.Close()
return nil, err
}
return q, nil
}
// Destroy destroys the queue. Any error encountered is logged.
func (q *Queue) Destroy() {
q.cancelSocketCallback()
if err := q.nf.Close(); err != nil {
log.Errorf("nfqexp: failed to close queue %d: %s", q.id, err)
}
}
// PacketChannel returns the packet channel.
func (q *Queue) PacketChannel() <-chan pmpacket.Packet {
return q.packets
}

View File

@@ -0,0 +1,150 @@
// +build linux
package nfq
import (
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/tevino/abool"
"github.com/florianl/go-nfqueue"
"github.com/safing/portbase/log"
pmpacket "github.com/safing/portmaster/network/packet"
)
// Firewalling marks used by the Portmaster.
// See TODO on packet.mark() on their relevance
// and a possibility to remove most IPtables rules.
const (
MarkAccept = 1700
MarkBlock = 1701
MarkDrop = 1702
MarkAcceptAlways = 1710
MarkBlockAlways = 1711
MarkDropAlways = 1712
MarkRerouteNS = 1799
MarkRerouteSPN = 1717
)
func markToString(mark int) string {
switch mark {
case MarkAccept:
return "Accept"
case MarkBlock:
return "Block"
case MarkDrop:
return "Drop"
case MarkAcceptAlways:
return "AcceptAlways"
case MarkBlockAlways:
return "BlockAlways"
case MarkDropAlways:
return "DropAlways"
case MarkRerouteNS:
return "RerouteNS"
case MarkRerouteSPN:
return "RerouteSPN"
}
return "unknown"
}
// packet implements the packet.Packet interface.
type packet struct {
pmpacket.Base
pktID uint32
received time.Time
queue *Queue
verdictSet chan struct{}
verdictPending *abool.AtomicBool
}
func (pkt *packet) ID() string {
return fmt.Sprintf("pkt:%d qid:%d", pkt.pktID, pkt.queue.id)
}
// TODO(ppacher): revisit the following behavior:
// The legacy implementation of nfqueue (and the interception) module
// always accept a packet but may mark it so that a subsequent rule in
// the C17 chain drops, rejects or modifies it.
//
// For drop/return we could use the actual nfQueue verdicts Drop and Stop.
// Re-routing to local NS or SPN can be done by modifying the packet here
// and using SetVerdictModPacket and reject can be implemented using a simple
// raw-socket.
//
func (pkt *packet) mark(mark int) (err error) {
if pkt.verdictPending.SetToIf(false, true) {
defer close(pkt.verdictSet)
return pkt.setMark(mark)
}
return errors.New("verdict set")
}
func (pkt *packet) setMark(mark int) error {
atomic.AddUint64(&pkt.queue.pendingVerdicts, 1)
defer func() {
atomic.AddUint64(&pkt.queue.pendingVerdicts, ^uint64(0))
select {
case pkt.queue.verdictCompleted <- struct{}{}:
default:
}
}()
for {
if err := pkt.queue.nf.SetVerdictWithMark(pkt.pktID, nfqueue.NfAccept, mark); err != nil {
// embedded interface is required to work-around some
// dep-vendoring weirdness
if opErr, ok := err.(interface {
Timeout() bool
Temporary() bool
}); ok {
if opErr.Timeout() || opErr.Temporary() {
continue
}
}
log.Errorf("nfqexp: failed to set verdict %s for %s (%s -> %s): %s", markToString(mark), pkt.ID(), pkt.Info().Src, pkt.Info().Dst, err)
return err
}
break
}
log.Tracef("nfqexp: marking packet %s (%s -> %s) on queue %d with %s after %s", pkt.ID(), pkt.Info().Src, pkt.Info().Dst, pkt.queue.id, markToString(mark), time.Since(pkt.received))
return nil
}
func (pkt *packet) Accept() error {
return pkt.mark(MarkAccept)
}
func (pkt *packet) Block() error {
return pkt.mark(MarkBlock)
}
func (pkt *packet) Drop() error {
return pkt.mark(MarkDrop)
}
func (pkt *packet) PermanentAccept() error {
return pkt.mark(MarkAcceptAlways)
}
func (pkt *packet) PermanentBlock() error {
return pkt.mark(MarkBlockAlways)
}
func (pkt *packet) PermanentDrop() error {
return pkt.mark(MarkDropAlways)
}
func (pkt *packet) RerouteToNameserver() error {
return pkt.mark(MarkRerouteNS)
}
func (pkt *packet) RerouteToTunnel() error {
return pkt.mark(MarkRerouteSPN)
}