Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func (c *publishClient) Publish(batch publisher.Batch) error {

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, err = c.es.BulkWith("_xpack", "monitoring", params, nil, bulk[:])
_, err = c.es.SendMonitoringBulk(params, bulk[:])

if err != nil {
failed = append(failed, event)
reason = err
Expand Down
67 changes: 67 additions & 0 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"io/ioutil"
"net/http"
"strings"

"github.com/elastic/beats/libbeat/common"
)

// MetaBuilder creates meta data for bulk requests
Expand Down Expand Up @@ -77,6 +79,41 @@ func (conn *Connection) BulkWith(
return readQueryResult(result.raw)
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
// operations and sends them to Elasticsearch. The request is retransmitted up to max_retries
// before returning an error.
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
) (*QueryResult, error) {
if len(body) == 0 {
return nil, nil
}

enc := conn.encoder
enc.Reset()
if err := bulkEncode(enc, nil, body); err != nil {
return nil, err
}

if !conn.version.IsValid() {
if err := conn.Connect(); err != nil {
return nil, err
}
}

requ, err := newMonitoringBulkRequest(conn.version, conn.URL, params, enc)
if err != nil {
return nil, err
}

_, result, err := conn.sendBulkRequest(requ)
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
}

func newBulkRequest(
urlStr string,
index, docType string,
Expand All @@ -88,6 +125,36 @@ func newBulkRequest(
return nil, err
}

return newBulkRequestWithPath(urlStr, path, params, body)
}

func newMonitoringBulkRequest(
esVersion common.Version,
urlStr string,
params map[string]string,
body bodyEncoder,
) (*bulkRequest, error) {
var path string
var err error
if esVersion.Major < 7 {
path, err = makePath("_xpack", "monitoring", "_bulk")
} else {
path, err = makePath("_monitoring", "bulk", "")
}

if err != nil {
return nil, err
}

return newBulkRequestWithPath(urlStr, path, params, body)
}

func newBulkRequestWithPath(
urlStr string,
path string,
params map[string]string,
body bodyEncoder,
) (*bulkRequest, error) {
url := addToURL(urlStr, path, "", params)

var reader io.Reader
Expand Down