592 lines
16 KiB
Go
592 lines
16 KiB
Go
// Copyright 2012-present Oliver Eilhard. All rights reserved.
|
|
// Use of this source code is governed by a MIT-license.
|
|
// See http://olivere.mit-license.org/license.txt for details.
|
|
|
|
package elastic_test
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"reflect"
|
|
"time"
|
|
|
|
elastic "gopkg.in/olivere/elastic.v5"
|
|
)
|
|
|
|
type Tweet struct {
|
|
User string `json:"user"`
|
|
Message string `json:"message"`
|
|
Retweets int `json:"retweets"`
|
|
Image string `json:"image,omitempty"`
|
|
Created time.Time `json:"created,omitempty"`
|
|
Tags []string `json:"tags,omitempty"`
|
|
Location string `json:"location,omitempty"`
|
|
Suggest *elastic.SuggestField `json:"suggest_field,omitempty"`
|
|
}
|
|
|
|
func Example() {
|
|
errorlog := log.New(os.Stdout, "APP ", log.LstdFlags)
|
|
|
|
// Obtain a client. You can also provide your own HTTP client here.
|
|
client, err := elastic.NewClient(elastic.SetErrorLog(errorlog))
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
|
|
// Trace request and response details like this
|
|
//client.SetTracer(log.New(os.Stdout, "", 0))
|
|
|
|
// Ping the Elasticsearch server to get e.g. the version number
|
|
info, code, err := client.Ping("http://127.0.0.1:9200").Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
|
|
|
|
// Getting the ES version number is quite common, so there's a shortcut
|
|
esversion, err := client.ElasticsearchVersion("http://127.0.0.1:9200")
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Elasticsearch version %s\n", esversion)
|
|
|
|
// Use the IndexExists service to check if a specified index exists.
|
|
exists, err := client.IndexExists("twitter").Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
if !exists {
|
|
// Create a new index.
|
|
mapping := `
|
|
{
|
|
"settings":{
|
|
"number_of_shards":1,
|
|
"number_of_replicas":0
|
|
},
|
|
"mappings":{
|
|
"_default_": {
|
|
"_all": {
|
|
"enabled": true
|
|
}
|
|
},
|
|
"tweet":{
|
|
"properties":{
|
|
"user":{
|
|
"type":"keyword"
|
|
},
|
|
"message":{
|
|
"type":"text",
|
|
"store": true,
|
|
"fielddata": true
|
|
},
|
|
"retweets":{
|
|
"type":"long"
|
|
},
|
|
"tags":{
|
|
"type":"keyword"
|
|
},
|
|
"location":{
|
|
"type":"geo_point"
|
|
},
|
|
"suggest_field":{
|
|
"type":"completion"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
`
|
|
createIndex, err := client.CreateIndex("twitter").Body(mapping).Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
if !createIndex.Acknowledged {
|
|
// Not acknowledged
|
|
}
|
|
}
|
|
|
|
// Index a tweet (using JSON serialization)
|
|
tweet1 := Tweet{User: "olivere", Message: "Take Five", Retweets: 0}
|
|
put1, err := client.Index().
|
|
Index("twitter").
|
|
Type("tweet").
|
|
Id("1").
|
|
BodyJson(tweet1).
|
|
Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Indexed tweet %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
|
|
|
|
// Index a second tweet (by string)
|
|
tweet2 := `{"user" : "olivere", "message" : "It's a Raggy Waltz"}`
|
|
put2, err := client.Index().
|
|
Index("twitter").
|
|
Type("tweet").
|
|
Id("2").
|
|
BodyString(tweet2).
|
|
Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Indexed tweet %s to index %s, type %s\n", put2.Id, put2.Index, put2.Type)
|
|
|
|
// Get tweet with specified ID
|
|
get1, err := client.Get().
|
|
Index("twitter").
|
|
Type("tweet").
|
|
Id("1").
|
|
Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
if get1.Found {
|
|
fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
|
|
}
|
|
|
|
// Flush to make sure the documents got written.
|
|
_, err = client.Flush().Index("twitter").Do(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Search with a term query
|
|
termQuery := elastic.NewTermQuery("user", "olivere")
|
|
searchResult, err := client.Search().
|
|
Index("twitter"). // search in index "twitter"
|
|
Query(termQuery). // specify the query
|
|
Sort("user", true). // sort by "user" field, ascending
|
|
From(0).Size(10). // take documents 0-9
|
|
Pretty(true). // pretty print request and response JSON
|
|
Do(context.Background()) // execute
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
|
|
// searchResult is of type SearchResult and returns hits, suggestions,
|
|
// and all kinds of other information from Elasticsearch.
|
|
fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
|
|
|
|
// Each is a convenience function that iterates over hits in a search result.
|
|
// It makes sure you don't need to check for nil values in the response.
|
|
// However, it ignores errors in serialization. If you want full control
|
|
// over iterating the hits, see below.
|
|
var ttyp Tweet
|
|
for _, item := range searchResult.Each(reflect.TypeOf(ttyp)) {
|
|
t := item.(Tweet)
|
|
fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
|
|
}
|
|
// TotalHits is another convenience function that works even when something goes wrong.
|
|
fmt.Printf("Found a total of %d tweets\n", searchResult.TotalHits())
|
|
|
|
// Here's how you iterate through results with full control over each step.
|
|
if searchResult.Hits.TotalHits > 0 {
|
|
fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits)
|
|
|
|
// Iterate through results
|
|
for _, hit := range searchResult.Hits.Hits {
|
|
// hit.Index contains the name of the index
|
|
|
|
// Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}).
|
|
var t Tweet
|
|
err := json.Unmarshal(*hit.Source, &t)
|
|
if err != nil {
|
|
// Deserialization failed
|
|
}
|
|
|
|
// Work with tweet
|
|
fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
|
|
}
|
|
} else {
|
|
// No hits
|
|
fmt.Print("Found no tweets\n")
|
|
}
|
|
|
|
// Update a tweet by the update API of Elasticsearch.
|
|
// We just increment the number of retweets.
|
|
script := elastic.NewScript("ctx._source.retweets += params.num").Param("num", 1)
|
|
update, err := client.Update().Index("twitter").Type("tweet").Id("1").
|
|
Script(script).
|
|
Upsert(map[string]interface{}{"retweets": 0}).
|
|
Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
fmt.Printf("New version of tweet %q is now %d", update.Id, update.Version)
|
|
|
|
// ...
|
|
|
|
// Delete an index.
|
|
deleteIndex, err := client.DeleteIndex("twitter").Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
if !deleteIndex.Acknowledged {
|
|
// Not acknowledged
|
|
}
|
|
}
|
|
|
|
func ExampleClient_NewClient_default() {
|
|
// Obtain a client to the Elasticsearch instance on http://127.0.0.1:9200.
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
// Handle error
|
|
fmt.Printf("connection failed: %v\n", err)
|
|
} else {
|
|
fmt.Println("connected")
|
|
}
|
|
_ = client
|
|
// Output:
|
|
// connected
|
|
}
|
|
|
|
func ExampleClient_NewClient_cluster() {
|
|
// Obtain a client for an Elasticsearch cluster of two nodes,
|
|
// running on 10.0.1.1 and 10.0.1.2.
|
|
client, err := elastic.NewClient(elastic.SetURL("http://10.0.1.1:9200", "http://10.0.1.2:9200"))
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
_ = client
|
|
}
|
|
|
|
func ExampleClient_NewClient_manyOptions() {
|
|
// Obtain a client for an Elasticsearch cluster of two nodes,
|
|
// running on 10.0.1.1 and 10.0.1.2. Do not run the sniffer.
|
|
// Set the healthcheck interval to 10s. When requests fail,
|
|
// retry 5 times. Print error messages to os.Stderr and informational
|
|
// messages to os.Stdout.
|
|
client, err := elastic.NewClient(
|
|
elastic.SetURL("http://10.0.1.1:9200", "http://10.0.1.2:9200"),
|
|
elastic.SetSniff(false),
|
|
elastic.SetHealthcheckInterval(10*time.Second),
|
|
elastic.SetMaxRetries(5),
|
|
elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
|
|
elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)))
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
_ = client
|
|
}
|
|
|
|
func ExampleIndexExistsService() {
|
|
// Get a client to the local Elasticsearch instance.
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
// Use the IndexExists service to check if the index "twitter" exists.
|
|
exists, err := client.IndexExists("twitter").Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
if exists {
|
|
// ...
|
|
}
|
|
}
|
|
|
|
func ExampleCreateIndexService() {
|
|
// Get a client to the local Elasticsearch instance.
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
// Create a new index.
|
|
createIndex, err := client.CreateIndex("twitter").Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
if !createIndex.Acknowledged {
|
|
// Not acknowledged
|
|
}
|
|
}
|
|
|
|
func ExampleDeleteIndexService() {
|
|
// Get a client to the local Elasticsearch instance.
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
// Delete an index.
|
|
deleteIndex, err := client.DeleteIndex("twitter").Do(context.Background())
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
if !deleteIndex.Acknowledged {
|
|
// Not acknowledged
|
|
}
|
|
}
|
|
|
|
func ExampleSearchService() {
|
|
// Get a client to the local Elasticsearch instance.
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
|
|
// Search with a term query
|
|
termQuery := elastic.NewTermQuery("user", "olivere")
|
|
searchResult, err := client.Search().
|
|
Index("twitter"). // search in index "twitter"
|
|
Query(termQuery). // specify the query
|
|
Sort("user", true). // sort by "user" field, ascending
|
|
From(0).Size(10). // take documents 0-9
|
|
Pretty(true). // pretty print request and response JSON
|
|
Do(context.Background()) // execute
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
|
|
// searchResult is of type SearchResult and returns hits, suggestions,
|
|
// and all kinds of other information from Elasticsearch.
|
|
fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
|
|
|
|
// Number of hits
|
|
if searchResult.Hits.TotalHits > 0 {
|
|
fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits)
|
|
|
|
// Iterate through results
|
|
for _, hit := range searchResult.Hits.Hits {
|
|
// hit.Index contains the name of the index
|
|
|
|
// Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}).
|
|
var t Tweet
|
|
err := json.Unmarshal(*hit.Source, &t)
|
|
if err != nil {
|
|
// Deserialization failed
|
|
}
|
|
|
|
// Work with tweet
|
|
fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
|
|
}
|
|
} else {
|
|
// No hits
|
|
fmt.Print("Found no tweets\n")
|
|
}
|
|
}
|
|
|
|
func ExampleAggregations() {
|
|
// Get a client to the local Elasticsearch instance.
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
|
|
// Create an aggregation for users and a sub-aggregation for a date histogram of tweets (per year).
|
|
timeline := elastic.NewTermsAggregation().Field("user").Size(10).OrderByCountDesc()
|
|
histogram := elastic.NewDateHistogramAggregation().Field("created").Interval("year")
|
|
timeline = timeline.SubAggregation("history", histogram)
|
|
|
|
// Search with a term query
|
|
searchResult, err := client.Search().
|
|
Index("twitter"). // search in index "twitter"
|
|
Query(elastic.NewMatchAllQuery()). // return all results, but ...
|
|
SearchType("count"). // ... do not return hits, just the count
|
|
Aggregation("timeline", timeline). // add our aggregation to the query
|
|
Pretty(true). // pretty print request and response JSON
|
|
Do(context.Background()) // execute
|
|
if err != nil {
|
|
// Handle error
|
|
panic(err)
|
|
}
|
|
|
|
// Access "timeline" aggregate in search result.
|
|
agg, found := searchResult.Aggregations.Terms("timeline")
|
|
if !found {
|
|
log.Fatalf("we should have a terms aggregation called %q", "timeline")
|
|
}
|
|
for _, userBucket := range agg.Buckets {
|
|
// Every bucket should have the user field as key.
|
|
user := userBucket.Key
|
|
|
|
// The sub-aggregation history should have the number of tweets per year.
|
|
histogram, found := userBucket.DateHistogram("history")
|
|
if found {
|
|
for _, year := range histogram.Buckets {
|
|
var key string
|
|
if v := year.KeyAsString; v != nil {
|
|
key = *v
|
|
}
|
|
fmt.Printf("user %q has %d tweets in %q\n", user, year.DocCount, key)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func ExampleSearchResult() {
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Do a search
|
|
searchResult, err := client.Search().Index("twitter").Query(elastic.NewMatchAllQuery()).Do(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// searchResult is of type SearchResult and returns hits, suggestions,
|
|
// and all kinds of other information from Elasticsearch.
|
|
fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
|
|
|
|
// Each is a utility function that iterates over hits in a search result.
|
|
// It makes sure you don't need to check for nil values in the response.
|
|
// However, it ignores errors in serialization. If you want full control
|
|
// over iterating the hits, see below.
|
|
var ttyp Tweet
|
|
for _, item := range searchResult.Each(reflect.TypeOf(ttyp)) {
|
|
t := item.(Tweet)
|
|
fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
|
|
}
|
|
fmt.Printf("Found a total of %d tweets\n", searchResult.TotalHits())
|
|
|
|
// Here's how you iterate hits with full control.
|
|
if searchResult.Hits.TotalHits > 0 {
|
|
fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits)
|
|
|
|
// Iterate through results
|
|
for _, hit := range searchResult.Hits.Hits {
|
|
// hit.Index contains the name of the index
|
|
|
|
// Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}).
|
|
var t Tweet
|
|
err := json.Unmarshal(*hit.Source, &t)
|
|
if err != nil {
|
|
// Deserialization failed
|
|
}
|
|
|
|
// Work with tweet
|
|
fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
|
|
}
|
|
} else {
|
|
// No hits
|
|
fmt.Print("Found no tweets\n")
|
|
}
|
|
}
|
|
|
|
func ExamplePutTemplateService() {
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Create search template
|
|
tmpl := `{"template":{"query":{"match":{"title":"{{query_string}}"}}}}`
|
|
|
|
// Create template
|
|
resp, err := client.PutTemplate().
|
|
Id("my-search-template"). // Name of the template
|
|
BodyString(tmpl). // Search template itself
|
|
Do(context.Background()) // Execute
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if resp.Acknowledged {
|
|
fmt.Println("search template creation acknowledged")
|
|
}
|
|
}
|
|
|
|
func ExampleGetTemplateService() {
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Get template stored under "my-search-template"
|
|
resp, err := client.GetTemplate().Id("my-search-template").Do(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
fmt.Printf("search template is: %q\n", resp.Template)
|
|
}
|
|
|
|
func ExampleDeleteTemplateService() {
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Delete template
|
|
resp, err := client.DeleteTemplate().Id("my-search-template").Do(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if resp != nil && resp.Acknowledged {
|
|
fmt.Println("template deleted")
|
|
}
|
|
}
|
|
|
|
func ExampleClusterHealthService() {
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Get cluster health
|
|
res, err := client.ClusterHealth().Index("twitter").Do(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if res == nil {
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Cluster status is %q\n", res.Status)
|
|
}
|
|
|
|
func ExampleClusterHealthService_WaitForGreen() {
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Wait for status green
|
|
res, err := client.ClusterHealth().WaitForStatus("green").Timeout("15s").Do(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if res.TimedOut {
|
|
fmt.Printf("time out waiting for cluster status %q\n", "green")
|
|
} else {
|
|
fmt.Printf("cluster status is %q\n", res.Status)
|
|
}
|
|
}
|
|
|
|
func ExampleClusterStateService() {
|
|
client, err := elastic.NewClient()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Get cluster state
|
|
res, err := client.ClusterState().Metric("version").Do(context.Background())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
fmt.Printf("Cluster %q has version %d", res.ClusterName, res.Version)
|
|
}
|