mihomo/adapter/outboundgroup/loadbalance.go

273 lines
6.3 KiB
Go
Raw Normal View History

2019-12-08 12:17:24 +08:00
package outboundgroup
2019-02-15 14:25:20 +08:00
import (
2019-07-02 19:18:03 +08:00
"context"
2019-02-15 14:25:20 +08:00
"encoding/json"
"errors"
"fmt"
2019-02-15 14:25:20 +08:00
"net"
2023-03-01 14:04:42 +08:00
"sync"
"time"
2019-02-15 14:25:20 +08:00
2023-11-03 21:01:45 +08:00
"github.com/metacubex/mihomo/adapter/outbound"
"github.com/metacubex/mihomo/common/callback"
2023-12-02 17:07:36 +08:00
"github.com/metacubex/mihomo/common/lru"
2023-11-03 21:01:45 +08:00
N "github.com/metacubex/mihomo/common/net"
"github.com/metacubex/mihomo/common/utils"
"github.com/metacubex/mihomo/component/dialer"
C "github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/constant/provider"
2019-02-15 14:25:20 +08:00
"golang.org/x/net/publicsuffix"
)
2023-03-01 14:04:42 +08:00
type strategyFn = func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy
2019-02-15 14:25:20 +08:00
type LoadBalance struct {
*GroupBase
disableUDP bool
strategyFn strategyFn
testUrl string
expectedStatus string
}
var errStrategy = errors.New("unsupported strategy")
2022-03-16 12:10:13 +08:00
func parseStrategy(config map[string]any) string {
if strategy, ok := config["strategy"].(string); ok {
return strategy
}
return "consistent-hashing"
2019-02-15 14:25:20 +08:00
}
func getKey(metadata *C.Metadata) string {
2022-05-04 19:52:48 +08:00
if metadata == nil {
return ""
}
2019-02-15 14:25:20 +08:00
if metadata.Host != "" {
// ip host
if ip := net.ParseIP(metadata.Host); ip != nil {
return metadata.Host
}
if etld, err := publicsuffix.EffectiveTLDPlusOne(metadata.Host); err == nil {
return etld
}
}
2022-04-20 01:52:51 +08:00
if !metadata.DstIP.IsValid() {
2019-02-15 14:25:20 +08:00
return ""
}
2019-05-09 21:00:29 +08:00
return metadata.DstIP.String()
2019-02-15 14:25:20 +08:00
}
2022-05-17 13:28:54 +08:00
func getKeyWithSrcAndDst(metadata *C.Metadata) string {
dst := getKey(metadata)
src := ""
if metadata != nil {
src = metadata.SrcIP.String()
}
return fmt.Sprintf("%s%s", src, dst)
}
2019-02-15 14:25:20 +08:00
func jumpHash(key uint64, buckets int32) int32 {
var b, j int64
for j < int64(buckets) {
b = j
key = key*2862933555777941757 + 1
j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
}
return int32(b)
}
2021-04-29 11:23:14 +08:00
// DialContext implements C.ProxyAdapter
func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) {
2022-10-30 23:08:18 +08:00
proxy := lb.Unwrap(metadata, true)
c, err = proxy.DialContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
if err == nil {
c.AppendToChains(lb)
} else {
lb.onDialFailed(proxy.Type(), err)
}
if N.NeedHandshake(c) {
c = callback.NewFirstWriteCallBackConn(c, func(err error) {
if err == nil {
lb.onDialSuccess()
} else {
lb.onDialFailed(proxy.Type(), err)
}
})
}
return
2019-02-15 14:25:20 +08:00
}
// ListenPacketContext implements C.ProxyAdapter
func (lb *LoadBalance) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (pc C.PacketConn, err error) {
defer func() {
if err == nil {
pc.AppendToChains(lb)
}
}()
2022-10-30 23:08:18 +08:00
proxy := lb.Unwrap(metadata, true)
return proxy.ListenPacketContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
}
2021-04-29 11:23:14 +08:00
// SupportUDP implements C.ProxyAdapter
func (lb *LoadBalance) SupportUDP() bool {
return !lb.disableUDP
}
// IsL3Protocol implements C.ProxyAdapter
func (lb *LoadBalance) IsL3Protocol(metadata *C.Metadata) bool {
return lb.Unwrap(metadata, false).IsL3Protocol(metadata)
}
func strategyRoundRobin(url string) strategyFn {
idx := 0
2023-03-01 14:04:42 +08:00
idxMutex := sync.Mutex{}
return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
idxMutex.Lock()
defer idxMutex.Unlock()
i := 0
length := len(proxies)
2023-03-01 14:04:42 +08:00
if touch {
defer func() {
idx = (idx + i) % length
2023-03-01 14:04:42 +08:00
}()
}
for ; i < length; i++ {
id := (idx + i) % length
2023-03-01 14:04:42 +08:00
proxy := proxies[id]
if proxy.AliveForTestUrl(url) {
i++
return proxy
}
2019-04-23 23:29:36 +08:00
}
return proxies[0]
2019-04-23 23:29:36 +08:00
}
}
2019-04-23 23:29:36 +08:00
func strategyConsistentHashing(url string) strategyFn {
maxRetry := 5
2023-03-01 14:04:42 +08:00
return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
2023-06-10 17:35:19 +08:00
key := utils.MapHash(getKey(metadata))
buckets := int32(len(proxies))
for i := 0; i < maxRetry; i, key = i+1, key+1 {
idx := jumpHash(key, buckets)
proxy := proxies[idx]
if proxy.AliveForTestUrl(url) {
return proxy
}
}
// when availability is poor, traverse the entire list to get the available nodes
for _, proxy := range proxies {
if proxy.AliveForTestUrl(url) {
return proxy
}
}
return proxies[0]
}
}
func strategyStickySessions(url string) strategyFn {
2022-05-17 13:28:54 +08:00
ttl := time.Minute * 10
maxRetry := 5
2023-12-02 17:07:36 +08:00
lruCache := lru.New[uint64, int](
lru.WithAge[uint64, int](int64(ttl.Seconds())),
lru.WithSize[uint64, int](1000))
2023-03-01 14:04:42 +08:00
return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
2023-06-10 17:35:19 +08:00
key := utils.MapHash(getKeyWithSrcAndDst(metadata))
length := len(proxies)
2022-05-18 22:29:27 +08:00
idx, has := lruCache.Get(key)
if !has {
idx = int(jumpHash(key+uint64(time.Now().UnixNano()), int32(length)))
}
nowIdx := idx
for i := 1; i < maxRetry; i++ {
2022-05-17 13:28:54 +08:00
proxy := proxies[nowIdx]
if proxy.AliveForTestUrl(url) {
2022-05-17 13:28:54 +08:00
if nowIdx != idx {
2022-05-18 22:29:27 +08:00
lruCache.Delete(key)
lruCache.Set(key, nowIdx)
2022-05-17 13:28:54 +08:00
}
return proxy
} else {
nowIdx = int(jumpHash(key+uint64(time.Now().UnixNano()), int32(length)))
}
}
2022-05-16 17:46:28 +08:00
lruCache.Delete(key)
lruCache.Set(key, 0)
2022-05-17 13:28:54 +08:00
return proxies[0]
}
}
2021-04-29 11:23:14 +08:00
// Unwrap implements C.ProxyAdapter
2022-10-30 23:08:18 +08:00
func (lb *LoadBalance) Unwrap(metadata *C.Metadata, touch bool) C.Proxy {
proxies := lb.GetProxies(touch)
2023-03-09 11:09:36 +08:00
return lb.strategyFn(proxies, metadata, touch)
2019-04-23 23:29:36 +08:00
}
2021-04-29 11:23:14 +08:00
// MarshalJSON implements C.ProxyAdapter
2019-02-15 14:25:20 +08:00
func (lb *LoadBalance) MarshalJSON() ([]byte, error) {
var all []string
for _, proxy := range lb.GetProxies(false) {
2019-02-15 14:25:20 +08:00
all = append(all, proxy.Name())
}
2022-03-16 12:10:13 +08:00
return json.Marshal(map[string]any{
"type": lb.Type().String(),
"all": all,
"testUrl": lb.testUrl,
"expectedStatus": lb.expectedStatus,
2019-02-15 14:25:20 +08:00
})
}
func NewLoadBalance(option *GroupCommonOption, providers []provider.ProxyProvider, strategy string) (lb *LoadBalance, err error) {
var strategyFn strategyFn
switch strategy {
case "consistent-hashing":
strategyFn = strategyConsistentHashing(option.URL)
case "round-robin":
strategyFn = strategyRoundRobin(option.URL)
case "sticky-sessions":
strategyFn = strategyStickySessions(option.URL)
default:
return nil, fmt.Errorf("%w: %s", errStrategy, strategy)
}
return &LoadBalance{
GroupBase: NewGroupBase(GroupBaseOption{
outbound.BaseOption{
Name: option.Name,
Type: C.LoadBalance,
Interface: option.Interface,
RoutingMark: option.RoutingMark,
},
option.Filter,
option.ExcludeFilter,
option.ExcludeType,
providers,
}),
strategyFn: strategyFn,
disableUDP: option.DisableUDP,
testUrl: option.URL,
expectedStatus: option.ExpectedStatus,
}, nil
2019-02-15 14:25:20 +08:00
}