mihomo/tunnel/tunnel.go

741 lines
18 KiB
Go
Raw Normal View History

2018-06-10 22:50:03 +08:00
package tunnel
import (
"context"
"errors"
2019-02-02 20:47:38 +08:00
"fmt"
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
"hash/maphash"
2018-12-05 21:13:29 +08:00
"net"
2022-04-20 01:52:51 +08:00
"net/netip"
2022-03-13 01:21:23 +08:00
"path/filepath"
"runtime"
"strings"
2018-06-10 22:50:03 +08:00
"sync"
2018-06-16 21:34:13 +08:00
"time"
2018-06-10 22:50:03 +08:00
2023-11-03 21:01:45 +08:00
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/common/utils"
"github.com/metacubex/mihomo/component/loopback"
2023-11-03 21:01:45 +08:00
"github.com/metacubex/mihomo/component/nat"
P "github.com/metacubex/mihomo/component/process"
"github.com/metacubex/mihomo/component/resolver"
2024-01-30 19:41:34 +08:00
"github.com/metacubex/mihomo/component/slowdown"
2023-11-03 21:01:45 +08:00
"github.com/metacubex/mihomo/component/sniffer"
C "github.com/metacubex/mihomo/constant"
2023-11-17 19:39:57 +08:00
"github.com/metacubex/mihomo/constant/features"
2023-11-03 21:01:45 +08:00
"github.com/metacubex/mihomo/constant/provider"
icontext "github.com/metacubex/mihomo/context"
"github.com/metacubex/mihomo/log"
"github.com/metacubex/mihomo/tunnel/statistic"
2018-06-10 22:50:03 +08:00
)
var (
status = newAtomicStatus(Suspend)
tcpQueue = make(chan C.ConnContext, 200)
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
udpQueues []chan C.PacketAdapter
udpHashSeed = maphash.MakeSeed()
natTable = nat.New()
rules []C.Rule
listeners = make(map[string]C.InboundListener)
subRules map[string][]C.Rule
proxies = make(map[string]C.Proxy)
providers map[string]provider.ProxyProvider
ruleProviders map[string]provider.RuleProvider
configMux sync.RWMutex
2022-03-20 02:39:48 +08:00
// Outbound Rule
mode = Rule
// default timeout for UDP session
udpTimeout = 60 * time.Second
findProcessMode P.FindProcessMode
fakeIPRange netip.Prefix
snifferDispatcher *sniffer.Dispatcher
sniffingEnable = false
ruleUpdateCallback = utils.NewCallback[provider.RuleProvider]()
)
type tunnel struct{}
var Tunnel = tunnel{}
var _ C.Tunnel = Tunnel
var _ provider.Tunnel = Tunnel
2023-10-11 10:55:12 +08:00
func (t tunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) {
connCtx := icontext.NewConnContext(conn, metadata)
handleTCPConn(connCtx)
}
2023-10-11 10:55:12 +08:00
func (t tunnel) HandleUDPPacket(packet C.UDPPacket, metadata *C.Metadata) {
packetAdapter := C.NewPacketAdapter(packet, metadata)
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
var h maphash.Hash
h.SetSeed(udpHashSeed)
h.WriteString(metadata.SourceAddress())
h.WriteString(metadata.RemoteAddress())
queueNo := uint(h.Sum64()) % uint(len(udpQueues))
select {
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
case udpQueues[queueNo] <- packetAdapter:
default:
}
}
func (t tunnel) NatTable() C.NatTable {
return natTable
}
func (t tunnel) Providers() map[string]provider.ProxyProvider {
return providers
}
func (t tunnel) RuleProviders() map[string]provider.RuleProvider {
return ruleProviders
}
func (t tunnel) RuleUpdateCallback() *utils.Callback[provider.RuleProvider] {
return ruleUpdateCallback
}
func OnSuspend() {
2023-03-15 00:10:54 +08:00
status.Store(Suspend)
}
func OnInnerLoading() {
2023-03-15 00:10:54 +08:00
status.Store(Inner)
}
func OnRunning() {
2023-03-15 00:10:54 +08:00
status.Store(Running)
}
func Status() TunnelStatus {
return status.Load()
}
func SetFakeIPRange(p netip.Prefix) {
fakeIPRange = p
}
func FakeIPRange() netip.Prefix {
return fakeIPRange
}
func SetSniffing(b bool) {
if snifferDispatcher.Enable() {
configMux.Lock()
sniffingEnable = b
configMux.Unlock()
}
}
func IsSniffing() bool {
return sniffingEnable
}
func init() {
go process()
2018-06-10 22:50:03 +08:00
}
2021-06-13 17:23:10 +08:00
// TCPIn return fan-in queue
// Deprecated: using Tunnel instead
2021-06-13 17:23:10 +08:00
func TCPIn() chan<- C.ConnContext {
return tcpQueue
}
2021-06-13 17:23:10 +08:00
// UDPIn return fan-in udp queue
// Deprecated: using Tunnel instead
func UDPIn() chan<- C.PacketAdapter {
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
// compatibility: first queue is always available for external callers
return udpQueues[0]
2018-06-10 22:50:03 +08:00
}
2023-02-18 13:16:07 +08:00
// NatTable return nat table
func NatTable() C.NatTable {
return natTable
}
2018-11-21 13:47:46 +08:00
// Rules return all rules
func Rules() []C.Rule {
return rules
2018-06-18 11:31:49 +08:00
}
2022-12-04 21:53:13 +08:00
func Listeners() map[string]C.InboundListener {
2022-12-04 15:15:23 +08:00
return listeners
}
2018-11-21 13:47:46 +08:00
// UpdateRules handle update rules
2022-12-04 13:37:14 +08:00
func UpdateRules(newRules []C.Rule, newSubRule map[string][]C.Rule, rp map[string]provider.RuleProvider) {
configMux.Lock()
rules = newRules
2021-12-02 22:56:17 +08:00
ruleProviders = rp
2022-12-04 13:37:14 +08:00
subRules = newSubRule
configMux.Unlock()
2018-11-21 13:47:46 +08:00
}
// Proxies return all proxies
func Proxies() map[string]C.Proxy {
return proxies
2018-11-21 13:47:46 +08:00
}
func ProxiesWithProviders() map[string]C.Proxy {
allProxies := make(map[string]C.Proxy)
for name, proxy := range proxies {
allProxies[name] = proxy
}
for _, p := range providers {
for _, proxy := range p.Proxies() {
name := proxy.Name()
allProxies[name] = proxy
}
}
return allProxies
}
2019-12-08 12:17:24 +08:00
// Providers return all compatible providers
func Providers() map[string]provider.ProxyProvider {
return providers
2019-12-08 12:17:24 +08:00
}
2021-12-02 22:56:17 +08:00
// RuleProviders return all loaded rule providers
func RuleProviders() map[string]provider.RuleProvider {
2021-12-02 22:56:17 +08:00
return ruleProviders
}
2018-11-21 13:47:46 +08:00
// UpdateProxies handle update proxies
func UpdateProxies(newProxies map[string]C.Proxy, newProviders map[string]provider.ProxyProvider) {
configMux.Lock()
proxies = newProxies
providers = newProviders
configMux.Unlock()
}
2022-12-04 21:53:13 +08:00
func UpdateListeners(newListeners map[string]C.InboundListener) {
2022-12-04 15:15:23 +08:00
configMux.Lock()
defer configMux.Unlock()
listeners = newListeners
2022-12-04 15:15:23 +08:00
}
func UpdateSniffer(dispatcher *sniffer.Dispatcher) {
configMux.Lock()
snifferDispatcher = dispatcher
2022-11-03 00:31:31 +08:00
sniffingEnable = dispatcher.Enable()
configMux.Unlock()
}
2018-11-21 13:47:46 +08:00
// Mode return current mode
func Mode() TunnelMode {
return mode
2018-11-21 13:47:46 +08:00
}
// SetMode change the mode of tunnel
func SetMode(m TunnelMode) {
mode = m
}
func FindProcessMode() P.FindProcessMode {
return findProcessMode
}
// SetFindProcessMode replace SetAlwaysFindProcess
// always find process info if legacyAlways = true or mode.Always() = true, may be increase many memory
func SetFindProcessMode(mode P.FindProcessMode) {
findProcessMode = mode
}
func isHandle(t C.Type) bool {
2023-03-15 00:10:54 +08:00
status := status.Load()
return status == Running || (status == Inner && t == C.INNER)
}
// processUDP starts a loop to handle udp packet
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
func processUDP(queueNo int) {
queue := udpQueues[queueNo]
2020-10-20 17:44:39 +08:00
for conn := range queue {
handleUDPConn(conn)
}
}
func process() {
numUDPWorkers := 4
2021-10-27 21:27:19 +08:00
if num := runtime.GOMAXPROCS(0); num > numUDPWorkers {
numUDPWorkers = num
}
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
udpQueues = make([]chan C.PacketAdapter, numUDPWorkers)
for i := 0; i < numUDPWorkers; i++ {
chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian <haotia@gmail.com>
2024-09-25 21:28:30 +08:00
udpQueues[i] = make(chan C.PacketAdapter, 200)
go processUDP(i)
}
2019-10-11 20:11:18 +08:00
2020-10-20 17:44:39 +08:00
queue := tcpQueue
for conn := range queue {
go handleTCPConn(conn)
2018-06-10 22:50:03 +08:00
}
}
func needLookupIP(metadata *C.Metadata) bool {
2022-04-20 01:52:51 +08:00
return resolver.MappingEnabled() && metadata.Host == "" && metadata.DstIP.IsValid()
2019-02-11 15:44:42 +08:00
}
func preHandleMetadata(metadata *C.Metadata) error {
// handle IP string on host
2022-04-20 01:52:51 +08:00
if ip, err := netip.ParseAddr(metadata.Host); err == nil {
metadata.DstIP = ip
2021-03-10 12:11:45 +08:00
metadata.Host = ""
}
2019-05-03 00:05:14 +08:00
// preprocess enhanced-mode metadata
if needLookupIP(metadata) {
host, exist := resolver.FindHostByIP(metadata.DstIP)
2018-12-05 21:13:29 +08:00
if exist {
metadata.Host = host
2021-10-18 22:58:16 +08:00
metadata.DNSMode = C.DNSMapping
if resolver.FakeIPEnabled() {
2022-04-20 01:52:51 +08:00
metadata.DstIP = netip.Addr{}
metadata.DNSMode = C.DNSFakeIP
} else if node, ok := resolver.DefaultHosts.Search(host, false); ok {
2020-04-27 21:28:24 +08:00
// redir-host should lookup the hosts
metadata.DstIP, _ = node.RandIP()
} else if node != nil && node.IsDomain {
metadata.Host = node.Domain
2019-05-03 00:05:14 +08:00
}
2022-03-15 02:55:06 +08:00
} else if resolver.IsFakeIP(metadata.DstIP) {
return fmt.Errorf("fake DNS record %s missing", metadata.DstIP)
2018-12-05 21:13:29 +08:00
}
} else if node, ok := resolver.DefaultHosts.Search(metadata.Host, true); ok {
// try use domain mapping
metadata.Host = node.Domain
2018-12-05 21:13:29 +08:00
}
return nil
}
func resolveMetadata(metadata *C.Metadata) (proxy C.Proxy, rule C.Rule, err error) {
2022-11-18 22:57:33 +08:00
if metadata.SpecialProxy != "" {
var exist bool
proxy, exist = proxies[metadata.SpecialProxy]
if !exist {
err = fmt.Errorf("proxy %s not found", metadata.SpecialProxy)
}
2022-11-22 19:16:08 +08:00
return
2022-11-18 22:57:33 +08:00
}
switch mode {
2018-11-21 13:47:46 +08:00
case Direct:
proxy = proxies["DIRECT"]
2018-11-21 13:47:46 +08:00
case Global:
proxy = proxies["GLOBAL"]
// Rule
default:
proxy, rule, err = match(metadata)
}
return
2019-10-11 20:11:18 +08:00
}
2019-02-02 20:47:38 +08:00
func handleUDPConn(packet C.PacketAdapter) {
if !isHandle(packet.Metadata().Type) {
packet.Drop()
return
}
metadata := packet.Metadata()
2019-10-11 20:11:18 +08:00
if !metadata.Valid() {
packet.Drop()
2019-10-11 20:11:18 +08:00
log.Warnln("[Metadata] not valid: %#v", metadata)
return
}
2020-10-28 21:26:50 +08:00
// make a fAddr if request ip is fakeip
var fAddr netip.Addr
if resolver.IsExistFakeIP(metadata.DstIP) {
fAddr = metadata.DstIP
2020-03-10 20:36:24 +08:00
}
if err := preHandleMetadata(metadata); err != nil {
packet.Drop()
log.Debugln("[Metadata PreHandle] error: %s", err)
return
2020-01-31 19:26:33 +08:00
}
if sniffingEnable && snifferDispatcher.Enable() {
snifferDispatcher.UDPSniff(packet)
2023-10-19 18:30:20 +08:00
}
2023-12-24 22:22:18 +08:00
// local resolve UDP dns
if !metadata.Resolved() {
ip, err := resolver.ResolveIP(context.Background(), metadata.Host)
if err != nil {
return
}
metadata.DstIP = ip
}
2020-01-31 14:43:54 +08:00
key := packet.LocalAddr().String()
2020-10-28 21:26:50 +08:00
handle := func() bool {
pc, proxy := natTable.Get(key)
2020-10-28 21:26:50 +08:00
if pc != nil {
if proxy != nil {
proxy.UpdateWriteBack(packet)
}
2022-04-11 06:28:42 +08:00
_ = handleUDPToRemote(packet, pc, metadata)
2020-10-28 21:26:50 +08:00
return true
}
return false
}
2023-12-24 22:22:18 +08:00
if handle() {
packet.Drop()
return
}
2023-12-24 22:22:18 +08:00
cond, loaded := natTable.GetOrCreateLock(key)
2023-12-18 23:44:19 +08:00
2023-12-24 22:22:18 +08:00
go func() {
defer packet.Drop()
2023-12-18 23:44:19 +08:00
if loaded {
cond.L.Lock()
cond.Wait()
handle()
cond.L.Unlock()
return
}
defer func() {
natTable.DeleteLock(key)
cond.Broadcast()
}()
proxy, rule, err := resolveMetadata(metadata)
2023-12-24 22:22:18 +08:00
if err != nil {
log.Warnln("[UDP] Parse metadata failed: %s", err.Error())
return
}
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultUDPTimeout)
defer cancel()
2022-11-24 12:32:35 +08:00
rawPc, err := retry(ctx, func(ctx context.Context) (C.PacketConn, error) {
return proxy.ListenPacketContext(ctx, metadata.Pure())
}, func(err error) {
logMetadataErr(metadata, rule, proxy, err)
2022-11-24 12:32:35 +08:00
})
if err != nil {
2020-10-28 21:26:50 +08:00
return
}
logMetadata(metadata, rule, rawPc)
pc := statistic.NewUDPTracker(rawPc, statistic.DefaultManager, metadata, rule, 0, 0, true)
2020-10-28 21:26:50 +08:00
if rawPc.Chains().Last() == "REJECT-DROP" {
pc.Close()
return
}
2020-10-28 21:26:50 +08:00
oAddrPort := metadata.AddrPort()
writeBackProxy := nat.NewWriteBackProxy(packet)
natTable.Set(key, pc, writeBackProxy)
go handleUDPToLocal(writeBackProxy, pc, key, oAddrPort, fAddr)
2020-10-28 21:26:50 +08:00
handle()
2019-10-11 20:11:18 +08:00
}()
}
func handleTCPConn(connCtx C.ConnContext) {
if !isHandle(connCtx.Metadata().Type) {
_ = connCtx.Conn().Close()
return
}
2022-04-11 06:28:42 +08:00
defer func(conn net.Conn) {
_ = conn.Close()
}(connCtx.Conn())
2019-10-11 20:11:18 +08:00
metadata := connCtx.Metadata()
2019-10-11 20:11:18 +08:00
if !metadata.Valid() {
log.Warnln("[Metadata] not valid: %#v", metadata)
return
2019-04-23 23:29:36 +08:00
}
preHandleFailed := false
if err := preHandleMetadata(metadata); err != nil {
log.Debugln("[Metadata PreHandle] error: %s", err)
preHandleFailed = true
}
conn := connCtx.Conn()
conn.ResetPeeked() // reset before sniffer
if sniffingEnable && snifferDispatcher.Enable() {
// Try to sniff a domain when `preHandleMetadata` failed, this is usually
// caused by a "Fake DNS record missing" error when enhanced-mode is fake-ip.
if snifferDispatcher.TCPSniff(conn, metadata) {
// we now have a domain name
preHandleFailed = false
}
}
// If both trials have failed, we can do nothing but give up
if preHandleFailed {
log.Debugln("[Metadata PreHandle] failed to sniff a domain for connection %s --> %s, give up",
metadata.SourceDetail(), metadata.RemoteAddress())
return
}
peekMutex := sync.Mutex{}
if !conn.Peeked() {
peekMutex.Lock()
go func() {
defer peekMutex.Unlock()
_ = conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
_, _ = conn.Peek(1)
_ = conn.SetReadDeadline(time.Time{})
}()
}
proxy, rule, err := resolveMetadata(metadata)
2019-10-11 20:11:18 +08:00
if err != nil {
log.Warnln("[Metadata] parse failed: %s", err.Error())
2019-10-11 20:11:18 +08:00
return
}
dialMetadata := metadata
if len(metadata.Host) > 0 {
if node, ok := resolver.DefaultHosts.Search(metadata.Host, false); ok {
if dstIp, _ := node.RandIP(); !FakeIPRange().Contains(dstIp) {
dialMetadata.DstIP = dstIp
dialMetadata.DNSMode = C.DNSHosts
dialMetadata = dialMetadata.Pure()
}
}
}
var peekBytes []byte
2023-02-25 19:41:01 +08:00
var peekLen int
2022-04-12 20:20:04 +08:00
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
defer cancel()
2023-02-27 09:46:16 +08:00
remoteConn, err := retry(ctx, func(ctx context.Context) (remoteConn C.Conn, err error) {
remoteConn, err = proxy.DialContext(ctx, dialMetadata)
if err != nil {
2023-02-27 09:46:16 +08:00
return
2023-02-26 11:11:54 +08:00
}
2023-02-27 09:46:16 +08:00
if N.NeedHandshake(remoteConn) {
2023-02-27 09:46:16 +08:00
defer func() {
for _, chain := range remoteConn.Chains() {
if chain == "REJECT" {
err = nil
return
}
}
if err != nil {
remoteConn = nil
}
}()
peekMutex.Lock()
defer peekMutex.Unlock()
peekBytes, _ = conn.Peek(conn.Buffered())
_, err = remoteConn.Write(peekBytes)
if err != nil {
2023-02-27 09:46:16 +08:00
return
}
if peekLen = len(peekBytes); peekLen > 0 {
_, _ = conn.Discard(peekLen)
}
}
2023-02-27 09:46:16 +08:00
return
2022-11-24 12:32:35 +08:00
}, func(err error) {
logMetadataErr(metadata, rule, proxy, err)
2022-11-24 12:32:35 +08:00
})
if err != nil {
2018-06-10 22:50:03 +08:00
return
}
logMetadata(metadata, rule, remoteConn)
remoteConn = statistic.NewTCPTracker(remoteConn, statistic.DefaultManager, metadata, rule, 0, int64(peekLen), true)
2022-04-11 06:28:42 +08:00
defer func(remoteConn C.Conn) {
_ = remoteConn.Close()
}(remoteConn)
2018-06-10 22:50:03 +08:00
_ = conn.SetReadDeadline(time.Now()) // stop unfinished peek
peekMutex.Lock()
defer peekMutex.Unlock()
_ = conn.SetReadDeadline(time.Time{}) // reset
handleSocket(conn, remoteConn)
}
func logMetadataErr(metadata *C.Metadata, rule C.Rule, proxy C.ProxyAdapter, err error) {
if rule == nil {
log.Warnln("[%s] dial %s %s --> %s error: %s", strings.ToUpper(metadata.NetWork.String()), proxy.Name(), metadata.SourceDetail(), metadata.RemoteAddress(), err.Error())
} else {
log.Warnln("[%s] dial %s (match %s/%s) %s --> %s error: %s", strings.ToUpper(metadata.NetWork.String()), proxy.Name(), rule.RuleType().String(), rule.Payload(), metadata.SourceDetail(), metadata.RemoteAddress(), err.Error())
}
}
func logMetadata(metadata *C.Metadata, rule C.Rule, remoteConn C.Connection) {
switch {
2022-11-18 22:57:33 +08:00
case metadata.SpecialProxy != "":
log.Infoln("[%s] %s --> %s using %s", strings.ToUpper(metadata.NetWork.String()), metadata.SourceDetail(), metadata.RemoteAddress(), metadata.SpecialProxy)
2020-01-31 14:58:54 +08:00
case rule != nil:
if rule.Payload() != "" {
log.Infoln("[%s] %s --> %s match %s using %s", strings.ToUpper(metadata.NetWork.String()), metadata.SourceDetail(), metadata.RemoteAddress(), fmt.Sprintf("%s(%s)", rule.RuleType().String(), rule.Payload()), remoteConn.Chains().String())
} else {
log.Infoln("[%s] %s --> %s match %s using %s", strings.ToUpper(metadata.NetWork.String()), metadata.SourceDetail(), metadata.RemoteAddress(), rule.RuleType().String(), remoteConn.Chains().String())
}
case mode == Global:
log.Infoln("[%s] %s --> %s using GLOBAL", strings.ToUpper(metadata.NetWork.String()), metadata.SourceDetail(), metadata.RemoteAddress())
case mode == Direct:
log.Infoln("[%s] %s --> %s using DIRECT", strings.ToUpper(metadata.NetWork.String()), metadata.SourceDetail(), metadata.RemoteAddress())
2020-01-31 14:58:54 +08:00
default:
log.Infoln("[%s] %s --> %s doesn't match any rule using %s", strings.ToUpper(metadata.NetWork.String()), metadata.SourceDetail(), metadata.RemoteAddress(), remoteConn.Chains().Last())
}
2018-06-10 22:50:03 +08:00
}
func shouldResolveIP(rule C.Rule, metadata *C.Metadata) bool {
2022-04-20 01:52:51 +08:00
return rule.ShouldResolveIP() && metadata.Host != "" && !metadata.DstIP.IsValid()
2019-02-02 20:47:38 +08:00
}
func match(metadata *C.Metadata) (C.Proxy, C.Rule, error) {
configMux.RLock()
defer configMux.RUnlock()
2022-08-12 03:04:58 +08:00
var (
resolved bool
attemptProcessLookup = metadata.Type != C.INNER
2022-08-12 03:04:58 +08:00
)
2019-09-11 17:00:55 +08:00
if node, ok := resolver.DefaultHosts.Search(metadata.Host, false); ok {
metadata.DstIP, _ = node.RandIP()
2019-09-11 17:00:55 +08:00
resolved = true
}
2022-12-04 13:37:14 +08:00
for _, rule := range getRules(metadata) {
if !resolved && shouldResolveIP(rule, metadata) {
func() {
ctx, cancel := context.WithTimeout(context.Background(), resolver.DefaultDNSTimeout)
defer cancel()
ip, err := resolver.ResolveIP(ctx, metadata.Host)
if err != nil {
log.Debugln("[DNS] resolve %s error: %s", metadata.Host, err.Error())
} else {
log.Debugln("[DNS] %s --> %s", metadata.Host, ip.String())
metadata.DstIP = ip
}
resolved = true
}()
2019-02-02 20:47:38 +08:00
}
if attemptProcessLookup && !findProcessMode.Off() && (findProcessMode.Always() || rule.ShouldFindProcess()) {
attemptProcessLookup = false
2023-11-17 23:12:10 +08:00
if !features.CMFA {
2023-11-17 19:39:57 +08:00
// normal check for process
uid, path, err := P.FindProcessName(metadata.NetWork.String(), metadata.SrcIP, int(metadata.SrcPort))
if err != nil {
log.Debugln("[Process] find process %s error: %v", metadata.String(), err)
} else {
metadata.Process = filepath.Base(path)
metadata.ProcessPath = path
metadata.Uid = uid
2024-07-24 13:59:10 +08:00
if pkg, err := P.FindPackageName(metadata); err == nil { // for android (not CMFA) package names
metadata.Process = pkg
}
2023-11-17 19:39:57 +08:00
}
2022-06-14 23:14:43 +08:00
} else {
2023-11-17 19:39:57 +08:00
// check package names
pkg, err := P.FindPackageName(metadata)
if err != nil {
log.Debugln("[Process] find process %s error: %v", metadata.String(), err)
} else {
metadata.Process = pkg
}
}
}
if matched, ada := rule.Match(metadata); matched {
adapter, ok := proxies[ada]
2019-04-23 23:29:36 +08:00
if !ok {
continue
2018-06-10 22:50:03 +08:00
}
2019-04-23 23:29:36 +08:00
2022-10-30 23:08:18 +08:00
// parse multi-layer nesting
passed := false
for adapter := adapter; adapter != nil; adapter = adapter.Unwrap(metadata, false) {
if adapter.Type() == C.Pass {
passed = true
break
}
}
if passed {
log.Debugln("%s match Pass rule", adapter.Name())
continue
}
2019-04-23 23:29:36 +08:00
if metadata.NetWork == C.UDP && !adapter.SupportUDP() {
2021-04-05 13:31:10 +08:00
log.Debugln("%s UDP is not supported", adapter.Name())
2019-04-23 23:29:36 +08:00
continue
}
2021-11-17 16:03:47 +08:00
return adapter, rule, nil
2018-06-10 22:50:03 +08:00
}
}
return proxies["DIRECT"], nil, nil
2021-11-17 16:03:47 +08:00
}
2022-11-24 12:32:35 +08:00
2022-12-04 13:37:14 +08:00
func getRules(metadata *C.Metadata) []C.Rule {
2022-12-04 22:08:20 +08:00
if sr, ok := subRules[metadata.SpecialRules]; ok {
2022-12-05 19:48:54 +08:00
log.Debugln("[Rule] use %s rules", metadata.SpecialRules)
2022-12-04 13:37:14 +08:00
return sr
} else {
2022-12-05 19:48:54 +08:00
log.Debugln("[Rule] use default rules")
2022-12-04 13:37:14 +08:00
return rules
}
}
func shouldStopRetry(err error) bool {
if errors.Is(err, resolver.ErrIPNotFound) {
return true
}
if errors.Is(err, resolver.ErrIPVersion) {
return true
}
if errors.Is(err, resolver.ErrIPv6Disabled) {
return true
}
if errors.Is(err, loopback.ErrReject) {
return true
}
return false
}
2022-11-24 12:32:35 +08:00
func retry[T any](ctx context.Context, ft func(context.Context) (T, error), fe func(err error)) (t T, err error) {
2024-01-30 19:41:34 +08:00
s := slowdown.New()
2022-11-24 12:32:35 +08:00
for i := 0; i < 10; i++ {
t, err = ft(ctx)
if err != nil {
if fe != nil {
fe(err)
}
if shouldStopRetry(err) {
return
}
2024-01-30 19:41:34 +08:00
if s.Wait(ctx) == nil {
2022-11-24 12:32:35 +08:00
continue
2024-01-30 19:41:34 +08:00
} else {
2022-11-24 12:32:35 +08:00
return
}
} else {
break
}
}
return
}