poll(0) blocks indefinitely when log.queue=true and debug is enabled — goto retry in rd_kafka_q_pop_serve() skips timeout check #5325

@nadavgolden

Description

consumer_poll(0) (and rd_kafka_poll(0)) can block indefinitely when log.queue=true and debug logging is enabled. The root cause is in rd_kafka_q_pop_serve(): after handling an internal operation (e.g., RD_KAFKA_OP_LOG), the code jumps to goto retry without checking the timeout. If background threads enqueue internal ops faster than the application thread can process them, the queue is never empty, and the timeout path (cnd_timedwait_abs) is never reached.

We observed consumer_poll(0) blocking for over 2 hours in production. A minimal reproduction script confirms the bug — poll(0) blocks for up to 62 seconds with a single broker, and applying a one-line fix eliminates the starvation entirely.

Mechanism

In rd_kafka_q_pop_serve() the timeout is only evaluated when the queue is empty:

while (1) {
retry:
    while ((rko = TAILQ_FIRST(&rkq->rkq_q)) &&
           !(rko = rd_kafka_op_filter(rkq, rko, version)))
        ;

    if (rko) {
        rd_kafka_q_deq0(rkq, rko);
        mtx_unlock(&rkq->rkq_lock);

        res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko,
                                 cb_type, opaque, callback);

        if (res == RD_KAFKA_OP_RES_HANDLED ||
            res == RD_KAFKA_OP_RES_KEEP) {
            mtx_lock(&rkq->rkq_lock);
            goto retry;   // <-- loops back without checking timeout
        }
        // only messages (RES_PASS) break out of the loop
    }

    // Only reached when queue is EMPTY:
    if (unlikely(rd_kafka_q_check_yield(rkq)))
        return NULL;

    if (cnd_timedwait_abs(..., &timeout_tspec) != thrd_success)
        return NULL;  // timeout — poll(0) would exit here
}

For poll(0), timeout_tspec guarantees an immediate ETIMEDOUT from cnd_timedwait_abs, but only if the queue is empty. If internal ops keep the queue non-empty, the timeout is never checked.
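The starvation can be sketched with a toy Python model (illustrative names, not librdkafka code): the timeout branch is only reachable when the queue is empty, so a `refill` callback standing in for busy broker/log threads keeps a nominally non-blocking poll spinning well past its deadline.

```python
import time
from collections import deque

def pop_serve_unpatched(q, timeout_s, refill):
    # Toy model of rd_kafka_q_pop_serve(): the timeout is consulted
    # only when the queue is empty, mirroring the `goto retry` loop.
    deadline = time.monotonic() + timeout_s
    handled = 0
    while True:
        if q:                  # internal op available
            q.popleft()
            handled += 1
            refill(q)          # background threads enqueue more ops
            continue           # `goto retry` -- no timeout check here
        if time.monotonic() >= deadline:
            return handled     # timeout path: reached only on empty queue

# A refill callback that keeps the queue non-empty for ~50 ms,
# far past the poll(0) deadline of "now".
stop_at = time.monotonic() + 0.05
q = deque(["op"])
t0 = time.monotonic()
n = pop_serve_unpatched(
    q, 0.0,
    lambda q: q.append("op") if time.monotonic() < stop_at else None)
elapsed_ms = (time.monotonic() - t0) * 1000
print(f"poll(0) handled {n} ops in {elapsed_ms:.0f} ms")  # ~50 ms, not 0
```

The model returns only once the refill stops and the queue drains, which is exactly the behavior the repro below exhibits at much larger scale.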

Triggering conditions

All three must be true simultaneously:

  1. debug is enabled (especially debug=all) — broker threads generate high-volume log output
  2. log.queue=true — routes log messages as RD_KAFKA_OP_LOG onto the polled queue
  3. poll(0) or short timeout — trivially reproducible with timeout=0, but any timeout can starve if ops arrive faster than they're drained

Note on confluent-kafka-python: log.queue=true is set automatically when a logger is passed to Consumer() or Producer() (confluent_kafka.c:2498-2502). Any Python application using logger=... + debug is susceptible.

Reproduction

A self-contained reproduction is provided (see below). It runs three variants back-to-back on a single-broker setup with 20 partitions:

Test                                             poll(0) latency    Expected
control (no debug, no logger)                    0.0 ms             0 ms
debug=all + logger                               1.3 ms             0 ms
debug=all + logger + GIL contention (4 threads)  1,840–62,518 ms    0 ms

The third variant simulates a real multi-threaded Python application where GIL contention slows down the log_cb callback. With only 1 broker, poll(0) blocks for up to 62.5 seconds. In our production environment (4 brokers), it blocked for over 2 hours.

Reproduction script

Requires Docker and confluent-kafka:

docker-compose up -d   # single KRaft broker on localhost:9092
pip install confluent-kafka
python repro.py 2>/dev/null
docker-compose.yml
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
repro.py
"""
Repro: poll(0) blocks when debug=all + log.queue=true (via logger).

rd_kafka_q_pop_serve() handles internal ops (RD_KAFKA_OP_LOG) via `goto retry`
without checking the timeout. Under GIL contention, callbacks are slow enough
that new log ops arrive faster than they're drained — poll(0) never returns.

    docker-compose up -d && pip install confluent-kafka && python repro.py
"""

import logging
import sys
import threading
import time
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic

BROKER = "localhost:9092"
TOPIC = "repro-poll0-starvation"
PARTITIONS = 20

logging.basicConfig(level=logging.DEBUG, format="%(message)s", stream=sys.stderr)
log = logging.getLogger("rdkafka")


def burn_gil(stop):
    while not stop.is_set():
        sum(range(500))


def main():
    AdminClient({"bootstrap.servers": BROKER}).create_topics(
        [NewTopic(TOPIC, num_partitions=PARTITIONS, replication_factor=1)]
    )
    p = Producer({"bootstrap.servers": BROKER})
    for i in range(500):
        p.produce(TOPIC, value=b"x", partition=i % PARTITIONS)
    p.flush(10)
    print("produced 500 msgs")

    for label, conf_extra, kwargs, n_threads in [
        ("control",           {},                                  {},               0),
        ("debug+logger",      {"debug": "all", "log_level": 7},   {"logger": log},  0),
        ("debug+logger+GIL",  {"debug": "all", "log_level": 7},   {"logger": log},  4),
    ]:
        c = Consumer({
            "bootstrap.servers": BROKER,
            "group.id": f"repro-{label}-{int(time.time())}",
            "auto.offset.reset": "earliest",
            "fetch.wait.max.ms": 100,
            **conf_extra,
        }, **kwargs)
        c.subscribe([TOPIC])

        n = 0
        t0 = time.monotonic()
        while True:
            msg = c.poll(1.0)
            if msg is None:
                if time.monotonic() - t0 > 10:
                    break
                continue
            if not msg.error():
                n += 1
                t0 = time.monotonic()
        print(f"\n[{label}] consumed {n}, draining done")

        stop = threading.Event()
        for _ in range(n_threads):
            threading.Thread(target=burn_gil, args=(stop,), daemon=True).start()

        for rnd in range(5):
            time.sleep(5)
            t0 = time.monotonic()
            c.poll(0)
            ms = (time.monotonic() - t0) * 1000
            print(f"  round {rnd+1}: poll(0) = {ms:.1f} ms")

        stop.set()
        c.close()


if __name__ == "__main__":
    main()

Output (unpatched v2.13.0)

produced 500 msgs

[control] consumed 50, draining done
  round 1: poll(0) = 0.0 ms
  round 2: poll(0) = 0.0 ms
  round 3: poll(0) = 0.0 ms
  round 4: poll(0) = 0.0 ms
  round 5: poll(0) = 0.0 ms

[debug+logger] consumed 50, draining done
  round 1: poll(0) = 1.3 ms
  round 2: poll(0) = 1.3 ms
  round 3: poll(0) = 1.3 ms
  round 4: poll(0) = 1.3 ms
  round 5: poll(0) = 1.3 ms

[debug+logger+GIL] consumed 50, draining done
  round 1: poll(0) = 62517.8 ms    ← 62 seconds for a "non-blocking" call
  round 2: poll(0) = 5276.8 ms
  round 3: poll(0) = 125.1 ms
  round 4: poll(0) = 1766.0 ms
  round 5: poll(0) = 1282.3 ms

How it works

The script lets log ops accumulate in the internal queue for 5 seconds (by not calling poll), then calls poll(0) and measures how long it takes. The burn_gil threads create GIL contention, simulating a real multi-threaded Python application where log_cb execution is slowed by GIL acquisition. This causes new ops to be enqueued faster than they're drained, and the goto retry loop never reaches the timeout check.

Proposed fix

Check the timeout after handling each internal operation, before jumping to retry:

if (res == RD_KAFKA_OP_RES_HANDLED ||
    res == RD_KAFKA_OP_RES_KEEP) {
    if (rd_timeout_expired(rd_timeout_remains(abs_timeout))) {
        return NULL;
    }
    mtx_lock(&rkq->rkq_lock);
    is_locked = rd_true;
    goto retry;
}
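The effect of this one-line change can be sketched with a toy Python model (illustrative names, not librdkafka code): checking the deadline before looping back bounds the number of internal ops handled per call, while the unpatched loop handles an entire burst.

```python
import time
from collections import deque

def pop_serve(q, timeout_s, refill, patched):
    # Toy model of rd_kafka_q_pop_serve(); `patched` adds the proposed
    # deadline check before looping back for the next internal op.
    deadline = time.monotonic() + timeout_s
    handled = 0
    while True:
        if q:
            q.popleft()
            handled += 1
            refill(q)                 # background threads enqueue more ops
            if patched and time.monotonic() >= deadline:
                return handled        # proposed fix: honor the timeout
            continue                  # unpatched `goto retry`
        if time.monotonic() >= deadline:
            return handled            # timeout checked only on empty queue

def busy_refill(window_s):
    # Keeps the queue non-empty for `window_s` seconds.
    stop_at = time.monotonic() + window_s
    return lambda q: q.append("op") if time.monotonic() < stop_at else None

unpatched = pop_serve(deque(["op"]), 0.0, busy_refill(0.05), patched=False)
patched = pop_serve(deque(["op"]), 0.0, busy_refill(0.05), patched=True)
print(unpatched, patched)  # unpatched drains the whole 50 ms burst; patched handles 1
```

With the deadline check in place, poll(0) handles at most one already-queued op before returning, which matches the bounded latencies in the verification run below.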

Fix verified

We rebuilt librdkafka v2.13.0 with this one-line change and re-ran the reproduction:

Test               Before fix         After fix
control            0.0 ms             0.0 ms
debug+logger       1.3 ms             0.1 ms
debug+logger+GIL   1,840–62,518 ms    19–69 ms

[control] consumed 50, draining done
  round 1: poll(0) = 0.0 ms
  round 2: poll(0) = 0.0 ms
  round 3: poll(0) = 0.0 ms
  round 4: poll(0) = 0.0 ms
  round 5: poll(0) = 0.0 ms

[debug+logger] consumed 50, draining done
  round 1: poll(0) = 0.1 ms
  round 2: poll(0) = 0.1 ms
  round 3: poll(0) = 0.1 ms
  round 4: poll(0) = 0.1 ms
  round 5: poll(0) = 0.1 ms

[debug+logger+GIL] consumed 50, draining done
  round 1: poll(0) = 68.8 ms
  round 2: poll(0) = 18.9 ms
  round 3: poll(0) = 48.2 ms
  round 4: poll(0) = 29.5 ms
  round 5: poll(0) = 55.4 ms

The worst case drops from 62,518 ms → 69 ms (906x improvement). The remaining ~50 ms is the legitimate cost of processing accumulated log ops — but now the timeout is actually respected.

Production observation

In our production case (4 broker connections, debug=all, log.queue=true via Python logger):

  • Consumer consumed 16,566 / 17,686 events, then poll(0) blocked
  • 351,745 debug log lines generated during the stuck period (2h 7min)
  • Log rate: 39–241 lines/sec continuously from 4 broker threads
  • Broker FETCH CorrId went from ~8,800 to ~51,200 during the hang
  • Process had to be killed after 2+ hours

Checklist

  • librdkafka version: v2.13.0 (verified identical pattern in v2.4.0 and current master)
  • Apache Kafka version: 3.x (not version-specific)
  • librdkafka client configuration: debug=all, log.queue=true, log_level=7, log_cb set
  • Operating system: macOS (ARM64) and Linux — not OS-specific
  • Minimal reproduction provided with docker-compose + script
  • Fix verified: patched build eliminates starvation
  • Critical issue: causes indefinite blocking of the application thread
