373 lines
8.0 KiB
Go
Raw Permalink Normal View History

2019-04-22 02:59:20 +00:00
package paladin
import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"strconv"
"sync"
"time"
"go-common/library/conf/env"
"go-common/library/ecode"
"go-common/library/log"
xip "go-common/library/net/ip"
"go-common/library/net/netutil"
"github.com/pkg/errors"
)
const (
_apiGet = "http://%s/config/v2/get?%s"
_apiCheck = "http://%s/config/v2/check?%s"
_maxLoadRetries = 3
)
var (
_ Client = &sven{}
svenHost string
svenVersion string
svenPath string
svenToken string
svenAppoint string
svenTreeid string
_debug bool
)
func init() {
flag.StringVar(&svenHost, "conf_host", os.Getenv("CONF_HOST"), `config api host.`)
flag.StringVar(&svenVersion, "conf_version", os.Getenv("CONF_VERSION"), `app version.`)
flag.StringVar(&svenPath, "conf_path", os.Getenv("CONF_PATH"), `config file path.`)
flag.StringVar(&svenToken, "conf_token", os.Getenv("CONF_TOKEN"), `config token.`)
flag.StringVar(&svenAppoint, "conf_appoint", os.Getenv("CONF_APPOINT"), `config appoint.`)
flag.StringVar(&svenTreeid, "tree_id", os.Getenv("TREE_ID"), `tree id.`)
if env.DeployEnv == env.DeployEnvDev {
_debug = true
}
}
type watcher struct {
keys []string
ch chan Event
}
func newWatcher(keys []string) *watcher {
return &watcher{keys: keys, ch: make(chan Event, 5)}
}
func (w *watcher) HasKey(key string) bool {
if len(w.keys) == 0 {
return true
}
for _, k := range w.keys {
if k == key {
return true
}
}
return false
}
func (w *watcher) Handle(event Event) {
select {
case w.ch <- event:
default:
log.Error("paladin: discard event:%+v", event)
}
}
func (w *watcher) Chan() <-chan Event {
return w.ch
}
func (w *watcher) Close() {
close(w.ch)
}
// sven is sven config client.
type sven struct {
values *Map
wmu sync.RWMutex
watchers map[*watcher]struct{}
httpCli *http.Client
backoff *netutil.BackoffConfig
}
// NewSven new a config client.
func NewSven() (Client, error) {
s := &sven{
values: new(Map),
watchers: make(map[*watcher]struct{}),
httpCli: &http.Client{Timeout: 60 * time.Second},
backoff: &netutil.BackoffConfig{
MaxDelay: 5 * time.Second,
BaseDelay: 1.0 * time.Second,
Factor: 1.6,
Jitter: 0.2,
},
}
if err := s.checkEnv(); err != nil {
return nil, err
}
ver, err := s.load()
if err != nil {
return nil, err
}
go s.watchproc(ver)
return s, nil
}
func (s *sven) checkEnv() error {
if svenHost == "" || svenVersion == "" || svenPath == "" || svenToken == "" || svenTreeid == "" {
return fmt.Errorf("config env invalid. conf_host(%s) conf_version(%s) conf_path(%s) conf_token(%s) conf_appoint(%s) tree_id(%s)", svenHost, svenVersion, svenPath, svenToken, svenAppoint, svenTreeid)
}
return nil
}
// Get return value by key.
func (s *sven) Get(key string) *Value {
return s.values.Get(key)
}
// GetAll return value map.
func (s *sven) GetAll() *Map {
return s.values
}
// WatchEvent watch with the specified keys.
func (s *sven) WatchEvent(ctx context.Context, keys ...string) <-chan Event {
w := newWatcher(keys)
s.wmu.Lock()
s.watchers[w] = struct{}{}
s.wmu.Unlock()
return w.Chan()
}
// Close close watcher.
func (s *sven) Close() (err error) {
s.wmu.RLock()
for w := range s.watchers {
w.Close()
}
s.wmu.RUnlock()
return
}
func (s *sven) fireEvent(event Event) {
s.wmu.RLock()
for w := range s.watchers {
if w.HasKey(event.Key) {
w.Handle(event)
}
}
s.wmu.RUnlock()
}
func (s *sven) load() (ver int64, err error) {
var (
v *version
cs []*content
)
if v, err = s.check(-1); err != nil {
log.Error("paladin: s.check(-1) error(%v)", err)
return
}
for i := 0; i < _maxLoadRetries; i++ {
if cs, err = s.config(v); err == nil {
all := make(map[string]*Value, len(cs))
for _, v := range cs {
all[v.Name] = &Value{val: v.Config, raw: v.Config}
}
s.values.Store(all)
return v.Version, nil
}
log.Error("paladin: s.config(%v) error(%v)", ver, err)
time.Sleep(s.backoff.Backoff(i))
}
return 0, err
}
func (s *sven) watchproc(ver int64) {
var retry int
for {
v, err := s.check(ver)
if err != nil {
if ecode.NotModified.Equal(err) {
time.Sleep(time.Second)
continue
}
log.Error("paladin: s.check(%d) error(%v)", ver, err)
retry++
time.Sleep(s.backoff.Backoff(retry))
continue
}
cs, err := s.config(v)
if err != nil {
log.Error("paladin: s.config(%v) error(%v)", ver, err)
retry++
time.Sleep(s.backoff.Backoff(retry))
continue
}
all := s.values.Load()
news := make(map[string]*Value, len(cs))
for _, v := range cs {
if _, ok := all[v.Name]; !ok {
go s.fireEvent(Event{Event: EventAdd, Key: v.Name, Value: v.Config})
} else if v.Config != "" {
go s.fireEvent(Event{Event: EventUpdate, Key: v.Name, Value: v.Config})
} else {
go s.fireEvent(Event{Event: EventRemove, Key: v.Name, Value: v.Config})
}
news[v.Name] = &Value{val: v.Config, raw: v.Config}
}
for k, v := range all {
if _, ok := news[k]; !ok {
news[k] = v
}
}
s.values.Store(news)
ver = v.Version
retry = 0
}
}
type version struct {
Version int64 `json:"version"`
Diffs []int64 `json:"diffs"`
}
type config struct {
Version int64 `json:"version"`
Content string `json:"content"`
Md5 string `json:"md5"`
}
type content struct {
Cid int64 `json:"cid"`
Name string `json:"name"`
Config string `json:"config"`
}
func (s *sven) check(ver int64) (v *version, err error) {
params := newParams()
params.Set("version", strconv.FormatInt(ver, 10))
params.Set("appoint", svenAppoint)
var res struct {
Code int `json:"code"`
Data *version `json:"data"`
}
uri := fmt.Sprintf(_apiCheck, svenHost, params.Encode())
if _debug {
fmt.Printf("paladin: check(%d) uri(%s)\n", ver, uri)
}
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return
}
resp, err := s.httpCli.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
return
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
if err = json.Unmarshal(b, &res); err != nil {
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
err = ec
return
}
if res.Data == nil {
err = errors.Errorf("paladin: http version is nil. params(%s)", params.Encode())
return
}
v = res.Data
return
}
func (s *sven) config(ver *version) (cts []*content, err error) {
ids, _ := json.Marshal(ver.Diffs)
params := newParams()
params.Set("version", strconv.FormatInt(ver.Version, 10))
params.Set("ids", string(ids))
var res struct {
Code int `json:"code"`
Data *config `json:"data"`
}
uri := fmt.Sprintf(_apiGet, svenHost, params.Encode())
if _debug {
fmt.Printf("paladin: config(%+v) uri(%s)\n", ver, uri)
}
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return
}
resp, err := s.httpCli.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
return
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
if err = json.Unmarshal(b, &res); err != nil {
return
}
if !ecode.Int(res.Code).Equal(ecode.OK) || res.Data == nil {
err = errors.Errorf("paladin: http config is nil. params(%s) ecode(%d)", params.Encode(), res.Code)
return
}
if err = json.Unmarshal([]byte(res.Data.Content), &cts); err != nil {
return
}
for _, c := range cts {
if err = ioutil.WriteFile(path.Join(svenPath, c.Name), []byte(c.Config), 0644); err != nil {
return
}
}
return
}
func newParams() url.Values {
params := url.Values{}
params.Set("service", serviceName())
params.Set("build", svenVersion)
params.Set("token", svenToken)
params.Set("hostname", env.Hostname)
params.Set("ip", ipAddr())
return params
}
func ipAddr() string {
if env.IP != "" {
return env.IP
}
return xip.InternalIP()
}
func serviceName() string {
return fmt.Sprintf("%s_%s_%s", svenTreeid, env.DeployEnv, env.Zone)
}