
Provide option for KafkaReceiver's graceful shutdown #378

@cjlee38

Motivation

Over the past few days I have been looking for a way to shut down a KafkaReceiver gracefully, but I could not find a proper way to do it. I read the related issues (#247, #51, #196) and several Stack Overflow questions, but the suggestions there do not work as I expected. (This may be due to my limited understanding of Reactor or Kafka, so please bear with me.)

Example code snippet:

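val disposable = kafkaReceiver.receive()
    // Process each record, preserving the original order downstream
    .flatMapSequential { record -> process(record).thenReturn(record) }
    // Acknowledge and commit the offsets one record at a time
    .concatMap { record ->
        record.receiverOffset().acknowledge()
        record.receiverOffset().commit()
    }
    .subscribe()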

This is the typical case:

  1. Receive a record
  2. Process the record
  3. Acknowledge and commit (can be omitted when using auto-commit)

Desired solution

I think this is a very common case: when I restart my application (which is based on the Spring Framework), the consumers should stop fetching new records, the flux should keep processing and committing the records that have already been fetched, and only then should the flux complete.

However, simply disposing the Disposable does not work as expected, because a record that has already been processed may never be committed.
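
To make the desired behaviour concrete, here is a minimal sketch that uses only reactor-core. Everything in it is an assumption for illustration: `Flux.interval` stands in for `kafkaReceiver.receive()`, and `stop`, `process`, `commit`, `fetched` and `committed` are hypothetical names, not part of reactor-kafka. It shows a stop signal that cuts off the source while the records already fetched are still processed and committed before the flux completes. With a real KafkaReceiver, cancelling `receive()` may close the consumer before the remaining commits go through, which is the gap this issue is about.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins: none of these names come from reactor-kafka.
val fetched = AtomicInteger()
val committed = CopyOnWriteArrayList<Long>()

// Simulates asynchronous processing of one record.
fun process(record: Long): Mono<Long> =
    Mono.delay(Duration.ofMillis(50)).thenReturn(record)

// Simulates acknowledging and committing one record's offset.
fun commit(record: Long): Mono<Void> =
    Mono.fromRunnable<Void> { committed.add(record) }

fun main() {
    // Hypothetical shutdown signal, e.g. emitted from a Spring lifecycle hook.
    val stop = Sinks.one<Unit>()

    val done = Flux.interval(Duration.ofMillis(10)) // stands in for kafkaReceiver.receive()
        .takeUntilOther(stop.asMono())              // stop taking new records once signalled
        .doOnNext { fetched.incrementAndGet() }     // count records that entered the pipeline
        .flatMapSequential { record -> process(record) }
        .concatMap { record -> commit(record) }
        .then()                                     // completes after the last commit
        .toFuture()

    Thread.sleep(200)        // let some records flow
    stop.tryEmitValue(Unit)  // request graceful shutdown
    done.get()               // wait until in-flight records are processed and committed

    println("fetched=${fetched.get()}, committed=${committed.size}")
}
```

In this sketch the two counters end up equal, because the stop signal only completes the source and the downstream operators are left to drain. Calling `dispose()` on the subscription instead would cancel the whole chain and could drop records that are still in flight.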

Considered alternatives

I don't have a concrete implementation in mind, but here are some things to consider:

  1. Reactor's Scheduler interface provides a disposeGracefully method.
    These methods (1 2) could be replaced with it (or the behaviour could be selected by an option).
  2. Add sink.emitComplete() in ConsumerEventLoop#stop.
  3. It looks like ConsumerEventLoop keeps polling the broker without pausing and emits the records into the sink. If that is right, producing a large number of records in a short time could cause problems. For example, say 10,000 records are produced and the consumer fetches them all within a few seconds. Besides the risk of an OOM, the flux has to wait until all of those records are drained before a graceful shutdown can finish. I think the emission of records should be delayed or throttled.

Additional context

If I have misunderstood something, please let me know. Any other opinions would be appreciated. Thanks.
