Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 65 additions & 11 deletions beater/api/firehose/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/publish"
"github.com/elastic/beats/v7/libbeat/common"
)

const dataset = "apm.firehose"
Expand All @@ -41,12 +42,24 @@ type record struct {
Data string `json:"data"`
}

type firehoseLog struct {
// firehose is the envelope of an AWS Kinesis Data Firehose HTTP-endpoint
// delivery request: a request ID, the batch timestamp, and the batch of
// records to ingest.
type firehose struct {
	RequestID string `json:"requestId"`
	// Timestamp is epoch milliseconds (divided by 1000 before being
	// converted with time.Unix in processLogs).
	Timestamp int64    `json:"timestamp"`
	Records   []record `json:"records"`
}

// cloudwatchMetric mirrors the JSON document carried in a single
// CloudWatch metric stream record. A record whose decoded payload has a
// non-empty metric_stream_name is treated as a metric rather than a log
// line (see processFirehose).
type cloudwatchMetric struct {
	AccountID  string            `json:"account_id"`
	Dimensions map[string]string `json:"dimensions"`
	MetricName string            `json:"metric_name"`
	// MetricStreamName being non-empty is what distinguishes a metric
	// record from a plain log record.
	MetricStreamName string `json:"metric_stream_name"`
	// Namespace is the CloudWatch namespace, e.g. "AWS/EC2"; it is
	// lowercased and slash-normalized into the dataset name.
	Namespace string `json:"namespace"`
	Region    string `json:"region"`
	// Timestamp is epoch milliseconds (divided by 1000 before being
	// converted with time.Unix in processMetrics).
	Timestamp int64 `json:"timestamp"`
	// Unit is currently unused; see the TODO in processMetrics.
	Unit string `json:"unit"`
	// Value maps a statistic name (min/max/sum/count) to its value.
	Value map[string]float64 `json:"value"`
}

type result struct {
ErrorMessage string `json:"errorMessage"`
RequestID string `json:"requestId"`
Expand Down Expand Up @@ -100,15 +113,15 @@ func Handler(processor model.BatchProcessor, authenticator Authenticator) reques
}
}

var firehose firehoseLog
var firehose firehose
err = json.NewDecoder(c.Request.Body).Decode(&firehose)
if err != nil {
return nil, err
}

// convert firehose log to events
baseEvent := requestMetadata(c)
batch, err := processFirehoseLog(firehose, baseEvent)
batch, err := processFirehose(firehose, baseEvent)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,7 +171,7 @@ func (e requestError) Error() string {
return e.err.Error()
}

func processFirehoseLog(firehose firehoseLog, baseEvent model.APMEvent) (model.Batch, error) {
func processFirehose(firehose firehose, baseEvent model.APMEvent) (model.Batch, error) {
var batch model.Batch
for _, record := range firehose.Records {
event := baseEvent
Expand All @@ -172,15 +185,60 @@ func processFirehoseLog(firehose firehoseLog, baseEvent model.APMEvent) (model.B
if line == "" {
break
}
event.Timestamp = time.Unix(firehose.Timestamp/1000, 0)
event.Processor = model.LogProcessor
event.Message = line

var cwMetric cloudwatchMetric
err = json.Unmarshal([]byte(line), &cwMetric)
if err == nil && cwMetric.MetricStreamName != "" {
event = processMetrics(baseEvent, cwMetric)
} else {
event = processLogs(baseEvent, firehose, line)
}
batch = append(batch, event)
}
}
return batch, nil
}

// processMetrics converts a decoded CloudWatch metric stream record into
// a metricset APM event, starting from the request-scoped base event.
// The event is routed to the metrics data stream with a dataset derived
// from the metric namespace, e.g. "AWS/EC2" -> "apm.firehose-aws.ec2",
// and each statistic in cwMetric.Value becomes one metricset sample
// keyed "<metric name>.<statistic>".
func processMetrics(event model.APMEvent, cwMetric cloudwatchMetric) model.APMEvent {
	event.Processor = model.MetricsetProcessor
	event.DataStream.Type = datastreams.MetricsType
	// cwMetric.Timestamp is epoch milliseconds; keep the sub-second
	// component instead of truncating to whole seconds.
	event.Timestamp = time.Unix(cwMetric.Timestamp/1000, (cwMetric.Timestamp%1000)*int64(time.Millisecond))
	event.Cloud.AccountID = cwMetric.AccountID
	event.Cloud.Region = cwMetric.Region
	event.Cloud.ServiceName = cwMetric.Namespace

	// Normalize the namespace ("AWS/EC2" -> "aws.ec2") for use as both
	// the dataset suffix and the metricset name.
	namespace := strings.ReplaceAll(strings.ToLower(cwMetric.Namespace), "/", ".")
	event.DataStream.Dataset = dataset + "-" + namespace

	labels := make(common.MapStr, len(cwMetric.Dimensions))
	for k, v := range cwMetric.Dimensions {
		labels[k] = v
	}
	event.Labels = labels

	samples := make(map[string]model.MetricsetSample, len(cwMetric.Value))
	for statistic, value := range cwMetric.Value {
		// TODO: handle units for CloudWatch metrics
		samples[cwMetric.MetricName+"."+statistic] = model.MetricsetSample{Value: value}
	}
	event.Metricset = &model.Metricset{
		Name:    namespace,
		Samples: samples,
	}
	return event
}

// processLogs converts one decoded line from a Firehose record into a
// log APM event, starting from the request-scoped base event. The
// batch-level Firehose timestamp is applied to every line, since the
// records carry no per-line timestamp.
func processLogs(event model.APMEvent, firehose firehose, logLine string) model.APMEvent {
	event.Processor = model.LogProcessor
	event.DataStream.Type = datastreams.LogsType
	event.DataStream.Dataset = dataset
	// firehose.Timestamp is epoch milliseconds; keep the sub-second
	// component instead of truncating to whole seconds.
	event.Timestamp = time.Unix(firehose.Timestamp/1000, (firehose.Timestamp%1000)*int64(time.Millisecond))
	event.Message = logLine
	return event
}

func requestMetadata(c *request.Context) model.APMEvent {
arnString := c.Request.Header.Get("X-Amz-Firehose-Source-Arn")
arnParsed := parseARN(arnString)
Expand All @@ -196,10 +254,6 @@ func requestMetadata(c *request.Context) model.APMEvent {
serviceOrigin.ID = arnString
serviceOrigin.Name = arnParsed.Resource
event.Service.Origin = serviceOrigin

// Set data stream type and dataset fields for Firehose
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = dataset
return event
}

Expand Down
38 changes: 38 additions & 0 deletions beater/api/firehose/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,41 @@ type authorizerFunc func(ctx context.Context, action auth.Action, resource auth.
func (f authorizerFunc) Authorize(ctx context.Context, action auth.Action, resource auth.Resource) error {
return f(ctx, action, resource)
}

// TestProcessFirehoseMetric feeds the handler a CloudWatch metric stream
// payload (testdata/firehose/cloudwatch_metric.json) and verifies the
// record becomes a single metricset event with the expected routing,
// labels, and per-statistic samples.
func TestProcessFirehoseMetric(t *testing.T) {
	var batches []model.Batch
	tc := testcaseFirehoseHandler{
		path:              "cloudwatch_metric.json",
		code:              http.StatusOK,
		id:                request.IDResponseValidAccepted,
		firehoseAccessKey: "U25jcABcd0JzTjQzUjNDemdGTHk6Ri0xMTNCdVVRdXFSR0lGYzF0Wk5Vdw==",
		batchProcessor: model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error {
			batches = append(batches, *batch)
			return nil
		}),
	}

	tc.setup(t)
	h := Handler(tc.batchProcessor, tc.authenticator)
	h(tc.c)

	require.Len(t, batches, 1)
	require.Len(t, batches[0], 1)
	event := batches[0][0]

	// Request-scoped metadata derived from the source ARN header.
	assert.Equal(t, expectedRegion, event.Cloud.Origin.Region)
	assert.Equal(t, expectedAccountID, event.Cloud.Origin.AccountID)
	assert.Equal(t, testARN, event.Service.Origin.ID)
	assert.Equal(t, expectedResource, event.Service.Origin.Name)

	// Metric-specific routing and labels. Guard against a nil metricset
	// so a regression fails cleanly instead of panicking the test.
	assert.Equal(t, "metrics", event.DataStream.Type)
	assert.Equal(t, "apm.firehose-aws.ec2", event.DataStream.Dataset)
	assert.Len(t, event.Labels, 1)
	require.NotNil(t, event.Metricset)
	assert.Equal(t, "aws.ec2", event.Metricset.Name)

	// One sample per statistic, keyed "<metric name>.<statistic>".
	assert.Len(t, event.Metricset.Samples, 4)
	assert.Equal(t, 2.0, event.Metricset.Samples["CPUUtilization.count"].Value)
	assert.Equal(t, 3.0, event.Metricset.Samples["CPUUtilization.sum"].Value)
	assert.Equal(t, 2.0, event.Metricset.Samples["CPUUtilization.max"].Value)
	assert.Equal(t, 1.0, event.Metricset.Samples["CPUUtilization.min"].Value)
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits]
- Add an experimental endpoint for AWS Kinesis Data Firehose {pull}6299[6299]
- `output.elasticsearch.experimental` can be used to enable a new, experimental Elasticsearch output using the go-elasticsearch client {pull}5970[5970]
- Transaction metrics now also group by `service.node.name`, `cloud.provider`, `cloud.region`, `cloud.availability_zone` {pull}6323[6323]
- Add support for CloudWatch metric streams through the Firehose endpoint {pull}6380[6380]

[float]
==== Deprecated
Expand Down
8 changes: 8 additions & 0 deletions testdata/firehose/cloudwatch_metric.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{ "requestId": "bb389cba-95be-469f-8e50-95ecc1afefcd",
"timestamp": 1634755956128,
"records":[
{
"data": "eyJtZXRyaWNfc3RyZWFtX25hbWUiOiJjbG91ZHdhdGNoLW1ldHJpYy1zdHJlYW0tdXMtZWFzdC0xIiwiYWNjb3VudF9pZCI6IjQyODE1MjUwMjQ2NyIsInJlZ2lvbiI6InVzLWVhc3QtMSIsIm5hbWVzcGFjZSI6IkFXUy9FQzIiLCJtZXRyaWNfbmFtZSI6IkNQVVV0aWxpemF0aW9uIiwiZGltZW5zaW9ucyI6eyJJbnN0YW5jZUlEIjoiaS10ZXN0LTEyMzQifSwidGltZXN0YW1wIjoxNjM0NzU1OTU2MTI4LCJ2YWx1ZSI6eyJtaW4iOjEuMCwgInN1bSI6IDMuMCwgIm1heCI6IDIuMCwgImNvdW50IjogMn0sInVuaXQiOiJQZXJjZW50In0="
}
]
}