Skip to content

Expose manual stop method in KafkaReceiver#380

Open
cjlee38 wants to merge 4 commits intoreactor:mainfrom
cjlee38:explicit-stop
Open

Expose manual stop method in KafkaReceiver#380
cjlee38 wants to merge 4 commits intoreactor:mainfrom
cjlee38:explicit-stop

Conversation

@cjlee38
Copy link
Copy Markdown

@cjlee38 cjlee38 commented Jan 17, 2024

KafkaReceiver currently is not possible to close safely. An example is below:

val disposable = kafkaReceiver.receive()
    .flatMapSequential { record -> process(record).then(record) }
    .delayUntil { 
        it.receiverOffset().acknowledge() 
        it.receiverOffset().commit()
    }
    .subscribe()

dispose() method would be a way, but it might lead to unexpected result because of cancellation leading processed records not to be committed. I think This PR is discussed at #247, and also helps to address #378 indirectly.(People can customize KafkaReceiver's lifecycle after stopping followed by disposable.isDisposed()

I've considered several situations related to thread-safety or something, but might miss I didn't expect. Any further suggestions would be appreciated.

@cjlee38
Copy link
Copy Markdown
Author

cjlee38 commented Jun 5, 2024

Hello,

It has been quite some time since I submitted this PR, and I haven't received any response yet. I was wondering if support for reactor-kafka has ended, or if there might be another reason for the delay? I don't mean to rush, but I'm leaving this message because it has been a considerable amount of time without any reply.

Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant