package tcp
import (
"bytes"
"encoding/json"
"fmt"
"io"
"strconv"
"time"
"unicode"
"go-common/app/infra/databus/conf"
"go-common/library/conf/env"
"go-common/library/log"
"go-common/library/queue/databus"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
pb "github.com/gogo/protobuf/proto"
)
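// stringify replaces symbol and control runes with '-' so that message
// payloads can be logged safely as plain text.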
func stringify(b []byte) []byte {
return bytes.Map(
func(r rune) rune {
if unicode.IsSymbol(r) || unicode.IsControl(r) {
return rune('-')
}
return r
},
b,
)
}
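// proto is a single frame of the redis-style wire protocol: a type prefix,
// an optional integer (e.g. an array length) and an optional message body.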
type proto struct {
prefix byte
integer int
message string
}
// psCommon is the common state shared by Pub and Sub.
type psCommon struct {
c *conn
err error
closed bool
// kafka
group string
topic string
cluster string
addr string
color []byte
}
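// newPsCommon builds the shared pub/sub state; c may be nil when the HTTP
// interface reuses a producer without a TCP connection.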
func newPsCommon(c *conn, group, topic, color, cluster string) (ps *psCommon) {
ps = &psCommon{
c: c,
group: group,
topic: topic,
cluster: cluster,
color: []byte(color),
}
if c != nil {
ps.addr = c.conn.RemoteAddr().String()
}
return
}
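// write sends each proto to the client and flushes the connection once.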
func (ps *psCommon) write(protos ...proto) (err error) {
for _, p := range protos {
if err = ps.c.Write(p); err != nil {
return
}
}
err = ps.c.Flush()
return
}
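// batchWrite sends an array header carrying the batch size, then every proto,
// and flushes the connection once at the end.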
func (ps *psCommon) batchWrite(protos []proto) (err error) {
if err = ps.c.Write(proto{prefix: _protoArray, integer: len(protos)}); err != nil {
return
}
for _, p := range protos {
if err = ps.c.Write(p); err != nil {
return
}
// FIXME(felix): logging disabled in prod for now because of ops-log performance issues
if env.DeployEnv != env.DeployEnvProd {
log.Info("batchWrite group(%s) topic(%s) cluster(%s) color(%s) addr(%s) consumer(%s) ok", ps.group, ps.topic, ps.cluster, ps.color, ps.addr, stringify([]byte(p.message)))
}
}
err = ps.c.Flush()
return
}
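// pong answers a client ping by writing the _pong reply as a simple string.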
func (ps *psCommon) pong() (err error) {
if err = ps.write(proto{prefix: _protoStr, message: _pong}); err != nil {
return
}
log.Info("pong group(%s) topic(%s) cluster(%s) color(%s) addr(%s) ping success", ps.group, ps.topic, ps.cluster, ps.color, ps.addr)
return
}
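// Closed reports whether the pub/sub connection has been closed.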
func (ps *psCommon) Closed() bool {
return ps.closed
}
// Close is coupled too tightly to the redis protocol, so the sendRedisErr switch controls whether the error is written back to the client.
func (ps *psCommon) Close(sendRedisErr bool) {
if ps.closed {
return
}
if ps.err == nil {
ps.err = errConnClosedByServer // closed by the server itself; send a close event to the client.
}
// write error
if ps.err != errConnRead && ps.err != errConnClosedByClient && sendRedisErr {
ps.write(proto{prefix: _protoErr, message: ps.err.Error()})
}
if ps.c != nil {
ps.c.Close()
}
ps.closed = true
}
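// fatal records the error and closes the connection, pushing the error to the
// client when the redis protocol allows it.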
func (ps *psCommon) fatal(err error) {
if err == nil || ps.closed {
return
}
ps.err = err
ps.Close(true)
}
// Pub databus producer
type Pub struct {
*psCommon
// producer
producer sarama.SyncProducer
}
// NewPub creates a new databus producer.
// The HTTP interface reuses this method; pass nil for c.
func NewPub(c *conn, group, topic, color string, pCfg *conf.Kafka) (p *Pub, err error) {
producer, err := newProducer(group, topic, pCfg)
if err != nil {
log.Error("group(%s) topic(%s) cluster(%s) NewPub producer error(%v)", group, topic, pCfg.Cluster, err)
return
}
p = &Pub{
psCommon: newPsCommon(c, group, topic, color, pCfg.Cluster),
producer: producer,
}
// connections from the http protocol need no handling here
if c != nil {
// set producer read connection timeout
p.c.readTimeout = _pubReadTimeout
}
log.Info("NewPub() success group(%s) topic(%s) color(%s) cluster(%s) addr(%s)", group, topic, color, pCfg.Cluster, p.addr)
return
}
// Serve runs the databus producer loop, reading commands from the client connection.
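// Commands handled (by protocol constant): _auth, _ping, _set (key value),
// _hset (key header value) and _quit; any other command yields errCmdNotSupport.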
func (p *Pub) Serve() {
var (
err error
cmd string
args [][]byte
)
for {
if cmd, args, err = p.c.Read(); err != nil {
if err != io.EOF {
log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) read error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, err)
}
p.fatal(errConnRead)
return
}
if p.Closed() {
log.Warn("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) p.Closed()", p.group, p.topic, p.cluster, p.color, p.addr)
return
}
select {
case <-quit:
p.fatal(errConnClosedByServer)
return
default:
}
switch cmd {
case _auth:
err = p.write(proto{prefix: _protoStr, message: _ok})
case _ping:
err = p.pong()
case _set:
if len(args) != 2 {
p.write(proto{prefix: _protoErr, message: errPubParams.Error()})
continue
}
err = p.publish(args[0], nil, args[1])
case _hset:
if len(args) != 3 {
p.write(proto{prefix: _protoErr, message: errPubParams.Error()})
continue
}
err = p.publish(args[0], args[1], args[2])
case _quit:
err = errConnClosedByClient
default:
err = errCmdNotSupport
}
if err != nil {
p.fatal(err)
log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) serve error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, p.err)
return
}
}
}
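// publish produces the message to kafka and, on success, replies OK to the client.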
func (p *Pub) publish(key, header, value []byte) (err error) {
if _, _, err = p.Publish(key, header, value); err != nil {
return
}
return p.write(proto{prefix: _protoStr, message: _ok})
}
// Publish sends a message; it is shared by the redis and http protocols.
func (p *Pub) Publish(key, header, value []byte) (partition int32, offset int64, err error) {
var message = &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(value),
Headers: []sarama.RecordHeader{
{Key: _headerColor, Value: p.color},
{Key: _headerMetadata, Value: header},
},
}
now := time.Now()
// TODO(felix): support RecordHeader
if partition, offset, err = p.producer.SendMessage(message); err != nil {
log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) publish(%v) error(%v)", p.group, p.topic, p.cluster, p.color, p.addr, message, err)
return
}
if svc != nil {
svc.TimeProm.Timing(p.group, int64(time.Since(now)/time.Millisecond))
svc.CountProm.Incr(_opProducerMsgSpeed, p.group, p.topic)
}
// FIXME(felix): logging disabled in prod for now because of ops-log performance issues
if env.DeployEnv != env.DeployEnvProd {
log.Info("publish group(%s) topic(%s) cluster(%s) color(%s) addr(%s) key(%s) header(%s) value(%s) ok", p.group, p.topic, p.cluster, p.color, p.addr, key, stringify(header), stringify(value))
}
return
}
// Sub databus consumer
type Sub struct {
*psCommon
// kafka consumer
consumer *cluster.Consumer
waitClosing bool
batch int
// ticker
ticker *time.Ticker
}
// NewSub creates a new databus consumer.
func NewSub(c *conn, group, topic, color string, sCfg *conf.Kafka, batch int64) (s *Sub, err error) {
select {
case <-consumerLimter:
default:
}
// NOTE color is used to filter dyed (traffic-colored) messages on consumption
if color != "" {
group = fmt.Sprintf("%s-%s", group, color)
}
if err = validate(group, topic, sCfg.Brokers); err != nil {
return
}
s = &Sub{
psCommon: newPsCommon(c, group, topic, color, sCfg.Cluster),
ticker: time.NewTicker(_batchInterval),
}
if batch == 0 {
s.batch = _batchNum
} else {
s.batch = int(batch)
}
// set consumer read connection timeout
s.c.readTimeout = _subReadTimeout
// cluster config
cfg := cluster.NewConfig()
cfg.Version = sarama.V1_0_0_0
cfg.ClientID = fmt.Sprintf("%s-%s", group, topic)
cfg.Net.KeepAlive = 30 * time.Second
// NOTE cluster auto commit offset interval
cfg.Consumer.Offsets.CommitInterval = time.Second * 1
// NOTE set fetch.wait.max.ms
cfg.Consumer.MaxWaitTime = time.Millisecond * 250
cfg.Consumer.MaxProcessingTime = 50 * time.Millisecond
// NOTE errors that occur during offset management; if enabled, the c.Errors channel must be read
cfg.Consumer.Return.Errors = true
// NOTE notifications that occur during consuming; if enabled, the c.Notifications channel must be read
cfg.Group.Return.Notifications = true
// The initial offset to use if no offset was previously committed.
// default: OffsetOldest
cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
if s.consumer, err = cluster.NewConsumer(sCfg.Brokers, group, []string{topic}, cfg); err != nil {
log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) cluster.NewConsumer() error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
} else {
log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) cluster.NewConsumer() ok", s.group, s.topic, s.cluster, s.color, s.addr)
}
return
}
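// validate checks that the topic has partitions on the given brokers and that
// the consumer group has fewer members than partitions, so the new consumer
// would not be useless.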
func validate(group, topic string, brokers []string) (err error) {
var (
cli *cluster.Client
c *cluster.Config
broker *sarama.Broker
gresp *sarama.DescribeGroupsResponse
memberAssignment *sarama.ConsumerGroupMemberAssignment
consumerNum int
partitions []int32
)
c = cluster.NewConfig()
c.Version = sarama.V0_10_0_1
if cli, err = cluster.NewClient(brokers, c); err != nil {
log.Error("group(%s) topic(%s) cluster.NewClient() error(%v)", group, topic, err)
err = errKafKaData
return
}
defer cli.Close()
if partitions, err = cli.Partitions(topic); err != nil {
log.Error("group(%s) topic(%s) cli.Partitions error(%v)", group, topic, err)
err = errKafKaData
return
}
if len(partitions) <= 0 {
err = errKafKaData
return
}
if err = cli.RefreshCoordinator(group); err != nil {
log.Error("group(%s) topic(%s) cli.RefreshCoordinator error(%v)", group, topic, err)
err = errKafKaData
return
}
if broker, err = cli.Coordinator(group); err != nil {
log.Error("group(%s) topic(%s) cli.Coordinator error(%v)", group, topic, err)
err = errKafKaData
return
}
defer broker.Close()
if gresp, err = broker.DescribeGroups(&sarama.DescribeGroupsRequest{
Groups: []string{group},
}); err != nil {
log.Error("group(%s) topic(%s) cli.DescribeGroups error(%v)", group, topic, err)
err = errKafKaData
return
}
if len(gresp.Groups) != 1 {
err = errKafKaData
return
}
for _, member := range gresp.Groups[0].Members {
if memberAssignment, err = member.GetMemberAssignment(); err != nil {
log.Error("group(%s) topic(%s) member.GetMemberAssignment error(%v)", group, topic, err)
err = errKafKaData
return
}
for mtopic := range memberAssignment.Topics {
if mtopic == topic {
consumerNum++
break
}
}
}
if consumerNum >= len(partitions) {
err = errUseLessConsumer
return
}
return nil
}
// Serve runs the databus consumer loop, reading commands from the client connection.
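// Commands handled (by protocol constant): _auth, _ping, _mget (optional
// encoding), _set (partition offset, i.e. commit) and _quit; any other command
// yields errCmdNotSupport.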
func (s *Sub) Serve() {
var (
err error
cmd string
args [][]byte
)
defer func() {
svc.CountProm.Decr(_opCurrentConsumer, s.group, s.topic)
}()
log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) begin serve", s.group, s.topic, s.cluster, s.color, s.addr)
for {
if cmd, args, err = s.c.Read(); err != nil {
if err != io.EOF {
log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) read error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
}
s.fatal(errConnRead)
return
}
if s.consumer == nil {
log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) s.consumer is nil", s.group, s.topic, s.cluster, s.color, s.addr)
s.fatal(errConsumerClosed)
return
}
if s.Closed() {
log.Warn("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) s.Closed()", s.group, s.topic, s.cluster, s.color, s.addr)
return
}
switch cmd {
case _auth:
err = s.write(proto{prefix: _protoStr, message: _ok})
case _ping:
err = s.pong()
case _mget:
var enc []byte
if len(args) > 0 {
enc = args[0]
}
err = s.message(enc)
case _set:
err = s.commit(args)
case _quit:
err = errConnClosedByClient
default:
err = errCmdNotSupport
}
if err != nil {
s.fatal(err)
log.Error("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) serve error(%v)", s.group, s.topic, s.cluster, s.color, s.addr, err)
return
}
}
}
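// message consumes kafka messages until a full batch is collected or the
// ticker fires: it drops messages whose color header does not match, decodes
// the metadata header, encodes each message as pb or json depending on enc,
// and finally batch-writes the result to the client.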
func (s *Sub) message(enc []byte) (err error) {
var (
msg *sarama.ConsumerMessage
notify *cluster.Notification
protos []proto
ok bool
bs []byte
last = time.Now()
ret = &databus.MessagePB{}
p = proto{prefix: _protoBulk}
)
for {
select {
case err = <-s.consumer.Errors():
log.Error("group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", s.group, s.topic, s.cluster, s.addr, err)
return
case notify, ok = <-s.consumer.Notifications():
if !ok {
log.Info("notification notOk group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", s.group, s.topic, s.cluster, s.addr, err)
err = errClosedNotifyChannel
return
}
switch notify.Type {
case cluster.UnknownNotification, cluster.RebalanceError:
log.Error("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
err = errClosedNotifyChannel
return
case cluster.RebalanceStart:
log.Info("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
continue
case cluster.RebalanceOK:
log.Info("notification(%s) group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
}
if len(notify.Current[s.topic]) == 0 {
log.Warn("notification(%s) no topic group(%s) topic(%s) cluster(%s) addr(%s) catch error(%v)", notify.Type, s.group, s.topic, s.cluster, s.addr, err)
err = errConsumerOver
return
}
case msg, ok = <-s.consumer.Messages():
if !ok {
log.Error("group(%s) topic(%s) cluster(%s) addr(%s) message channel closed", s.group, s.topic, s.cluster, s.addr)
err = errClosedMsgChannel
return
}
// reset timestamp
last = time.Now()
ret.Key = string(msg.Key)
ret.Value = msg.Value
ret.Topic = s.topic
ret.Partition = msg.Partition
ret.Offset = msg.Offset
ret.Timestamp = msg.Timestamp.Unix()
if len(msg.Headers) > 0 {
var notMatchColor bool
for _, h := range msg.Headers {
if bytes.Equal(h.Key, _headerColor) && !bytes.Equal(h.Value, s.color) {
// color does not match; skip this message
notMatchColor = true
} else if bytes.Equal(h.Key, _headerMetadata) && h.Value != nil {
// parse metadata
dh := new(databus.Header)
if err = pb.Unmarshal(h.Value, dh); err != nil {
log.Error("pb.Unmarshal(%s) error(%v)", h.Value, err)
err = nil
} else {
ret.Metadata = dh.Metadata
}
}
}
if notMatchColor {
continue
}
}
if bytes.Equal(enc, _encodePB) {
// encode to pb bytes
if bs, err = pb.Marshal(ret); err != nil {
log.Error("proto.Marshal(%v) error(%v)", ret, err)
s.consumer.MarkPartitionOffset(s.topic, msg.Partition, msg.Offset, "")
return s.write(proto{prefix: _protoErr, message: errMsgFormat.Error()})
}
} else {
// encode to json bytes
if bs, err = json.Marshal(ret); err != nil {
log.Error("json.Marshal(%v) error(%v)", ret, err)
s.consumer.MarkPartitionOffset(s.topic, msg.Partition, msg.Offset, "")
return s.write(proto{prefix: _protoErr, message: errMsgFormat.Error()})
}
}
svc.StatProm.State(_opPartitionOffset, msg.Offset, s.group, s.topic, strconv.Itoa(int(msg.Partition)))
svc.CountProm.Incr(_opConsumerMsgSpeed, s.group, s.topic)
svc.StatProm.Incr(_opConsumerPartition, s.group, s.topic, strconv.Itoa(int(msg.Partition)))
p.message = string(bs)
protos = append(protos, p)
if len(protos) >= s.batch {
return s.batchWrite(protos)
}
case <-s.ticker.C:
if len(protos) != 0 {
return s.batchWrite(protos)
}
if time.Since(last) < _batchTimeout {
continue
}
if s.waitClosing {
log.Info("consumer group(%s) topic(%s) cluster(%s) addr(%s) wait closing then exit,maybe cluster changed", s.group, s.topic, s.cluster, s.addr)
err = errConsumerTimeout
return
}
return s.batchWrite(protos)
}
}
}
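// commit parses the partition and offset arguments, marks the consumer offset
// for the topic and replies OK.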
func (s *Sub) commit(args [][]byte) (err error) {
var (
partition, offset int64
)
if len(args) != 2 {
log.Error("group(%v) topic(%v) cluster(%s) addr(%s) commit offset error, args(%v) is illegal", s.group, s.topic, s.cluster, s.addr, args)
// write error
return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
}
if partition, err = strconv.ParseInt(string(args[0]), 10, 32); err != nil {
return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
}
if offset, err = strconv.ParseInt(string(args[1]), 10, 64); err != nil {
return s.write(proto{prefix: _protoErr, message: errCommitParams.Error()})
}
// mark partition offset
s.consumer.MarkPartitionOffset(s.topic, int32(partition), offset, "")
// FIXME(felix): logging disabled in prod for now because of ops-log performance issues
if env.DeployEnv != env.DeployEnvProd {
log.Info("commit group(%s) topic(%s) cluster(%s) color(%s) addr(%s) partition(%d) offset(%d) mark offset succeed", s.group, s.topic, s.cluster, s.color, s.addr, partition, offset)
}
return s.write(proto{prefix: _protoStr, message: _ok})
}
// Closed reports whether the consumer is closed.
func (s *Sub) Closed() bool {
return s.psCommon != nil && s.psCommon.Closed()
}
// Close closes the consumer and releases its resources.
func (s *Sub) Close() {
if !s.psCommon.Closed() {
s.psCommon.Close(true)
}
if s.consumer != nil {
s.consumer.Close()
s.consumer = nil
}
if s.ticker != nil {
s.ticker.Stop()
}
log.Info("group(%s) topic(%s) cluster(%s) color(%s) addr(%s) consumer exit", s.group, s.topic, s.cluster, s.color, s.addr)
}
// WaitClosing marks the closing state so the consumer is closed after it has been idle for up to 30s.
func (s *Sub) WaitClosing() {
s.waitClosing = true
}
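// fatal propagates the error to the shared pub/sub state and closes the consumer.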
func (s *Sub) fatal(err error) {
if err == nil || s.closed {
return
}
if s.psCommon != nil {
s.psCommon.fatal(err)
}
s.Close()
}