From 1cad615b25e277b0153192f0b6c8788597c57b8c Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Sat, 2 Sep 2023 16:54:35 +0800 Subject: [PATCH] chore: using xsync.MapOf replace sync.Map --- adapter/adapter.go | 31 ++++++++++---------- component/auth/auth.go | 10 +++---- component/nat/table.go | 58 ++++++++++++++++++++++++------------- constant/adapters.go | 14 +++++---- go.mod | 1 + go.sum | 2 ++ listener/tproxy/packet.go | 11 ++++--- transport/tuic/v4/client.go | 13 +++++---- transport/tuic/v4/server.go | 10 +++---- transport/tuic/v5/client.go | 11 +++---- transport/tuic/v5/server.go | 10 +++---- tunnel/connection.go | 9 ++---- tunnel/statistic/manager.go | 11 +++---- tunnel/tunnel.go | 5 ++-- 14 files changed, 110 insertions(+), 86 deletions(-) diff --git a/adapter/adapter.go b/adapter/adapter.go index c7351061..44f09ba1 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -10,7 +10,6 @@ import ( "net/netip" "net/url" "strconv" - "sync" "time" "github.com/Dreamacro/clash/common/atomic" @@ -19,6 +18,8 @@ import ( "github.com/Dreamacro/clash/component/dialer" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/log" + + "github.com/puzpuzpuz/xsync/v2" ) var UnifiedDelay = atomic.NewBool(false) @@ -37,7 +38,7 @@ type Proxy struct { history *queue.Queue[C.DelayHistory] alive *atomic.Bool url string - extra sync.Map + extra *xsync.MapOf[string, *extraProxyState] } // Alive implements C.Proxy @@ -48,7 +49,7 @@ func (p *Proxy) Alive() bool { // AliveForTestUrl implements C.Proxy func (p *Proxy) AliveForTestUrl(url string) bool { if state, ok := p.extra.Load(url); ok { - return state.(*extraProxyState).alive.Load() + return state.alive.Load() } return p.alive.Load() @@ -96,7 +97,7 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory { var queueM []C.DelayHistory if state, ok := p.extra.Load(url); ok { - queueM = state.(*extraProxyState).history.Copy() + queueM = state.history.Copy() } if queueM == nil { @@ -113,10 +114,10 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory { func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory { extraHistory := map[string][]C.DelayHistory{} - p.extra.Range(func(k, v interface{}) bool { + p.extra.Range(func(k string, v *extraProxyState) bool { - testUrl := k.(string) - state := v.(*extraProxyState) + testUrl := k + state := v histories := []C.DelayHistory{} queueM := state.history.Copy() @@ -155,8 +156,8 @@ func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) { history := p.history.Last() if state, ok := p.extra.Load(url); ok { - alive = state.(*extraProxyState).alive.Load() - history = state.(*extraProxyState).history.Last() + alive = state.alive.Load() + history = state.history.Last() } if !alive { @@ -226,10 +227,10 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In p.extra.Store(url, state) } - state.(*extraProxyState).alive.Store(alive) - state.(*extraProxyState).history.Put(record) - if state.(*extraProxyState).history.Len() > defaultHistoriesNum { - state.(*extraProxyState).history.Pop() + state.alive.Store(alive) + state.history.Put(record) + if state.history.Len() > defaultHistoriesNum { + state.history.Pop() } default: log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t) @@ -311,7 +312,7 @@ func NewProxy(adapter C.ProxyAdapter) *Proxy { history: queue.New[C.DelayHistory](defaultHistoriesNum), alive: atomic.NewBool(true), url: "", - extra: sync.Map{}} + extra: xsync.NewMapOf[*extraProxyState]()} } func urlToMetadata(rawURL string) (addr C.Metadata, err error) { @@ -355,7 +356,7 @@ func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url strin } length := 0 - p.extra.Range(func(_, _ interface{}) bool { + p.extra.Range(func(_ string, _ *extraProxyState) bool { length++ return length < 2*C.DefaultMaxHealthCheckUrlNum }) diff --git a/component/auth/auth.go b/component/auth/auth.go index 9d30b927..9b351606 100644 --- a/component/auth/auth.go +++ b/component/auth/auth.go @@ -1,7 +1,7 @@ package auth import ( - "sync" + "github.com/puzpuzpuz/xsync/v2" ) type Authenticator interface { @@ -15,7 +15,7 @@ type AuthUser struct { } type inMemoryAuthenticator struct { - storage *sync.Map + storage *xsync.MapOf[string, string] usernames []string } @@ -31,13 +31,13 @@ func NewAuthenticator(users []AuthUser) Authenticator { return nil } - au := &inMemoryAuthenticator{storage: &sync.Map{}} + au := &inMemoryAuthenticator{storage: xsync.NewMapOf[string]()} for _, user := range users { au.storage.Store(user.User, user.Pass) } usernames := make([]string, 0, len(users)) - au.storage.Range(func(key, value any) bool { - usernames = append(usernames, key.(string)) + au.storage.Range(func(key string, value string) bool { + usernames = append(usernames, key) return true }) au.usernames = usernames diff --git a/component/nat/table.go b/component/nat/table.go index adc6eace..df258dc2 100644 --- a/component/nat/table.go +++ b/component/nat/table.go @@ -5,23 +5,28 @@ import ( "sync" C "github.com/Dreamacro/clash/constant" + + "github.com/puzpuzpuz/xsync/v2" ) type Table struct { - mapping sync.Map + mapping *xsync.MapOf[string, *Entry] + lockMap *xsync.MapOf[string, *sync.Cond] } type Entry struct { PacketConn C.PacketConn WriteBackProxy C.WriteBackProxy - LocalUDPConnMap sync.Map + LocalUDPConnMap *xsync.MapOf[string, *net.UDPConn] + LocalLockMap *xsync.MapOf[string, *sync.Cond] } func (t *Table) Set(key string, e C.PacketConn, w C.WriteBackProxy) { t.mapping.Store(key, &Entry{ PacketConn: e, WriteBackProxy: w, - LocalUDPConnMap: sync.Map{}, + LocalUDPConnMap: xsync.NewMapOf[*net.UDPConn](), + LocalLockMap: xsync.NewMapOf[*sync.Cond](), }) } @@ -34,15 +39,19 @@ func (t *Table) Get(key string) (C.PacketConn, C.WriteBackProxy) { } func (t *Table) GetOrCreateLock(key string) (*sync.Cond, bool) { - item, loaded := t.mapping.LoadOrStore(key, sync.NewCond(&sync.Mutex{})) - return item.(*sync.Cond), loaded + item, loaded := t.lockMap.LoadOrCompute(key, makeLock) + return item, loaded } func (t *Table) Delete(key string) { t.mapping.Delete(key) } -func (t *Table) GetLocalConn(lAddr, rAddr string) *net.UDPConn { +func (t *Table) DeleteLock(lockKey string) { + t.lockMap.Delete(lockKey) +} + +func (t *Table) GetForLocalConn(lAddr, rAddr string) *net.UDPConn { entry, exist := t.getEntry(lAddr) if !exist { return nil @@ -51,10 +60,10 @@ func (t *Table) GetLocalConn(lAddr, rAddr string) *net.UDPConn { if !exist { return nil } - return item.(*net.UDPConn) + return item } -func (t *Table) AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool { +func (t *Table) AddForLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool { entry, exist := t.getEntry(lAddr) if !exist { return false @@ -63,7 +72,7 @@ func (t *Table) AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool { return true } -func (t *Table) RangeLocalConn(lAddr string, f func(key, value any) bool) { +func (t *Table) RangeForLocalConn(lAddr string, f func(key string, value *net.UDPConn) bool) { entry, exist := t.getEntry(lAddr) if !exist { return @@ -76,11 +85,11 @@ func (t *Table) GetOrCreateLockForLocalConn(lAddr, key string) (*sync.Cond, bool if !loaded { return nil, false } - item, loaded := entry.LocalUDPConnMap.LoadOrStore(key, sync.NewCond(&sync.Mutex{})) - return item.(*sync.Cond), loaded + item, loaded := entry.LocalLockMap.LoadOrCompute(key, makeLock) + return item, loaded } -func (t *Table) DeleteLocalConnMap(lAddr, key string) { +func (t *Table) DeleteForLocalConn(lAddr, key string) { entry, loaded := t.getEntry(lAddr) if !loaded { return @@ -88,17 +97,26 @@ func (t *Table) DeleteLocalConnMap(lAddr, key string) { entry.LocalUDPConnMap.Delete(key) } -func (t *Table) getEntry(key string) (*Entry, bool) { - item, ok := t.mapping.Load(key) - // This should not happen usually since this function called after PacketConn created - if !ok { - return nil, false +func (t *Table) DeleteLockForLocalConn(lAddr, key string) { + entry, loaded := t.getEntry(lAddr) + if !loaded { + return } - entry, ok := item.(*Entry) - return entry, ok + entry.LocalLockMap.Delete(key) +} + +func (t *Table) getEntry(key string) (*Entry, bool) { + return t.mapping.Load(key) +} + +func makeLock() *sync.Cond { + return sync.NewCond(&sync.Mutex{}) } // New return *Cache func New() *Table { - return &Table{} + return &Table{ + mapping: xsync.NewMapOf[*Entry](), + lockMap: xsync.NewMapOf[*sync.Cond](), + } } diff --git a/constant/adapters.go b/constant/adapters.go index a3796ef7..5639dd47 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -267,13 +267,17 @@ type NatTable interface { Delete(key string) - GetLocalConn(lAddr, rAddr string) *net.UDPConn + DeleteLock(key string) - AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool + GetForLocalConn(lAddr, rAddr string) *net.UDPConn - RangeLocalConn(lAddr string, f func(key, value any) bool) + AddForLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool - GetOrCreateLockForLocalConn(lAddr, key string) (*sync.Cond, bool) + RangeForLocalConn(lAddr string, f func(key string, value *net.UDPConn) bool) - DeleteLocalConnMap(lAddr, key string) + GetOrCreateLockForLocalConn(lAddr string, key string) (*sync.Cond, bool) + + DeleteForLocalConn(lAddr, key string) + + DeleteLockForLocalConn(lAddr, key string) } diff --git a/go.mod b/go.mod index 00fd802b..a57d9a2b 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/mroth/weightedrand/v2 v2.1.0 github.com/openacid/low v0.1.21 github.com/oschwald/maxminddb-golang v1.12.0 + github.com/puzpuzpuz/xsync/v2 v2.5.0 github.com/sagernet/netlink v0.0.0-20220905062125-8043b4a9aa97 github.com/sagernet/sing v0.2.10-0.20230807080248-4db0062caa0a github.com/sagernet/sing-mux v0.1.3-0.20230811111955-dc1639b5204c diff --git a/go.sum b/go.sum index 84b13831..e718b9e4 100644 --- a/go.sum +++ b/go.sum @@ -134,6 +134,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/puzpuzpuz/xsync/v2 v2.5.0 h1:2k4qrO/orvmEXZ3hmtHqIy9XaQtPTwzMZk1+iErpE8c= +github.com/puzpuzpuz/xsync/v2 v2.5.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= github.com/quic-go/qtls-go1-20 v0.3.2 h1:rRgN3WfnKbyik4dBV8A6girlJVxGand/d+jVKbQq5GI= diff --git a/listener/tproxy/packet.go b/listener/tproxy/packet.go index 2966fd2e..b73339a1 100644 --- a/listener/tproxy/packet.go +++ b/listener/tproxy/packet.go @@ -55,16 +55,15 @@ func (c *packet) InAddr() net.Addr { func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) { remote := rAddr.String() local := lAddr.String() - localConn := natTable.GetLocalConn(local, remote) + localConn := natTable.GetForLocalConn(local, remote) // localConn not exist if localConn == nil { - lockKey := remote + "-lock" - cond, loaded := natTable.GetOrCreateLockForLocalConn(local, lockKey) + cond, loaded := natTable.GetOrCreateLockForLocalConn(local, remote) if loaded { cond.L.Lock() cond.Wait() // we should get localConn here - localConn = natTable.GetLocalConn(local, remote) + localConn = natTable.GetForLocalConn(local, remote) if localConn == nil { return nil, fmt.Errorf("localConn is nil, nat entry not exist") } @@ -74,7 +73,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT return nil, fmt.Errorf("cond is nil, nat entry not exist") } defer func() { - natTable.DeleteLocalConnMap(local, lockKey) + natTable.DeleteLockForLocalConn(local, remote) cond.Broadcast() }() conn, err := listenLocalConn(rAddr, lAddr, in, natTable) @@ -82,7 +81,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT log.Errorln("listenLocalConn failed with error: %s, packet loss (rAddr[%T]=%s lAddr[%T]=%s)", err.Error(), rAddr, remote, lAddr, local) return nil, err } - natTable.AddLocalConn(local, remote, conn) + natTable.AddForLocalConn(local, remote, conn) localConn = conn } } diff --git a/transport/tuic/v4/client.go b/transport/tuic/v4/client.go index fd3bf54a..ce33b72b 100644 --- a/transport/tuic/v4/client.go +++ b/transport/tuic/v4/client.go @@ -22,6 +22,7 @@ import ( "github.com/Dreamacro/clash/transport/tuic/common" "github.com/metacubex/quic-go" + "github.com/puzpuzpuz/xsync/v2" "github.com/zhangyunhao116/fastrand" ) @@ -49,7 +50,7 @@ type clientImpl struct { openStreams atomic.Int64 closed atomic.Bool - udpInputMap sync.Map + udpInputMap *xsync.MapOf[uint32, net.Conn] // only ready for PoolClient dialerRef C.Dialer @@ -263,11 +264,10 @@ func (t *clientImpl) forceClose(quicConn quic.Connection, err error) { if quicConn != nil { _ = quicConn.CloseWithError(ProtocolError, errStr) } - udpInputMap := &t.udpInputMap - udpInputMap.Range(func(key, value any) bool { - if conn, ok := value.(net.Conn); ok { - _ = conn.Close() - } + udpInputMap := t.udpInputMap + udpInputMap.Range(func(key uint32, value net.Conn) bool { + conn := value + _ = conn.Close() udpInputMap.Delete(key) return true }) @@ -469,6 +469,7 @@ func NewClient(clientOption *ClientOption, udp bool, dialerRef C.Dialer) *Client ClientOption: clientOption, udp: udp, dialerRef: dialerRef, + udpInputMap: xsync.NewIntegerMapOf[uint32, net.Conn](), } c := &Client{ci} runtime.SetFinalizer(c, closeClient) diff --git a/transport/tuic/v4/server.go b/transport/tuic/v4/server.go index b0012d96..56133fea 100644 --- a/transport/tuic/v4/server.go +++ b/transport/tuic/v4/server.go @@ -17,6 +17,7 @@ import ( "github.com/gofrs/uuid/v5" "github.com/metacubex/quic-go" + "github.com/puzpuzpuz/xsync/v2" ) type ServerOption struct { @@ -33,6 +34,7 @@ func NewServerHandler(option *ServerOption, quicConn quic.EarlyConnection, uuid quicConn: quicConn, uuid: uuid, authCh: make(chan struct{}), + udpInputMap: xsync.NewIntegerMapOf[uint32, *atomic.Bool](), } } @@ -45,7 +47,7 @@ type serverHandler struct { authOk atomic.Bool authOnce sync.Once - udpInputMap sync.Map + udpInputMap *xsync.MapOf[uint32, *atomic.Bool] } func (s *serverHandler) AuthOk() bool { @@ -78,8 +80,7 @@ func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelay assocId = packet.ASSOC_ID - v, _ := s.udpInputMap.LoadOrStore(assocId, &atomic.Bool{}) - writeClosed := v.(*atomic.Bool) + writeClosed, _ := s.udpInputMap.LoadOrCompute(assocId, func() *atomic.Bool { return &atomic.Bool{} }) if writeClosed.Load() { return nil } @@ -173,8 +174,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) { if err != nil { return } - if v, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded { - writeClosed := v.(*atomic.Bool) + if writeClosed, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded { writeClosed.Store(true) } case HeartbeatType: diff --git a/transport/tuic/v5/client.go b/transport/tuic/v5/client.go index 74dfd581..c4ac25d4 100644 --- a/transport/tuic/v5/client.go +++ b/transport/tuic/v5/client.go @@ -20,6 +20,7 @@ import ( "github.com/Dreamacro/clash/transport/tuic/common" "github.com/metacubex/quic-go" + "github.com/puzpuzpuz/xsync/v2" "github.com/zhangyunhao116/fastrand" ) @@ -46,7 +47,7 @@ type clientImpl struct { openStreams atomic.Int64 closed atomic.Bool - udpInputMap sync.Map + udpInputMap xsync.MapOf[uint16, net.Conn] // only ready for PoolClient dialerRef C.Dialer @@ -270,10 +271,9 @@ func (t *clientImpl) forceClose(quicConn quic.Connection, err error) { _ = quicConn.CloseWithError(ProtocolError, errStr) } udpInputMap := &t.udpInputMap - udpInputMap.Range(func(key, value any) bool { - if conn, ok := value.(net.Conn); ok { - _ = conn.Close() - } + udpInputMap.Range(func(key uint16, value net.Conn) bool { + conn := value + _ = conn.Close() udpInputMap.Delete(key) return true }) @@ -406,6 +406,7 @@ func NewClient(clientOption *ClientOption, udp bool, dialerRef C.Dialer) *Client ClientOption: clientOption, udp: udp, dialerRef: dialerRef, + udpInputMap: *xsync.NewIntegerMapOf[uint16, net.Conn](), } c := &Client{ci} runtime.SetFinalizer(c, closeClient) diff --git a/transport/tuic/v5/server.go b/transport/tuic/v5/server.go index 30259583..10003a9d 100644 --- a/transport/tuic/v5/server.go +++ b/transport/tuic/v5/server.go @@ -16,6 +16,7 @@ import ( "github.com/gofrs/uuid/v5" "github.com/metacubex/quic-go" + "github.com/puzpuzpuz/xsync/v2" ) type ServerOption struct { @@ -32,6 +33,7 @@ func NewServerHandler(option *ServerOption, quicConn quic.EarlyConnection, uuid quicConn: quicConn, uuid: uuid, authCh: make(chan struct{}), + udpInputMap: xsync.NewIntegerMapOf[uint16, *serverUDPInput](), } } @@ -45,7 +47,7 @@ type serverHandler struct { authUUID atomic.TypedValue[string] authOnce sync.Once - udpInputMap sync.Map + udpInputMap *xsync.MapOf[uint16, *serverUDPInput] } func (s *serverHandler) AuthOk() bool { @@ -94,8 +96,7 @@ func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelay assocId = packet.ASSOC_ID - v, _ := s.udpInputMap.LoadOrStore(assocId, &serverUDPInput{}) - input := v.(*serverUDPInput) + input, _ := s.udpInputMap.LoadOrCompute(assocId, func() *serverUDPInput { return &serverUDPInput{} }) if input.writeClosed.Load() { return nil } @@ -186,8 +187,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) { if err != nil { return } - if v, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded { - input := v.(*serverUDPInput) + if input, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded { input.writeClosed.Store(true) } } diff --git a/tunnel/connection.go b/tunnel/connection.go index 2e76b86b..9fc4f405 100644 --- a/tunnel/connection.go +++ b/tunnel/connection.go @@ -73,12 +73,9 @@ func handleUDPToLocal(writeBack C.WriteBack, pc N.EnhancePacketConn, key string, } func closeAllLocalCoon(lAddr string) { - natTable.RangeLocalConn(lAddr, func(key, value any) bool { - conn, ok := value.(*net.UDPConn) - if !ok || conn == nil { - log.Debugln("Value %#v unknown value when closing TProxy local conn...", conn) - return true - } + natTable.RangeForLocalConn(lAddr, func(key string, value *net.UDPConn) bool { + conn := value + conn.Close() log.Debugln("Closing TProxy local conn... lAddr=%s rAddr=%s", lAddr, key) return true diff --git a/tunnel/statistic/manager.go b/tunnel/statistic/manager.go index 2358d0bd..19ce58d9 100644 --- a/tunnel/statistic/manager.go +++ b/tunnel/statistic/manager.go @@ -2,11 +2,11 @@ package statistic import ( "os" - "sync" "time" "github.com/Dreamacro/clash/common/atomic" + "github.com/puzpuzpuz/xsync/v2" "github.com/shirou/gopsutil/v3/process" ) @@ -14,6 +14,7 @@ var DefaultManager *Manager func init() { DefaultManager = &Manager{ + connections: xsync.NewMapOf[Tracker](), uploadTemp: atomic.NewInt64(0), downloadTemp: atomic.NewInt64(0), uploadBlip: atomic.NewInt64(0), @@ -27,7 +28,7 @@ func init() { } type Manager struct { - connections sync.Map + connections *xsync.MapOf[string, Tracker] uploadTemp *atomic.Int64 downloadTemp *atomic.Int64 uploadBlip *atomic.Int64 @@ -48,14 +49,14 @@ func (m *Manager) Leave(c Tracker) { func (m *Manager) Get(id string) (c Tracker) { if value, ok := m.connections.Load(id); ok { - c = value.(Tracker) + c = value } return } func (m *Manager) Range(f func(c Tracker) bool) { - m.connections.Range(func(key, value any) bool { - return f(value.(Tracker)) + m.connections.Range(func(key string, value Tracker) bool { + return f(value) }) } diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index d4c15a87..b7557e10 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -318,8 +318,7 @@ func handleUDPConn(packet C.PacketAdapter) { return } - lockKey := key + "-lock" - cond, loaded := natTable.GetOrCreateLock(lockKey) + cond, loaded := natTable.GetOrCreateLock(key) go func() { defer packet.Drop() @@ -333,7 +332,7 @@ func handleUDPConn(packet C.PacketAdapter) { } defer func() { - natTable.Delete(lockKey) + natTable.DeleteLock(key) cond.Broadcast() }()