Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2132](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2132))
- `opentelemetry-resource-detector-azure` Changed timeout to 4 seconds due to [timeout bug](https://github.com/open-telemetry/opentelemetry-python/issues/3644)
([#2136](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2136))
- `opentelemetry-instrumentation-elasticsearch` Fix body sanitization to support bulk queries
([#1990](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1990))

## Version 1.22.0/0.43b0 (2023-12-14)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,19 @@ def _unflatten_dict(d):


def sanitize_body(body) -> str:
if isinstance(body, bytes):
body = body.decode("utf8")

if isinstance(body, str):
body_lines = body.strip().split("\n")
if len(body_lines) > 1:
return sanitize_body(body_lines)

body = json.loads(body)

if isinstance(body, list):
return str([sanitize_body(elem) for elem in body])

flatten_body = _flatten_dict(body)

for key in flatten_body:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,3 +486,51 @@ def test_body_sanitization(self, _):
sanitize_body(json.dumps(sanitization_queries.interval_query)),
str(sanitization_queries.interval_query_sanitized),
)
self.assertEqual(
sanitize_body(
[
json.dumps(sanitization_queries.filter_query).encode("utf-8"),
json.dumps(sanitization_queries.match_query).encode("utf-8"),
json.dumps(sanitization_queries.interval_query).encode("utf-8"),
]
),
str(
[
str(sanitization_queries.filter_query_sanitized),
str(sanitization_queries.match_query_sanitized),
str(sanitization_queries.interval_query_sanitized),
]
),
)

def test_bulk_search(self, request_mock):
    """Check that a bulk helper call yields one span with a sanitized statement.

    Runs ``elasticsearch.helpers.bulk`` over two documents and verifies that
    exactly one finished span named ``Elasticsearch/_bulk`` is recorded, and
    that its attributes (including the ``DB_STATEMENT`` built from the
    multi-line bulk body) match the expected values.

    Args:
        request_mock: Mock injected by a patch decorator that is outside this
            view; presumably it stands in for the transport request call --
            confirm against the enclosing class.
    """
    # Canned reply returned by the mocked request; the last element is a JSON
    # body with an empty "items" list.
    request_mock.return_value = (2, {}, json.dumps({"items": []}))

    data = [
        {
            "_index": "words",
            "word": "foo",
        },
        {
            "_index": "words",
            "word": "bar",
        },
    ]
    client = Elasticsearch()
    elasticsearch.helpers.bulk(client, data)

    spans = self.get_finished_spans()
    # Assert the count before indexing so that a missing span is reported as
    # a clear assertion failure rather than an IndexError.
    self.assertEqual(1, len(spans))
    span = spans[0]
    self.assertEqual(span.name, "Elasticsearch/_bulk")
    self.assertIsNotNone(span.end_time)
    expected_bulk_attributes = {
        SpanAttributes.DB_SYSTEM: "elasticsearch",
        "elasticsearch.url": "/_bulk",
        "elasticsearch.method": "POST",
        # One stringified entry per line of the bulk body: an action line
        # followed by its document, for each of the two documents.
        SpanAttributes.DB_STATEMENT: "[\"{'index': {'_index': 'words'}}\", \"{'word': 'foo'}\", \"{'index': {'_index': 'words'}}\", \"{'word': 'bar'}\"]",
    }
    self.assertEqual(
        span.attributes,
        expected_bulk_attributes,
    )