diff --git a/beater/api/firehose/handler.go b/beater/api/firehose/handler.go index b93adc6443b..a2f5e818918 100644 --- a/beater/api/firehose/handler.go +++ b/beater/api/firehose/handler.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/publish" + "github.com/elastic/beats/v7/libbeat/common" ) const dataset = "apm.firehose" @@ -41,12 +42,24 @@ type record struct { Data string `json:"data"` } -type firehoseLog struct { +type firehose struct { RequestID string `json:"requestId"` Timestamp int64 `json:"timestamp"` Records []record `json:"records"` } +type cloudwatchMetric struct { + AccountID string `json:"account_id"` + Dimensions map[string]string `json:"dimensions"` + MetricName string `json:"metric_name"` + MetricStreamName string `json:"metric_stream_name"` + Namespace string `json:"namespace"` + Region string `json:"region"` + Timestamp int64 `json:"timestamp"` + Unit string `json:"unit"` + Value map[string]float64 `json:"value"` +} + type result struct { ErrorMessage string `json:"errorMessage"` RequestID string `json:"requestId"` @@ -100,7 +113,7 @@ func Handler(processor model.BatchProcessor, authenticator Authenticator) reques } } - var firehose firehoseLog + var firehose firehose err = json.NewDecoder(c.Request.Body).Decode(&firehose) if err != nil { return nil, err @@ -108,7 +121,7 @@ func Handler(processor model.BatchProcessor, authenticator Authenticator) reques // convert firehose log to events baseEvent := requestMetadata(c) - batch, err := processFirehoseLog(firehose, baseEvent) + batch, err := processFirehose(firehose, baseEvent) if err != nil { return nil, err } @@ -158,7 +171,7 @@ func (e requestError) Error() string { return e.err.Error() } -func processFirehoseLog(firehose firehoseLog, baseEvent model.APMEvent) (model.Batch, error) { +func processFirehose(firehose firehose, baseEvent model.APMEvent) (model.Batch, error) { 
var batch model.Batch for _, record := range firehose.Records { event := baseEvent @@ -172,15 +185,60 @@ func processFirehoseLog(firehose firehoseLog, baseEvent model.APMEvent) (model.B if line == "" { break } - event.Timestamp = time.Unix(firehose.Timestamp/1000, 0) - event.Processor = model.LogProcessor - event.Message = line + + var cwMetric cloudwatchMetric + err = json.Unmarshal([]byte(line), &cwMetric) + if err == nil && cwMetric.MetricStreamName != "" { + event = processMetrics(baseEvent, cwMetric) + } else { + event = processLogs(baseEvent, firehose, line) + } batch = append(batch, event) } } return batch, nil } +func processMetrics(event model.APMEvent, cwMetric cloudwatchMetric) model.APMEvent { + event.Processor = model.MetricsetProcessor + event.DataStream.Type = datastreams.MetricsType + event.Timestamp = time.Unix(cwMetric.Timestamp/1000, 0) + event.Cloud.AccountID = cwMetric.AccountID + event.Cloud.Region = cwMetric.Region + event.Cloud.ServiceName = cwMetric.Namespace + + namespace := strings.ToLower(cwMetric.Namespace) + namespace = strings.ReplaceAll(namespace, "/", ".") + event.DataStream.Dataset = dataset + "-" + namespace + + labels := common.MapStr{} + for k, v := range cwMetric.Dimensions { + labels[k] = v + } + event.Labels = labels + + var metricset model.Metricset + metricset.Name = namespace + + samples := map[string]model.MetricsetSample{} + for k, v := range cwMetric.Value { + // TODO: handle units for CloudWatch metrics + samples[cwMetric.MetricName+"."+k] = model.MetricsetSample{Value: v} + } + metricset.Samples = samples + event.Metricset = &metricset + return event +} + +func processLogs(event model.APMEvent, firehose firehose, logLine string) model.APMEvent { + event.DataStream.Dataset = dataset + event.Processor = model.LogProcessor + event.DataStream.Type = datastreams.LogsType + event.Timestamp = time.Unix(firehose.Timestamp/1000, 0) + event.Message = logLine + return event +} + func requestMetadata(c *request.Context) 
model.APMEvent { arnString := c.Request.Header.Get("X-Amz-Firehose-Source-Arn") arnParsed := parseARN(arnString) @@ -196,10 +254,6 @@ func requestMetadata(c *request.Context) model.APMEvent { serviceOrigin.ID = arnString serviceOrigin.Name = arnParsed.Resource event.Service.Origin = serviceOrigin - - // Set data stream type and dataset fields for Firehose - event.DataStream.Type = datastreams.LogsType - event.DataStream.Dataset = dataset return event } diff --git a/beater/api/firehose/handler_test.go b/beater/api/firehose/handler_test.go index d99d901315a..16ef10c6df8 100644 --- a/beater/api/firehose/handler_test.go +++ b/beater/api/firehose/handler_test.go @@ -234,3 +234,41 @@ type authorizerFunc func(ctx context.Context, action auth.Action, resource auth. func (f authorizerFunc) Authorize(ctx context.Context, action auth.Action, resource auth.Resource) error { return f(ctx, action, resource) } + +func TestProcessFirehoseMetric(t *testing.T) { + var batches []model.Batch + tc := testcaseFirehoseHandler{ + path: "cloudwatch_metric.json", + code: http.StatusOK, + id: request.IDResponseValidAccepted, + firehoseAccessKey: "U25jcABcd0JzTjQzUjNDemdGTHk6Ri0xMTNCdVVRdXFSR0lGYzF0Wk5Vdw==", + batchProcessor: model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { + batches = append(batches, *batch) + return nil + }), + } + + tc.setup(t) + h := Handler(tc.batchProcessor, tc.authenticator) + h(tc.c) + + require.Len(t, batches, 1) + require.Len(t, batches[0], 1) + event := batches[0][0] + + assert.Equal(t, expectedRegion, event.Cloud.Origin.Region) + assert.Equal(t, expectedAccountID, event.Cloud.Origin.AccountID) + assert.Equal(t, testARN, event.Service.Origin.ID) + assert.Equal(t, expectedResource, event.Service.Origin.Name) + + assert.Equal(t, "metrics", event.DataStream.Type) + assert.Equal(t, "aws.ec2", event.Metricset.Name) + assert.Equal(t, "apm.firehose-aws.ec2", event.DataStream.Dataset) + assert.Equal(t, 1, len(event.Labels)) + + assert.Equal(t, 
4, len(event.Metricset.Samples)) + assert.Equal(t, 2.0, event.Metricset.Samples["CPUUtilization.count"].Value) + assert.Equal(t, 3.0, event.Metricset.Samples["CPUUtilization.sum"].Value) + assert.Equal(t, 2.0, event.Metricset.Samples["CPUUtilization.max"].Value) + assert.Equal(t, 1.0, event.Metricset.Samples["CPUUtilization.min"].Value) +} diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 53c6157498e..61565fab387 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -46,6 +46,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits] - Add an experimental endpoint for AWS Kinesis Data Firehose {pull}6299[6299] - `output.elasticsearch.experimental` can be used to enable a new, experimental Elasticsearch output using the go-elasticsearch client {pull}5970[5970] - Transaction metrics now also group by `service.node.name`, `cloud.provider`, `cloud.region`, `cloud.availability_zone` {pull}6323[6323] +- Add support for CloudWatch metric streams through the Firehose endpoint {pull}6380[6380] [float] ==== Deprecated diff --git a/testdata/firehose/cloudwatch_metric.json b/testdata/firehose/cloudwatch_metric.json new file mode 100644 index 00000000000..5ceffd9bb75 --- /dev/null +++ b/testdata/firehose/cloudwatch_metric.json @@ -0,0 +1,8 @@ +{ "requestId": "bb389cba-95be-469f-8e50-95ecc1afefcd", + "timestamp": 1634755956128, + "records":[ + { + "data": "eyJtZXRyaWNfc3RyZWFtX25hbWUiOiJjbG91ZHdhdGNoLW1ldHJpYy1zdHJlYW0tdXMtZWFzdC0xIiwiYWNjb3VudF9pZCI6IjQyODE1MjUwMjQ2NyIsInJlZ2lvbiI6InVzLWVhc3QtMSIsIm5hbWVzcGFjZSI6IkFXUy9FQzIiLCJtZXRyaWNfbmFtZSI6IkNQVVV0aWxpemF0aW9uIiwiZGltZW5zaW9ucyI6eyJJbnN0YW5jZUlEIjoiaS10ZXN0LTEyMzQifSwidGltZXN0YW1wIjoxNjM0NzU1OTU2MTI4LCJ2YWx1ZSI6eyJtaW4iOjEuMCwgInN1bSI6IDMuMCwgIm1heCI6IDIuMCwgImNvdW50IjogMn0sInVuaXQiOiJQZXJjZW50In0=" + } + ] +}