go-common/app/infra/databus/tcp/pubsub_test.go
2019-04-22 02:59:20 +00:00

71 lines
1.6 KiB
Go

package tcp
import (
"fmt"
"io/ioutil"
"net"
"testing"
"time"
"go-common/app/infra/databus/conf"
. "github.com/smartystreets/goconvey/convey"
)
var (
pubCfg = &conf.Kafka{
Cluster: "test_topic",
Brokers: []string{"172.22.33.174:9092", "172.22.33.183:9092", "172.22.33.185:9092"},
}
)
func TestDatabus(t *testing.T) {
Convey("Test publish:", t, func() {
l, _ := net.Listen("tcp", ":8888")
go func() {
for {
conn, err := l.Accept()
if err != nil {
continue
}
b, err := ioutil.ReadAll(conn)
if err == nil {
fmt.Printf("test conn: %s", b)
}
conn.Close()
}
}()
conn, err := net.Dial("tcp", ":8888")
So(err, ShouldBeNil)
p, err := NewPub(newConn(conn, time.Second, time.Second), "pub", "", _testTopic, pubCfg)
So(err, ShouldBeNil)
key := []byte("key")
header := []byte("header")
msg := []byte("message")
err = p.publish(key, header, msg)
So(err, ShouldBeNil)
time.Sleep(time.Second)
Convey("test sub", func() {
conn, _ := net.Dial("tcp", ":8888")
s, err := NewSub(newConn(conn, time.Second, time.Second), "sub", "", _testTopic, pubCfg, 1)
So(err, ShouldBeNil)
t.Logf("subscriptions: %v", s.consumer.Subscriptions())
for {
select {
case msg := <-s.consumer.Messages():
s.consumer.CommitOffsets()
t.Logf("sub message: %s timestamp: %d", msg.Value, msg.Timestamp.Unix())
return
case err := <-s.consumer.Errors():
t.Errorf("error: %v", err)
So(err, ShouldBeNil)
case n := <-s.consumer.Notifications():
t.Logf("notify: %v", n)
err := p.publish(key, header, msg)
So(err, ShouldBeNil)
}
}
})
})
}