ELK: Connecting to ElasticSearch with a Go client

ElasticSearch often serves as a repository for monitoring, logging, and business data.  As such, integrations with external systems are a common requirement.

The Go programming language, with its easily deployed single binary and rich set of packages, can serve as a bridge between these systems and the ElasticSearch server.

We will use the olivere/elastic package for this purpose.  It is well maintained and supports both ElasticSearch 5.x and 2.x, depending on which version of the package you import (gopkg.in/olivere/elastic.v3 targets ElasticSearch 2.x, and gopkg.in/olivere/elastic.v5 targets 5.x).  In this article, we will be hitting an ElasticSearch 2.x backend.

If you need to install Go on Ubuntu, first read my article here.

Description

The example code connects to an ElasticSearch 2.x instance, checks the backend to ensure it is 2.x, inserts a row into an index named “myindex” with type “mytype”, and finally runs a search to verify that the row was created.

The inserted data has only two columns: @timestamp and message.  We will populate the timestamp with the current time, and the message will have the format “message inserted at YYYY-MM-DD HH:MM:SSS Z”.

The first column name is prefixed with an ‘@’ sign only because this is common in logstash processing, but it has no special significance.

By default, automatic index creation is enabled, so there should not be any issue with us creating an index on-the-fly.

Run Example

Let’s start by running the code against an ElasticSearch 2.x backend; the code itself is explained further down.  Here are instructions for building and running on Linux.  Feel free to translate them for your own platform:

$ mkdir -p $GOPATH/src/elastictest
$ cd $GOPATH/src/elastictest
$ wget https://raw.githubusercontent.com/fabianlee/blogcode/master/golang/elastictest/elastictest.go
$ go get
$ go build
$ ./elastictest --server=http://127.0.0.1:9200

This will produce console output that looks like:

2017/05/21 02:01:48 client.running? true

2017/05/21 02:01:48 -------ElasticSearch version--------
2017/05/21 02:01:48 olivere/elastic API version: 3.0.68
2017/05/21 02:01:48 ElasticSearch server version: 2.3.3

2017/05/21 02:01:48 -------ElasticSearch insert--------
2017/05/21 02:01:48 Successfully inserted row of data into myindex/mydata: &{Index:myindex Type:mytype Id:AVwovOs7C1P69a9AO6Mv Version:1 Created:true}

2017/05/21 02:01:48 -------ElasticSearch search--------
Rows found:
time: 2017-05-21 02:01:48.727262099 +0000 UTC message: message inserted at 2017-05-21 02:01:48.72726217 +0000 UTC
2017/05/21 02:01:48 done with search

As described above, a connection is made, the version is checked, and then a row of data is inserted, searched for, and retrieved.  Below is a breakdown of the code.

Connecting

The first step is to connect to ElasticSearch.  We do so with the server URL passed as a command line argument and then check if the node is running before moving forward.

// configure connection to ES
client, err := elastic.NewClient(elastic.SetURL(*server))
if err != nil {
  panic(err)
}
log.Printf("client.running? %v", client.IsRunning())
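
The snippet above dereferences a 'server' variable that holds the URL from the command line.  Here is a minimal sketch of how such a flag could be defined with the standard flag package.  The helper name parseServer and the default URL are assumptions made for illustration, not necessarily what elastictest.go does.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseServer reads the --server flag from args and returns the URL.
// The helper name and the default value are assumptions for this sketch.
func parseServer(args []string) (string, error) {
	fs := flag.NewFlagSet("elastictest", flag.ContinueOnError)
	server := fs.String("server", "http://127.0.0.1:9200", "ElasticSearch base URL")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return *server, nil
}

func main() {
	url, err := parseServer(os.Args[1:])
	if err != nil {
		// flag has already printed the problem and the usage text
		os.Exit(2)
	}
	fmt.Println("connecting to", url)
}
```

Running the binary with --server=http://127.0.0.1:9200, as in the instructions earlier, would hand that URL to elastic.SetURL().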

Checking Version

Then, we check the ElasticSearch backend version to make sure our API is compatible:

version, verr := client.ElasticsearchVersion(*server)
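
Once we have the version string, one way to act on it is to compare the major version number.  The sketch below uses only the standard library, and the helper majorVersion is a hypothetical name introduced for illustration.  In the real program the string would come from client.ElasticsearchVersion().

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// majorVersion extracts the leading number from a version string
// such as "2.3.3". It is a hypothetical helper for this sketch.
func majorVersion(version string) (int, error) {
	parts := strings.SplitN(version, ".", 2)
	return strconv.Atoi(parts[0])
}

func main() {
	// Hard-coded here; the real value would be returned by the server.
	version := "2.3.3"

	major, err := majorVersion(version)
	if err != nil {
		fmt.Println("cannot parse version:", err)
		return
	}
	if major != 2 {
		fmt.Printf("expected ElasticSearch 2.x, found %s\n", version)
		return
	}
	fmt.Printf("ElasticSearch %s is a 2.x backend\n", version)
}
```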

Inserting Data

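Inserting data means first constructing a struct of type ‘MyType’, which has two fields: ‘Time’ and ‘Message’.  These struct field names must begin with an uppercase letter so that they are exported; otherwise the JSON encoder cannot see them and they will be left out of the document.  The ‘json’ tags set the column names used in ElasticSearch.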

type MyType struct {
  Time    time.Time `json:"@timestamp"`
  Message string    `json:"message"`
}
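
To see the effect of the tags and of exported versus unexported fields, here is a small standalone sketch that uses only encoding/json.  The second struct, withUnexported, and the helpers encode and sampleDocs are hypothetical and exist only for the demonstration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// MyType mirrors the struct from the article.
type MyType struct {
	Time    time.Time `json:"@timestamp"`
	Message string    `json:"message"`
}

// withUnexported is a hypothetical variant that shows a lowercase
// (unexported) field being skipped by encoding/json.
type withUnexported struct {
	Message string `json:"message"`
	hidden  string // never serialized
}

// encode marshals any value to a JSON string.
func encode(v interface{}) (string, error) {
	b, err := json.Marshal(v)
	return string(b), err
}

// sampleDocs encodes one value of each type using a fixed timestamp.
func sampleDocs() (full, partial string) {
	ts := time.Date(2017, 5, 21, 2, 1, 48, 0, time.UTC)
	full, _ = encode(MyType{Time: ts, Message: "hello"})
	partial, _ = encode(withUnexported{Message: "hello", hidden: "secret"})
	return full, partial
}

func main() {
	full, partial := sampleDocs()
	fmt.Println(full)    // {"@timestamp":"2017-05-21T02:01:48Z","message":"hello"}
	fmt.Println(partial) // {"message":"hello"}
}
```

The ‘hidden’ field is silently dropped, which is why the fields of ‘MyType’ need to start with an uppercase letter.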

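Then we pass this ‘row’ struct to ‘client.Index()’ to insert a row into index=myindex with type=mytype.  The ‘Refresh(true)’ call asks ElasticSearch to refresh the index right away so the row is visible to the search that follows.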

row := MyType{
  Time:    time.Now(),
  Message: fmt.Sprintf("message inserted at %s", time.Now()),
}
ires, ierr := client.Index().Index("myindex").Type("mytype").BodyJson(row).Refresh(true).Do()
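
For readers curious about what this call means at the REST level, the sketch below builds (but does not send) a roughly equivalent HTTP request with the standard library.  It assumes the ElasticSearch 2.x index API, where a POST to /index/type without an ID lets the server generate one.  The helper newIndexRequest is a hypothetical name, and the exact request that olivere/elastic produces may differ in details.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// MyType mirrors the struct from the article.
type MyType struct {
	Time    time.Time `json:"@timestamp"`
	Message string    `json:"message"`
}

// newIndexRequest builds the HTTP request that would index one row
// into myindex/mytype. It is a hypothetical helper for this sketch
// and does not contact any server.
func newIndexRequest(baseURL string, row MyType) (*http.Request, error) {
	body, err := json.Marshal(row)
	if err != nil {
		return nil, err
	}
	// refresh=true asks for an index refresh so the document is
	// visible to searches right after the call returns.
	url := baseURL + "/myindex/mytype?refresh=true"
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	now := time.Now()
	row := MyType{Time: now, Message: fmt.Sprintf("message inserted at %s", now)}

	req, err := newIndexRequest("http://127.0.0.1:9200", row)
	if err != nil {
		fmt.Println("cannot build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL)
}
```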

Querying Data

Last, we do a search, which starts with constructing a Term query that looks for the term “inserted” in the message field.  A Term query is not analyzed, so it matches the exact lowercase token that the default analyzer stored when the message was indexed.

Then we execute the search against the “myindex” index, sorting the results by @timestamp in ascending order.

Finally, we iterate over the rows of results and print the timestamp and message of each row found.

// term query: match rows whose message field contains the token "inserted"
termQuery := elastic.NewTermQuery("message", "inserted")
// the second argument to Sort requests ascending order
res, err := client.Search("myindex").
  Query(termQuery).
  Sort("@timestamp", true).
  Do()
if err != nil {
  log.Printf("search failed: %v", err)
  return
}
fmt.Println("Rows found:")
var rowType MyType
for _, item := range res.Each(reflect.TypeOf(rowType)) {
  row := item.(MyType)
  fmt.Printf("time: %s message: %s\n", row.Time, row.Message)
}
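
To make the search step less of a black box, here is a standalone sketch of the same idea using only the standard library.  It builds a query DSL body for a term query with an ascending sort, and it decodes the ‘_source’ of each hit into ‘MyType’.  The helpers searchBody and decodeRows and the hand-written sampleReply are assumptions made for illustration, and the JSON that olivere/elastic actually sends may be laid out differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// MyType mirrors the struct from the article.
type MyType struct {
	Time    time.Time `json:"@timestamp"`
	Message string    `json:"message"`
}

// searchBody builds a term query on field, sorted ascending by sortField.
func searchBody(field, term, sortField string) (string, error) {
	body := map[string]interface{}{
		"query": map[string]interface{}{
			"term": map[string]interface{}{field: term},
		},
		"sort": []interface{}{
			map[string]interface{}{sortField: map[string]string{"order": "asc"}},
		},
	}
	b, err := json.Marshal(body)
	return string(b), err
}

// searchResponse models only the part of a search reply that holds rows.
type searchResponse struct {
	Hits struct {
		Hits []struct {
			Source MyType `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}

// decodeRows extracts the _source of every hit as a MyType.
func decodeRows(raw []byte) ([]MyType, error) {
	var res searchResponse
	if err := json.Unmarshal(raw, &res); err != nil {
		return nil, err
	}
	rows := make([]MyType, 0, len(res.Hits.Hits))
	for _, hit := range res.Hits.Hits {
		rows = append(rows, hit.Source)
	}
	return rows, nil
}

// sampleReply is a hand-written, trimmed reply used for illustration only.
const sampleReply = `{"hits":{"total":1,"hits":[{"_index":"myindex","_type":"mytype","_id":"abc123","_source":{"@timestamp":"2017-05-21T02:01:48Z","message":"message inserted at 2017-05-21"}}]}}`

func main() {
	body, err := searchBody("message", "inserted", "@timestamp")
	if err != nil {
		fmt.Println("cannot build query:", err)
		return
	}
	fmt.Println(body)

	rows, err := decodeRows([]byte(sampleReply))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, r := range rows {
		fmt.Printf("time: %s message: %s\n", r.Time, r.Message)
	}
}
```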

 

To download the full code for elastictest.go, grab it from GitHub.

 

REFERENCES

https://github.com/olivere/elastic

https://groups.google.com/forum/#!topic/Golang-nuts/lXMVT9JHIQo

http://goinbigdata.com/working-with-elasticsearch-in-go/

http://stackoverflow.com/questions/30931092/how-can-i-disable-the-automatic-index-creation-in-elasticsearch

https://www.elastic.co/guide/en/elasticsearch/guide/current/create-doc.html

http://programming.freeideas.cz/2017/02/16/golang-feed-data-into-elasticsearch/