mihomo/tunnel/tunnel.go

639 lines
15 KiB
Go
Raw Normal View History

2018-06-10 22:50:03 +08:00
package tunnel
import (
"context"
2019-02-02 20:47:38 +08:00
"fmt"
2018-12-05 21:13:29 +08:00
"net"
2022-04-20 01:52:51 +08:00
"net/netip"
2022-03-13 01:21:23 +08:00
"path/filepath"
"runtime"
"strconv"
2018-06-10 22:50:03 +08:00
"sync"
2018-06-16 21:34:13 +08:00
"time"
2018-06-10 22:50:03 +08:00
2022-11-24 12:32:35 +08:00
"github.com/jpillora/backoff"
N "github.com/Dreamacro/clash/common/net"
2019-10-11 20:11:18 +08:00
"github.com/Dreamacro/clash/component/nat"
2022-11-12 21:42:45 +08:00
P "github.com/Dreamacro/clash/component/process"
"github.com/Dreamacro/clash/component/resolver"
"github.com/Dreamacro/clash/component/sniffer"
2018-06-10 22:50:03 +08:00
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/constant/provider"
icontext "github.com/Dreamacro/clash/context"
2018-11-21 13:47:46 +08:00
"github.com/Dreamacro/clash/log"
"github.com/Dreamacro/clash/tunnel/statistic"
2018-06-10 22:50:03 +08:00
)
var (
2023-03-15 00:10:54 +08:00
status = newAtomicStatus(Suspend)
tcpQueue = make(chan C.ConnContext, 200)
udpQueue = make(chan C.PacketAdapter, 200)
natTable = nat.New()
rules []C.Rule
2022-12-04 21:53:13 +08:00
listeners = make(map[string]C.InboundListener)
2022-12-04 13:37:14 +08:00
subRules map[string][]C.Rule
proxies = make(map[string]C.Proxy)
providers map[string]provider.ProxyProvider
ruleProviders map[string]provider.RuleProvider
2022-11-03 00:31:31 +08:00
sniffingEnable = false
configMux sync.RWMutex
2022-03-20 02:39:48 +08:00
// Outbound Rule
mode = Rule
// default timeout for UDP session
udpTimeout = 60 * time.Second
findProcessMode P.FindProcessMode
fakeIPRange netip.Prefix
)
func OnSuspend() {
2023-03-15 00:10:54 +08:00
status.Store(Suspend)
2023-03-14 22:57:43 +08:00
for _, c := range statistic.DefaultManager.Snapshot().Connections {
_ = c.Close()
}
}
func OnInnerLoading() {
2023-03-15 00:10:54 +08:00
status.Store(Inner)
}
func OnRunning() {
2023-03-15 00:10:54 +08:00
status.Store(Running)
}
// Status reports the tunnel status currently stored in the package-level
// status holder.
func Status() TunnelStatus {
	current := status.Load()
	return current
}
// SetFakeIPRange records p as the fake-IP prefix returned by FakeIPRange.
// The assignment is not guarded by a lock.
func SetFakeIPRange(p netip.Prefix) {
	fakeIPRange = p
}
// FakeIPRange returns the prefix last stored by SetFakeIPRange, or the zero
// netip.Prefix if none has been set.
func FakeIPRange() netip.Prefix {
	return fakeIPRange
}
// SetSniffing sets the sniffing switch to b, but only while the global
// sniffer dispatcher reports itself enabled; otherwise the call does nothing.
// The write happens under configMux.
func SetSniffing(b bool) {
	if !sniffer.Dispatcher.Enable() {
		return
	}

	configMux.Lock()
	defer configMux.Unlock()
	sniffingEnable = b
}
// IsSniffing reports the current value of the sniffing switch.
//
// NOTE(review): the read is not guarded by configMux, unlike the writes in
// SetSniffing and UpdateSniffer.
func IsSniffing() bool {
	return sniffingEnable
}
func init() {
go process()
2018-06-10 22:50:03 +08:00
}
2021-06-13 17:23:10 +08:00
// TCPIn returns the send-only end of the TCP fan-in queue that process
// drains.
func TCPIn() chan<- C.ConnContext {
	return tcpQueue
}
2021-06-13 17:23:10 +08:00
// UDPIn return fan-in udp queue
func UDPIn() chan<- C.PacketAdapter {
2021-06-13 17:23:10 +08:00
return udpQueue
2018-06-10 22:50:03 +08:00
}
2023-02-18 13:16:07 +08:00
// NatTable returns the package-wide NAT table used to track UDP sessions.
func NatTable() C.NatTable {
	return natTable
}
2018-11-21 13:47:46 +08:00
// Rules return all rules
func Rules() []C.Rule {
return rules
2018-06-18 11:31:49 +08:00
}
2022-12-04 21:53:13 +08:00
func Listeners() map[string]C.InboundListener {
2022-12-04 15:15:23 +08:00
return listeners
}
2018-11-21 13:47:46 +08:00
// UpdateRules handle update rules
2022-12-04 13:37:14 +08:00
func UpdateRules(newRules []C.Rule, newSubRule map[string][]C.Rule, rp map[string]provider.RuleProvider) {
configMux.Lock()
rules = newRules
2021-12-02 22:56:17 +08:00
ruleProviders = rp
2022-12-04 13:37:14 +08:00
subRules = newSubRule
configMux.Unlock()
2018-11-21 13:47:46 +08:00
}
// Proxies return all proxies
func Proxies() map[string]C.Proxy {
return proxies
2018-11-21 13:47:46 +08:00
}
2019-12-08 12:17:24 +08:00
// Providers return all compatible providers
func Providers() map[string]provider.ProxyProvider {
return providers
2019-12-08 12:17:24 +08:00
}
2021-12-02 22:56:17 +08:00
// RuleProviders return all loaded rule providers
func RuleProviders() map[string]provider.RuleProvider {
2021-12-02 22:56:17 +08:00
return ruleProviders
}
2018-11-21 13:47:46 +08:00
// UpdateProxies replaces, under configMux, both the proxy map and the
// proxy-provider map.
func UpdateProxies(newProxies map[string]C.Proxy, newProviders map[string]provider.ProxyProvider) {
	configMux.Lock()
	defer configMux.Unlock()

	proxies, providers = newProxies, newProviders
}
2022-12-04 21:53:13 +08:00
func UpdateListeners(newListeners map[string]C.InboundListener) {
2022-12-04 15:15:23 +08:00
configMux.Lock()
defer configMux.Unlock()
listeners = newListeners
2022-12-04 15:15:23 +08:00
}
func UpdateSniffer(dispatcher *sniffer.SnifferDispatcher) {
configMux.Lock()
sniffer.Dispatcher = dispatcher
2022-11-03 00:31:31 +08:00
sniffingEnable = dispatcher.Enable()
configMux.Unlock()
}
2018-11-21 13:47:46 +08:00
// Mode return current mode
func Mode() TunnelMode {
return mode
2018-11-21 13:47:46 +08:00
}
// SetMode switches the tunnel's outbound mode to m. The assignment is not
// guarded by a lock.
func SetMode(m TunnelMode) {
	mode = m
}
// SetFindProcessMode stores the process-lookup policy consulted by match.
// It replaces the older SetAlwaysFindProcess. When the mode reports Always(),
// match looks up the owning process for every connection rather than only
// for rules that ask for it, which may noticeably increase memory use.
func SetFindProcessMode(mode P.FindProcessMode) {
	findProcessMode = mode
}
func isHandle(t C.Type) bool {
2023-03-15 00:10:54 +08:00
status := status.Load()
return status == Running || (status == Inner && t == C.INNER)
}
// processUDP starts a loop to handle udp packet
func processUDP() {
2020-10-20 17:44:39 +08:00
queue := udpQueue
for conn := range queue {
if !isHandle(conn.Metadata().Type) {
conn.Drop()
continue
}
handleUDPConn(conn)
}
}
func process() {
numUDPWorkers := 4
2021-10-27 21:27:19 +08:00
if num := runtime.GOMAXPROCS(0); num > numUDPWorkers {
numUDPWorkers = num
}
for i := 0; i < numUDPWorkers; i++ {
go processUDP()
}
2019-10-11 20:11:18 +08:00
2020-10-20 17:44:39 +08:00
queue := tcpQueue
for conn := range queue {
if !isHandle(conn.Metadata().Type) {
_ = conn.Conn().Close()
continue
}
go handleTCPConn(conn)
2018-06-10 22:50:03 +08:00
}
}
func needLookupIP(metadata *C.Metadata) bool {
2022-04-20 01:52:51 +08:00
return resolver.MappingEnabled() && metadata.Host == "" && metadata.DstIP.IsValid()
2019-02-11 15:44:42 +08:00
}
func preHandleMetadata(metadata *C.Metadata) error {
// handle IP string on host
2022-04-20 01:52:51 +08:00
if ip, err := netip.ParseAddr(metadata.Host); err == nil {
metadata.DstIP = ip
2021-03-10 12:11:45 +08:00
metadata.Host = ""
}
2019-05-03 00:05:14 +08:00
// preprocess enhanced-mode metadata
if needLookupIP(metadata) {
host, exist := resolver.FindHostByIP(metadata.DstIP)
2018-12-05 21:13:29 +08:00
if exist {
metadata.Host = host
2021-10-18 22:58:16 +08:00
metadata.DNSMode = C.DNSMapping
if resolver.FakeIPEnabled() {
2022-04-20 01:52:51 +08:00
metadata.DstIP = netip.Addr{}
metadata.DNSMode = C.DNSFakeIP
} else if node, ok := resolver.DefaultHosts.Search(host, false); ok {
2020-04-27 21:28:24 +08:00
// redir-host should lookup the hosts
metadata.DstIP, _ = node.RandIP()
} else if node != nil && node.IsDomain {
metadata.Host = node.Domain
2019-05-03 00:05:14 +08:00
}
2022-03-15 02:55:06 +08:00
} else if resolver.IsFakeIP(metadata.DstIP) {
return fmt.Errorf("fake DNS record %s missing", metadata.DstIP)
2018-12-05 21:13:29 +08:00
}
} else if node, ok := resolver.DefaultHosts.Search(metadata.Host, true); ok {
// try use domain mapping
metadata.Host = node.Domain
2018-12-05 21:13:29 +08:00
}
return nil
}
func resolveMetadata(ctx C.PlainContext, metadata *C.Metadata) (proxy C.Proxy, rule C.Rule, err error) {
2022-11-18 22:57:33 +08:00
if metadata.SpecialProxy != "" {
var exist bool
proxy, exist = proxies[metadata.SpecialProxy]
if !exist {
err = fmt.Errorf("proxy %s not found", metadata.SpecialProxy)
}
2022-11-22 19:16:08 +08:00
return
2022-11-18 22:57:33 +08:00
}
switch mode {
2018-11-21 13:47:46 +08:00
case Direct:
proxy = proxies["DIRECT"]
2018-11-21 13:47:46 +08:00
case Global:
proxy = proxies["GLOBAL"]
// Rule
default:
proxy, rule, err = match(metadata)
}
return
2019-10-11 20:11:18 +08:00
}
2019-02-02 20:47:38 +08:00
func handleUDPConn(packet C.PacketAdapter) {
metadata := packet.Metadata()
2019-10-11 20:11:18 +08:00
if !metadata.Valid() {
log.Warnln("[Metadata] not valid: %#v", metadata)
return
}
2020-10-28 21:26:50 +08:00
// make a fAddr if request ip is fakeip
var fAddr netip.Addr
if resolver.IsExistFakeIP(metadata.DstIP) {
fAddr = metadata.DstIP
2020-03-10 20:36:24 +08:00
}
if err := preHandleMetadata(metadata); err != nil {
log.Debugln("[Metadata PreHandle] error: %s", err)
return
2020-01-31 19:26:33 +08:00
}
// local resolve UDP dns
if !metadata.Resolved() {
ip, err := resolver.ResolveIP(context.Background(), metadata.Host)
if err != nil {
return
}
metadata.DstIP = ip
}
2020-01-31 14:43:54 +08:00
key := packet.LocalAddr().String()
2020-10-28 21:26:50 +08:00
handle := func() bool {
pc := natTable.Get(key)
if pc != nil {
2022-04-11 06:28:42 +08:00
_ = handleUDPToRemote(packet, pc, metadata)
2020-10-28 21:26:50 +08:00
return true
}
return false
}
if handle() {
2019-10-11 20:11:18 +08:00
return
}
lockKey := key + "-lock"
2020-10-28 21:26:50 +08:00
cond, loaded := natTable.GetOrCreateLock(lockKey)
2019-10-11 20:11:18 +08:00
go func() {
2020-10-28 21:26:50 +08:00
if loaded {
cond.L.Lock()
cond.Wait()
handle()
cond.L.Unlock()
return
}
2019-10-11 20:11:18 +08:00
2020-10-28 21:26:50 +08:00
defer func() {
natTable.Delete(lockKey)
2020-10-28 21:26:50 +08:00
cond.Broadcast()
}()
pCtx := icontext.NewPacketConnContext(metadata)
proxy, rule, err := resolveMetadata(pCtx, metadata)
2020-10-28 21:26:50 +08:00
if err != nil {
log.Warnln("[UDP] Parse metadata failed: %s", err.Error())
return
2019-04-24 10:29:29 +08:00
}
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultUDPTimeout)
defer cancel()
2022-11-24 12:32:35 +08:00
rawPc, err := retry(ctx, func(ctx context.Context) (C.PacketConn, error) {
return proxy.ListenPacketContext(ctx, metadata.Pure())
}, func(err error) {
if rule == nil {
log.Warnln(
"[UDP] dial %s %s --> %s error: %s",
proxy.Name(),
2023-01-25 13:00:18 +08:00
metadata.SourceDetail(),
metadata.RemoteAddress(),
err.Error(),
)
} else {
2023-01-25 13:00:18 +08:00
log.Warnln("[UDP] dial %s (match %s/%s) %s --> %s error: %s", proxy.Name(), rule.RuleType().String(), rule.Payload(), metadata.SourceDetail(), metadata.RemoteAddress(), err.Error())
}
2022-11-24 12:32:35 +08:00
})
if err != nil {
2020-10-28 21:26:50 +08:00
return
}
pCtx.InjectPacketConn(rawPc)
2023-02-25 19:41:01 +08:00
pc := statistic.NewUDPTracker(rawPc, statistic.DefaultManager, metadata, rule, 0, 0)
2020-10-28 21:26:50 +08:00
switch true {
2022-11-18 22:57:33 +08:00
case metadata.SpecialProxy != "":
2023-01-25 13:00:18 +08:00
log.Infoln("[UDP] %s --> %s using %s", metadata.SourceDetail(), metadata.RemoteAddress(), metadata.SpecialProxy)
2020-10-28 21:26:50 +08:00
case rule != nil:
if rule.Payload() != "" {
log.Infoln("[UDP] %s --> %s match %s using %s", metadata.SourceDetail(), metadata.RemoteAddress(), fmt.Sprintf("%s(%s)", rule.RuleType().String(), rule.Payload()), rawPc.Chains().String())
} else {
log.Infoln("[UDP] %s --> %s match %s using %s", metadata.SourceDetail(), metadata.RemoteAddress(), rule.Payload(), rawPc.Chains().String())
}
2020-10-28 21:26:50 +08:00
case mode == Global:
log.Infoln("[UDP] %s --> %s using GLOBAL", metadata.SourceDetail(), metadata.RemoteAddress())
2020-10-28 21:26:50 +08:00
case mode == Direct:
log.Infoln("[UDP] %s --> %s using DIRECT", metadata.SourceDetail(), metadata.RemoteAddress())
2020-10-28 21:26:50 +08:00
default:
log.Infoln("[UDP] %s --> %s doesn't match any rule using DIRECT", metadata.SourceDetail(), metadata.RemoteAddress())
}
2020-10-28 21:26:50 +08:00
oAddr := metadata.DstIP
natTable.Set(key, pc)
go handleUDPToLocal(packet, pc, key, oAddr, fAddr)
2020-10-28 21:26:50 +08:00
handle()
2019-10-11 20:11:18 +08:00
}()
}
func handleTCPConn(connCtx C.ConnContext) {
2022-04-11 06:28:42 +08:00
defer func(conn net.Conn) {
_ = conn.Close()
}(connCtx.Conn())
2019-10-11 20:11:18 +08:00
metadata := connCtx.Metadata()
2019-10-11 20:11:18 +08:00
if !metadata.Valid() {
log.Warnln("[Metadata] not valid: %#v", metadata)
return
2019-04-23 23:29:36 +08:00
}
if err := preHandleMetadata(metadata); err != nil {
log.Debugln("[Metadata PreHandle] error: %s", err)
return
}
conn := connCtx.Conn()
conn.ResetPeeked() // reset before sniffer
if sniffer.Dispatcher.Enable() && sniffingEnable {
sniffer.Dispatcher.TCPSniff(conn, metadata)
}
peekMutex := sync.Mutex{}
if !conn.Peeked() {
peekMutex.Lock()
go func() {
defer peekMutex.Unlock()
_ = conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
_, _ = conn.Peek(1)
_ = conn.SetReadDeadline(time.Time{})
}()
}
proxy, rule, err := resolveMetadata(connCtx, metadata)
2019-10-11 20:11:18 +08:00
if err != nil {
log.Warnln("[Metadata] parse failed: %s", err.Error())
2019-10-11 20:11:18 +08:00
return
}
dialMetadata := metadata
if len(metadata.Host) > 0 {
if node, ok := resolver.DefaultHosts.Search(metadata.Host, false); ok {
if dstIp, _ := node.RandIP(); !FakeIPRange().Contains(dstIp) {
dialMetadata.DstIP = dstIp
dialMetadata.DNSMode = C.DNSHosts
dialMetadata = dialMetadata.Pure()
}
}
}
var peekBytes []byte
2023-02-25 19:41:01 +08:00
var peekLen int
2022-04-12 20:20:04 +08:00
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
defer cancel()
2023-02-27 09:46:16 +08:00
remoteConn, err := retry(ctx, func(ctx context.Context) (remoteConn C.Conn, err error) {
remoteConn, err = proxy.DialContext(ctx, dialMetadata)
if err != nil {
2023-02-27 09:46:16 +08:00
return
2023-02-26 11:11:54 +08:00
}
2023-02-27 09:46:16 +08:00
if N.NeedHandshake(remoteConn) {
2023-02-27 09:46:16 +08:00
defer func() {
for _, chain := range remoteConn.Chains() {
if chain == "REJECT" {
err = nil
return
}
}
if err != nil {
remoteConn = nil
}
}()
peekMutex.Lock()
defer peekMutex.Unlock()
peekBytes, _ = conn.Peek(conn.Buffered())
_, err = remoteConn.Write(peekBytes)
if err != nil {
2023-02-27 09:46:16 +08:00
return
}
if peekLen = len(peekBytes); peekLen > 0 {
_, _ = conn.Discard(peekLen)
}
}
2023-02-27 09:46:16 +08:00
return
2022-11-24 12:32:35 +08:00
}, func(err error) {
if rule == nil {
log.Warnln(
"[TCP] dial %s %s --> %s error: %s",
proxy.Name(),
2023-01-25 13:00:18 +08:00
metadata.SourceDetail(),
metadata.RemoteAddress(),
err.Error(),
)
} else {
2023-01-25 13:00:18 +08:00
log.Warnln("[TCP] dial %s (match %s/%s) %s --> %s error: %s", proxy.Name(), rule.RuleType().String(), rule.Payload(), metadata.SourceDetail(), metadata.RemoteAddress(), err.Error())
}
2022-11-24 12:32:35 +08:00
})
if err != nil {
2018-06-10 22:50:03 +08:00
return
}
2023-02-25 19:41:01 +08:00
remoteConn = statistic.NewTCPTracker(remoteConn, statistic.DefaultManager, metadata, rule, 0, int64(peekLen))
2022-04-11 06:28:42 +08:00
defer func(remoteConn C.Conn) {
_ = remoteConn.Close()
}(remoteConn)
2018-06-10 22:50:03 +08:00
2020-01-31 14:58:54 +08:00
switch true {
2022-11-18 22:57:33 +08:00
case metadata.SpecialProxy != "":
2023-01-25 13:00:18 +08:00
log.Infoln("[TCP] %s --> %s using %s", metadata.SourceDetail(), metadata.RemoteAddress(), metadata.SpecialProxy)
2020-01-31 14:58:54 +08:00
case rule != nil:
if rule.Payload() != "" {
log.Infoln("[TCP] %s --> %s match %s using %s", metadata.SourceDetail(), metadata.RemoteAddress(), fmt.Sprintf("%s(%s)", rule.RuleType().String(), rule.Payload()), remoteConn.Chains().String())
} else {
log.Infoln("[TCP] %s --> %s match %s using %s", metadata.SourceDetail(), metadata.RemoteAddress(), rule.RuleType().String(), remoteConn.Chains().String())
}
case mode == Global:
log.Infoln("[TCP] %s --> %s using GLOBAL", metadata.SourceDetail(), metadata.RemoteAddress())
case mode == Direct:
log.Infoln("[TCP] %s --> %s using DIRECT", metadata.SourceDetail(), metadata.RemoteAddress())
2020-01-31 14:58:54 +08:00
default:
log.Infoln(
"[TCP] %s --> %s doesn't match any rule using DIRECT",
2023-01-25 13:00:18 +08:00
metadata.SourceDetail(),
metadata.RemoteAddress(),
)
}
_ = conn.SetReadDeadline(time.Now()) // stop unfinished peek
peekMutex.Lock()
defer peekMutex.Unlock()
_ = conn.SetReadDeadline(time.Time{}) // reset
handleSocket(connCtx, remoteConn)
2018-06-10 22:50:03 +08:00
}
func shouldResolveIP(rule C.Rule, metadata *C.Metadata) bool {
2022-04-20 01:52:51 +08:00
return rule.ShouldResolveIP() && metadata.Host != "" && !metadata.DstIP.IsValid()
2019-02-02 20:47:38 +08:00
}
func match(metadata *C.Metadata) (C.Proxy, C.Rule, error) {
configMux.RLock()
defer configMux.RUnlock()
2022-08-12 03:04:58 +08:00
var (
resolved bool
processFound bool
)
2019-09-11 17:00:55 +08:00
if node, ok := resolver.DefaultHosts.Search(metadata.Host, false); ok {
metadata.DstIP, _ = node.RandIP()
2019-09-11 17:00:55 +08:00
resolved = true
}
2022-12-04 13:37:14 +08:00
for _, rule := range getRules(metadata) {
if !resolved && shouldResolveIP(rule, metadata) {
func() {
ctx, cancel := context.WithTimeout(context.Background(), resolver.DefaultDNSTimeout)
defer cancel()
ip, err := resolver.ResolveIP(ctx, metadata.Host)
if err != nil {
log.Debugln("[DNS] resolve %s error: %s", metadata.Host, err.Error())
} else {
log.Debugln("[DNS] %s --> %s", metadata.Host, ip.String())
metadata.DstIP = ip
}
resolved = true
}()
2019-02-02 20:47:38 +08:00
}
if !findProcessMode.Off() && !processFound && (findProcessMode.Always() || rule.ShouldFindProcess()) {
srcPort, err := strconv.ParseUint(metadata.SrcPort, 10, 16)
2022-06-14 23:14:43 +08:00
uid, path, err := P.FindProcessName(metadata.NetWork.String(), metadata.SrcIP, int(srcPort))
if err != nil {
log.Debugln("[Process] find process %s: %v", metadata.String(), err)
} else {
2022-07-13 22:27:49 +08:00
metadata.Process = filepath.Base(path)
metadata.ProcessPath = path
metadata.Uid = uid
2022-08-12 03:04:58 +08:00
processFound = true
}
}
if matched, ada := rule.Match(metadata); matched {
adapter, ok := proxies[ada]
2019-04-23 23:29:36 +08:00
if !ok {
continue
2018-06-10 22:50:03 +08:00
}
2019-04-23 23:29:36 +08:00
2022-10-30 23:08:18 +08:00
// parse multi-layer nesting
passed := false
for adapter := adapter; adapter != nil; adapter = adapter.Unwrap(metadata, false) {
if adapter.Type() == C.Pass {
passed = true
break
}
}
if passed {
log.Debugln("%s match Pass rule", adapter.Name())
continue
}
2019-04-23 23:29:36 +08:00
if metadata.NetWork == C.UDP && !adapter.SupportUDP() {
2021-04-05 13:31:10 +08:00
log.Debugln("%s UDP is not supported", adapter.Name())
2019-04-23 23:29:36 +08:00
continue
}
2021-11-17 16:03:47 +08:00
return adapter, rule, nil
2018-06-10 22:50:03 +08:00
}
}
return proxies["DIRECT"], nil, nil
2021-11-17 16:03:47 +08:00
}
2022-11-24 12:32:35 +08:00
2022-12-04 13:37:14 +08:00
func getRules(metadata *C.Metadata) []C.Rule {
2022-12-04 22:08:20 +08:00
if sr, ok := subRules[metadata.SpecialRules]; ok {
2022-12-05 19:48:54 +08:00
log.Debugln("[Rule] use %s rules", metadata.SpecialRules)
2022-12-04 13:37:14 +08:00
return sr
} else {
2022-12-05 19:48:54 +08:00
log.Debugln("[Rule] use default rules")
2022-12-04 13:37:14 +08:00
return rules
}
}
2022-11-24 12:32:35 +08:00
// retry calls ft until it succeeds, giving up after 10 attempts or as soon as
// ctx is done while waiting between attempts.
//
// Between attempts it waits for a jittered exponential backoff that starts at
// 10ms and is capped at 1s. After every failed attempt fe, if non-nil, is
// called with the error. No wait follows the final attempt.
//
// The returned values are those of the last call to ft. When ctx ends the
// wait early, err is the last attempt's error, not ctx.Err().
func retry[T any](ctx context.Context, ft func(context.Context) (T, error), fe func(err error)) (t T, err error) {
	const maxAttempts = 10

	b := &backoff.Backoff{
		Min:    10 * time.Millisecond,
		Max:    1 * time.Second,
		Factor: 2,
		Jitter: true,
	}

	for attempt := 1; ; attempt++ {
		t, err = ft(ctx)
		if err == nil {
			return t, nil
		}
		if fe != nil {
			fe(err)
		}
		if attempt == maxAttempts {
			// Out of attempts: report the failure without a pointless sleep.
			return t, err
		}

		timer := time.NewTimer(b.Duration())
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return t, err
		}
	}
}