[Data] Add Kafka as datasource#58592
Conversation
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
python/ray/data/read_api.py
Outdated
| start_offset: Optional[Union[int, str]] = "earliest", | ||
| end_offset: Optional[Union[int, str]] = "latest", |
There was a problem hiding this comment.
Are earlier and latest the only valid string values? Will we need to add more values later?
There was a problem hiding this comment.
Changed from str into Literal["earliest"] & Literal["latest"]
python/ray/data/read_api.py
Outdated
| trigger: Literal["once"] = "once", | ||
| start_offset: Optional[Union[int, str]] = "earliest", | ||
| end_offset: Optional[Union[int, str]] = "latest", | ||
| authentication: Optional[Dict[str, Any]] = None, |
There was a problem hiding this comment.
What happens if this is None? If it's implicitly required, I think it'd be clearer if this was a required argument.
There was a problem hiding this comment.
Also, in the interest of being explicit over implicit, I think we should use a typed dataclass rather than a dict (what are the valid fields and values? You'd only know by looking at the docstring)
There was a problem hiding this comment.
Made authentication a dataclass
What happens if this is None? If it's implicitly required, I think it'd be clearer if this was a required argument.
This is not required.
python/ray/data/read_api.py
Outdated
| By default, the number of output blocks is dynamically decided based on | ||
| input data size and available resources. You shouldn't manually set this | ||
| value in most cases. | ||
| timeout_ms: Timeout in milliseconds to poll to until reaching end_offset (default 10000ms/10s). |
There was a problem hiding this comment.
What happens if we don't expose this?
There was a problem hiding this comment.
I think user should set their timeout?
|
|
||
| # Create metadata for this task | ||
| metadata = BlockMetadata( | ||
| num_rows=max_records_per_task, |
There was a problem hiding this comment.
What happens if we don't set this? This could be inaccurate, right?
There was a problem hiding this comment.
I removed max_records_per_task since we current run a task per partition.
Also set the num_rows to None.
| in the remote task without requiring 'self' to be serialized. | ||
| """ | ||
|
|
||
| def kafka_read_fn() -> Iterable[Block]: |
There was a problem hiding this comment.
The implementation of this function is pretty long. Wonder if there's a way we can make it shorter (and easier to read) by abstracting some of the logic as functions.
If you feel like it'd hurt readability though, okay to keep as-is.
| --hash=sha256:31f23644fe2602f88ff55e1f5c79ba497e01224ee7737937930c448e4d0e24dc \ | ||
| --hash=sha256:a8a6399716257f45be6a007360200409fce5cda2661e3dec71d23dc15f6189ab | ||
| # via uvicorn | ||
| python-dotenv==1.2.1 \ |
There was a problem hiding this comment.
For my own understanding, what's the deal with all the lock files? Is this what REEf recommended?
There was a problem hiding this comment.
I asked REEf team how do I resolve deps error and they want me to run this command
bazel run //ci/raydepsets:raydepsets -- build --all-configs
Pasting the message
I added some test deps and ran bash ci/ci.sh compile_pip_dependencies to update requirement_compiled.txt
And still getting this error, do you have any idea how we should resolve this?
https://buildkite.com/ray-project/premerge/builds/53872/steps/canvas?sid=019a84cf-060a-43b7-b6ef-39fe56484670
There was a problem hiding this comment.
Is this what REEf recommended?
yes.
you can split the python-docenv version upgrade PR into a separate one.
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a new Kafka datasource for Ray Data, allowing bounded reads from Kafka topics. The implementation is well-structured, using kafka-python and providing a clear API via ray.data.read_kafka. The addition of KafkaAuthConfig makes authentication flexible. The integration tests are comprehensive, covering various scenarios including different offset configurations, multiple partitions/topics, and timeout behavior, which is excellent. I've found a few areas for improvement, mainly around documentation clarity and a couple of small code refinements. Overall, this is a great contribution.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
python/ray/data/read_api.py
Outdated
| trigger: Literal["once"] = "once", | ||
| start_offset: Union[int, Literal["earliest"]] = "earliest", | ||
| end_offset: Union[int, Literal["latest"]] = "latest", | ||
| kafka_auth_config: KafkaAuthConfig = KafkaAuthConfig(), |
There was a problem hiding this comment.
Bug: Mutable Defaults Break Configuration Isolation
Mutable default argument KafkaAuthConfig() creates a shared instance across all calls to read_kafka. Since KafkaAuthConfig is a non-frozen dataclass, any mutation to this shared instance would affect all subsequent function calls that don't provide kafka_auth_config. The default should be None instead, with instantiation inside the function if needed.
aslonnie
left a comment
There was a problem hiding this comment.
fwiw, @elliot-barn is giving a talk about depset next tue.
| --hash=sha256:31f23644fe2602f88ff55e1f5c79ba497e01224ee7737937930c448e4d0e24dc \ | ||
| --hash=sha256:a8a6399716257f45be6a007360200409fce5cda2661e3dec71d23dc15f6189ab | ||
| # via uvicorn | ||
| python-dotenv==1.2.1 \ |
There was a problem hiding this comment.
Is this what REEf recommended?
yes.
you can split the python-docenv version upgrade PR into a separate one.
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
| --hash=sha256:31f23644fe2602f88ff55e1f5c79ba497e01224ee7737937930c448e4d0e24dc \ | ||
| --hash=sha256:a8a6399716257f45be6a007360200409fce5cda2661e3dec71d23dc15f6189ab | ||
| # via uvicorn | ||
| python-dotenv==1.2.1 \ |
There was a problem hiding this comment.
@elliot-barn could you change codeowner so that these lock files are not owned by ray-ci ? we do not need to review every single dependency change for release tests.
aslonnie
left a comment
There was a problem hiding this comment.
approval for release test lock file changes.
python/ray/data/read_api.py
Outdated
| By default, the number of output blocks is dynamically decided based on | ||
| input data size and available resources. You shouldn't manually set this | ||
| value in most cases. | ||
| timeout_ms: Timeout in milliseconds to poll to until reaching end_offset (default 10000ms). |
There was a problem hiding this comment.
Can you elaborate on this? For example, is this the timeout per task, or for the read as a whole? What happens when the timeout is reached? Does the job fail?
| if elapsed_time >= timeout_seconds: | ||
| break |
There was a problem hiding this comment.
If a read task times out and performs a partial read, I don't think the user would know that from the logs.
Maybe let's add a logging.warning or logging.info so the users aren't surprised (and it's easier to debug)?
There was a problem hiding this comment.
Added logging.warning
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
## Description Added Kafka as data source, we can now use `read_kafka` for bounded data source ## Related issues Closes ray-project#58653 ## Additional information --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Aydin Abiar <aydin@anyscale.com>
## Description Added Kafka as data source, we can now use `read_kafka` for bounded data source ## Related issues Closes ray-project#58653 ## Additional information --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Pavitra Bhalla <pavitra@rayai.com>
## Description Added Kafka as data source, we can now use `read_kafka` for bounded data source ## Related issues Closes ray-project#58653 ## Additional information --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
## Description Added Kafka as data source, we can now use `read_kafka` for bounded data source ## Related issues Closes ray-project#58653 ## Additional information --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
## Description Added Kafka as data source, we can now use `read_kafka` for bounded data source ## Related issues Closes ray-project#58653 ## Additional information --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
## Description Added Kafka as data source, we can now use `read_kafka` for bounded data source ## Related issues Closes ray-project#58653 ## Additional information --------- Signed-off-by: You-Cheng Lin <mses010108@gmail.com> Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com> Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
Description
Added Kafka as data source, we can now use
read_kafkafor bounded data sourceRelated issues
Closes #58653
Additional information