Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions _data-prepper/pipelines/configuration/sinks/opensearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,58 @@ Option | Required | Type | Description
`document_root_key` | No | String | The key in the event that will be used as the root in the document. The default is the root of the event. If the key does not exist, then the entire event is written as the document. If `document_root_key` is of a basic value type, such as a string or integer, then the document will have a structure of `{"data": <value of the document_root_key>}`.
`serverless` | No | Boolean | **Deprecated in Data Prepper 2.7. Use this option with the `aws` configuration instead.** Determines whether the OpenSearch backend is Amazon OpenSearch Serverless. Set this value to `true` when the destination for the `opensearch` sink is an Amazon OpenSearch Serverless collection. Default is `false`.
`serverless_options` | No | Object | **Deprecated in Data Prepper 2.7. Use this option with the `aws` configuration instead.** The network configuration options available when the backend of the `opensearch` sink is set to Amazon OpenSearch Serverless. For more information, see [Serverless options](#serverless-options).
`query_lookup` | No | Object | Configuration for querying existing documents before indexing to prevent duplicates. For more information, see [Query lookup](#query-lookup).


## Query lookup

The `query_lookup` configuration enables deduplication by querying OpenSearch for existing documents before indexing new ones. This feature helps prevent duplicates in two scenarios:

1. **Conditional querying**: Query for documents based on a condition before indexing them.
2. **Error-based querying**: Query for documents when bulk operation errors occur that could result in partial success, such as socket timeouts or 500 internal server errors.

### Query lookup options

The `query_lookup` object supports the following options.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`query_when` | No | String | A [Data Prepper expression]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/expression-syntax/) that determines which documents qualify for querying before indexing. For example, `getMetadata("potential_duplicate") == true` will only query documents with that metadata field set to `true`.
`query_term` | Yes | String | The unique field of the document that will be used to query for existing documents in OpenSearch. This is typically an ID field.
`query_on_bulk_errors` | No | Boolean | When set to `true`, documents that encounter recoverable bulk operation errors (such as socket timeouts or 500 errors) will be queried rather than immediately retried. This helps prevent duplicates when the initial indexing request may have partially succeeded. Default is `false`.
`query_duration` | No | Duration | The amount of time to query for a given document before indexing it. Use ISO 8601 duration format, such as `PT5M` for 5 minutes. Default is `PT5M`.
`async_limit` | No | Integer | The maximum number of documents that can be queried concurrently before blocking the processor worker threads. Default is `5000`.

### Query lookup example

The following example configuration queries for documents with a `potential_duplicate` metadata field before indexing:

```yaml
pipeline:
...
sink:
opensearch:
hosts: ["https://localhost:9200"]
username: YOUR_USERNAME
password: YOUR_PASSWORD
index: my-index
query_lookup:
query_when: 'getMetadata("potential_duplicate") == true'
query_term: 'document_id'
query_on_bulk_errors: true
query_duration: PT5M
async_limit: 5000
```

In this configuration:
- Documents with the `potential_duplicate` metadata set to `true` are queried before indexing.
- The `document_id` field is used as the unique identifier for querying.
- If a bulk operation encounters errors like socket timeouts or 500 errors, the affected documents are queried rather than immediately retried.
- Documents are queried for up to 5 minutes before being indexed.
- Up to 5,000 documents can be queried concurrently.

If a document already exists in OpenSearch, it will be dropped and the event handle will be released, preventing duplicates.

## aws

Option | Required | Type | Description
Expand Down
Loading