2019-04-22 02:59:20 +00:00

362 lines
9.0 KiB
Go

// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"math/rand"
"os"
"runtime"
"strings"
"sync/atomic"
"time"
elastic "gopkg.in/olivere/elastic.v5"
)
// Tweet is the document type this program indexes into Elasticsearch and
// decodes search hits into.
//
// Note that omitempty has no effect on Created: encoding/json never treats a
// struct value such as time.Time as empty, so the field is always emitted.
type Tweet struct {
	User     string                `json:"user"`
	Message  string                `json:"message"`
	Retweets int                   `json:"retweets"`
	Image    string                `json:"image,omitempty"`
	Created  time.Time             `json:"created,omitempty"`
	Tags     []string              `json:"tags,omitempty"`
	Location string                `json:"location,omitempty"`
	Suggest  *elastic.SuggestField `json:"suggest_field,omitempty"`
}
// Command-line flags. Each one is handed to the TestCase by main.
var (
	// Target cluster, worker count and index.
	nodes = flag.String("nodes", "", "comma-separated list of ES URLs (e.g. 'http://192.168.2.10:9200,http://192.168.2.11:9200')")
	n     = flag.Int("n", 5, "number of goroutines that run searches")
	index = flag.String("index", "twitter", "name of ES index to use")

	// Optional log files; an empty name leaves the corresponding log unset.
	errorlogfile = flag.String("errorlog", "", "error log file")
	infologfile  = flag.String("infolog", "", "info log file")
	tracelogfile = flag.String("tracelog", "", "trace log file")

	// Retry behaviour of the client.
	retries = flag.Int("retries", 0, "number of retries")

	// Sniffer settings; defaults come from the elastic package.
	sniff   = flag.Bool("sniff", elastic.DefaultSnifferEnabled, "enable or disable sniffer")
	sniffer = flag.Duration("sniffer", elastic.DefaultSnifferInterval, "sniffer interval")

	// Healthcheck settings; defaults come from the elastic package.
	healthcheck   = flag.Bool("healthcheck", elastic.DefaultHealthcheckEnabled, "enable or disable healthchecks")
	healthchecker = flag.Duration("healthchecker", elastic.DefaultHealthcheckInterval, "healthcheck interval")
)
// main parses the command-line flags, builds a TestCase from them, starts the
// test and then blocks forever while the background goroutines do the work.
// Any configuration or startup error terminates the program via log.Fatal.
func main() {
	flag.Parse()
	runtime.GOMAXPROCS(runtime.NumCPU())

	if len(*nodes) == 0 {
		log.Fatal("no nodes specified")
	}

	tc, err := NewTestCase(*index, strings.Split(*nodes, ","))
	if err != nil {
		log.Fatal(err)
	}

	// Log destinations.
	tc.SetErrorLogFile(*errorlogfile)
	tc.SetInfoLogFile(*infologfile)
	tc.SetTraceLogFile(*tracelogfile)

	// Client behaviour.
	tc.SetMaxRetries(*retries)
	tc.SetHealthcheck(*healthcheck)
	tc.SetHealthcheckInterval(*healthchecker)
	tc.SetSniff(*sniff)
	tc.SetSnifferInterval(*sniffer)

	err = tc.Run(*n)
	if err != nil {
		log.Fatal(err)
	}

	// Run only starts goroutines; park the main goroutine so they keep going.
	select {}
}
// RunInfo is the message a search worker sends on the run channel after each
// iteration of its loop.
type RunInfo struct {
	// Success is true when the iteration completed without a failure.
	Success bool
}
// TestCase holds the configuration and the runtime state of one cluster test:
// the nodes and index to use, the client created by setup, the logging and
// client options, and the counters maintained by monitor.
type TestCase struct {
	// runs and failures are updated with sync/atomic. They are deliberately
	// the first fields: sync/atomic only guarantees 64-bit alignment for the
	// first word of an allocated struct on 32-bit platforms, so keeping them
	// here makes the atomic operations safe regardless of the fields below.
	runs     int64
	failures int64

	nodes  []string
	client *elastic.Client
	runCh  chan RunInfo // results reported by the search workers
	index  string

	// Log file names; an empty name means no file is opened for that log.
	errorlogfile string
	infologfile  string
	tracelogfile string

	// Options handed to the client when it is created in setup.
	maxRetries          int
	healthcheck         bool
	healthcheckInterval time.Duration
	sniff               bool
	snifferInterval     time.Duration
}
// NewTestCase creates a TestCase that works on the given index and connects
// to the given node URLs. It returns an error if the index name is empty.
// All other settings keep their zero values until changed through the
// setters.
func NewTestCase(index string, nodes []string) (*TestCase, error) {
	if len(index) == 0 {
		return nil, errors.New("no index name specified")
	}

	tc := new(TestCase)
	tc.index = index
	tc.nodes = nodes
	tc.runCh = make(chan RunInfo)
	return tc, nil
}
// SetIndex replaces the name of the index the test case operates on.
func (t *TestCase) SetIndex(name string) {
	t.index = name
}
// SetErrorLogFile records the file name used for the error log. An empty
// name means that setup opens no error log file.
func (t *TestCase) SetErrorLogFile(name string) {
	t.errorlogfile = name
}
// SetInfoLogFile records the file name used for the info log. An empty name
// means that setup opens no info log file.
func (t *TestCase) SetInfoLogFile(name string) {
	t.infologfile = name
}
// SetTraceLogFile records the file name used for the trace log. An empty
// name means that setup opens no trace log file.
func (t *TestCase) SetTraceLogFile(name string) {
	t.tracelogfile = name
}
// SetMaxRetries records the retry count that setup passes to the client.
func (t *TestCase) SetMaxRetries(n int) {
	t.maxRetries = n
}
// SetSniff records whether setup creates the client with sniffing enabled.
func (t *TestCase) SetSniff(enabled bool) {
	t.sniff = enabled
}
// SetSnifferInterval records the sniffer interval that setup passes to the
// client.
func (t *TestCase) SetSnifferInterval(d time.Duration) {
	t.snifferInterval = d
}
// SetHealthcheck records whether setup creates the client with healthchecks
// enabled.
func (t *TestCase) SetHealthcheck(enabled bool) {
	t.healthcheck = enabled
}
// SetHealthcheckInterval records the healthcheck interval that setup passes
// to the client.
func (t *TestCase) SetHealthcheckInterval(d time.Duration) {
	t.healthcheckInterval = d
}
// Run prepares the test case and then starts n search goroutines plus one
// monitor goroutine. It returns as soon as the goroutines have been started;
// they keep running in the background. An error is returned only if the
// preparation step fails, in which case nothing is started.
func (t *TestCase) Run(n int) error {
	if err := t.setup(); err != nil {
		return err
	}

	// Count from 0 so that exactly n workers are launched. Counting from 1
	// would start only n-1 of them, and none at all for n == 1.
	for i := 0; i < n; i++ {
		go t.search()
	}
	go t.monitor()

	return nil
}
// monitor consumes the results reported on the run channel, keeps the run and
// failure counters up to date and redraws a one-line status (runs in green,
// failures in red, followed by the client's description) after every result.
// If no result arrives for five seconds the status is redrawn anyway. The
// function never returns.
func (t *TestCase) monitor() {
	const idle = 5 * time.Second

	// report rewrites the status line in place; the trailing \r moves the
	// cursor back to the start of the line.
	report := func() {
		fmt.Printf("\033[32m%5d\033[0m; \033[31m%5d\033[0m: %s%s\r", t.runs, t.failures, t.client.String(), " ")
	}

	for {
		select {
		case info := <-t.runCh:
			atomic.AddInt64(&t.runs, 1)
			if failed := !info.Success; failed {
				atomic.AddInt64(&t.failures, 1)
				// Start a new line so the status that recorded the previous
				// state stays visible.
				fmt.Println()
			}
			report()

		case <-time.After(idle):
			// Print stats after some inactivity.
			report()
		}
	}
}
// setup creates the Elasticsearch client from the configured options and
// brings the index into a known state: an existing index of that name is
// deleted, a fresh one is created, two tweets (ids "1" and "2") are indexed
// and the index is flushed. The first error encountered is returned.
func (t *TestCase) setup() error {
	options := []elastic.ClientOptionFunc{elastic.SetURL(t.nodes...)}

	// Install a logger only when a file was configured. This avoids handing a
	// nil *log.Logger to the client.
	// NOTE(review): if the Set*Log options take an interface, a nil
	// *log.Logger would arrive as a non-nil interface value and panic on first
	// use — confirm against the elastic.v5 API.
	if t.errorlogfile != "" {
		logger, err := openFileLogger(t.errorlogfile, log.Ltime|log.Lmicroseconds|log.Lshortfile)
		if err != nil {
			return err
		}
		options = append(options, elastic.SetErrorLog(logger))
	}
	if t.infologfile != "" {
		logger, err := openFileLogger(t.infologfile, log.LstdFlags)
		if err != nil {
			return err
		}
		options = append(options, elastic.SetInfoLog(logger))
	}
	// The trace log receives request and response details.
	if t.tracelogfile != "" {
		logger, err := openFileLogger(t.tracelogfile, log.LstdFlags)
		if err != nil {
			return err
		}
		options = append(options, elastic.SetTraceLog(logger))
	}

	options = append(options,
		elastic.SetMaxRetries(t.maxRetries),
		elastic.SetSniff(t.sniff),
		elastic.SetSnifferInterval(t.snifferInterval),
		elastic.SetHealthcheck(t.healthcheck),
		elastic.SetHealthcheckInterval(t.healthcheckInterval))

	client, err := elastic.NewClient(options...)
	if err != nil {
		return err
	}
	t.client = client

	ctx := context.Background()

	// Start from a clean slate: drop the index if it is already there.
	exists, err := t.client.IndexExists(t.index).Do(ctx)
	if err != nil {
		return err
	}
	if exists {
		deleteIndex, err := t.client.DeleteIndex(t.index).Do(ctx)
		if err != nil {
			return err
		}
		if !deleteIndex.Acknowledged {
			return errors.New("delete index not acknowledged")
		}
	}

	// Create a new index.
	createIndex, err := t.client.CreateIndex(t.index).Do(ctx)
	if err != nil {
		return err
	}
	if !createIndex.Acknowledged {
		return errors.New("create index not acknowledged")
	}

	// Index a tweet (using JSON serialization).
	tweet1 := Tweet{User: "olivere", Message: "Take Five", Retweets: 0}
	_, err = t.client.Index().
		Index(t.index).
		Type("tweet").
		Id("1").
		BodyJson(tweet1).
		Do(ctx)
	if err != nil {
		return err
	}

	// Index a second tweet (by string).
	tweet2 := `{"user" : "olivere", "message" : "It's a Raggy Waltz"}`
	_, err = t.client.Index().
		Index(t.index).
		Type("tweet").
		Id("2").
		BodyString(tweet2).
		Do(ctx)
	if err != nil {
		return err
	}

	// Flush to make sure the documents got written.
	_, err = t.client.Flush().Index(t.index).Do(ctx)
	return err
}

// openFileLogger opens the named file for appending, creating it if needed,
// and returns a logger without prefix that writes to it using the given log
// flags. The file stays open for as long as the logger is in use.
func openFileLogger(name string, flags int) (*log.Logger, error) {
	f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
	if err != nil {
		return nil, err
	}
	return log.New(f, "", flags), nil
}
// search is the body of a worker goroutine. It loops forever, performing one
// get-and-search round trip per iteration and reporting exactly one RunInfo
// per iteration on the run channel. After a successful iteration it sleeps
// for a random time below 500ms; after a failed one it retries immediately.
func (t *TestCase) search() {
	ctx := context.Background()

	// Loop forever to check for connection issues.
	for {
		if !t.searchOnce(ctx) {
			t.runCh <- RunInfo{Success: false}
			continue
		}
		t.runCh <- RunInfo{Success: true}

		// Sleep some time.
		time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
	}
}

// searchOnce performs a single iteration of the search worker: it fetches the
// tweet with id "1", runs a term query for user "olivere" and decodes every
// returned hit into a Tweet. It reports whether all of these steps succeeded.
func (t *TestCase) searchOnce(ctx context.Context) bool {
	// Get tweet with specified ID.
	get1, err := t.client.Get().
		Index(t.index).
		Type("tweet").
		Id("1").
		Do(ctx)
	if err != nil || !get1.Found {
		return false
	}

	// Search with a term query.
	searchResult, err := t.client.Search().
		Index(t.index).                                 // search in index t.index
		Query(elastic.NewTermQuery("user", "olivere")). // specify the query
		Sort("user", true).                             // sort by "user" field, ascending
		From(0).Size(10).                               // take documents 0-9
		Pretty(true).                                   // pretty print request and response JSON
		Do(ctx)                                         // execute
	if err != nil {
		return false
	}

	// Nothing to decode when there are no hits; that still counts as success.
	if searchResult.Hits.TotalHits <= 0 {
		return true
	}

	for _, hit := range searchResult.Hits.Hits {
		// A hit without a source cannot be decoded; report it as a failure
		// instead of dereferencing a nil pointer.
		if hit.Source == nil {
			return false
		}
		// Deserialize hit.Source into a Tweet. Returning here (rather than
		// continuing with the next hit) guarantees that a failed decode is
		// not followed by a success report for the same iteration.
		var tweet Tweet
		if err := json.Unmarshal(*hit.Source, &tweet); err != nil {
			return false
		}
	}
	return true
}