Description
`consumer_poll(0)` (and `rd_kafka_poll(0)`) can block indefinitely when `log.queue=true` and debug logging are enabled. The root cause is in `rd_kafka_q_pop_serve()`: after handling an internal operation (e.g. `RD_KAFKA_OP_LOG`), the code jumps to `goto retry` without checking the timeout. If background threads enqueue internal ops faster than the application thread can process them, the queue is never empty and the timeout path (`cnd_timedwait_abs`) is never reached.
We observed `consumer_poll(0)` blocking for over 2 hours in production. A minimal reproduction script confirms the bug: `poll(0)` blocks for up to 62 seconds with a single broker, and applying a one-line fix eliminates the starvation entirely.
Mechanism
In `rd_kafka_q_pop_serve()` the timeout is only evaluated when the queue is empty:
```c
while (1) {
retry:
        while ((rko = TAILQ_FIRST(&rkq->rkq_q)) &&
               !(rko = rd_kafka_op_filter(rkq, rko, version)))
                ;

        if (rko) {
                rd_kafka_q_deq0(rkq, rko);
                mtx_unlock(&rkq->rkq_lock);

                res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko,
                                         cb_type, opaque, callback);
                if (res == RD_KAFKA_OP_RES_HANDLED ||
                    res == RD_KAFKA_OP_RES_KEEP) {
                        mtx_lock(&rkq->rkq_lock);
                        goto retry; /* <-- loops back without checking timeout */
                }
                /* only messages (RES_PASS) break out of the loop */
        }

        /* Only reached when the queue is EMPTY: */
        if (unlikely(rd_kafka_q_check_yield(rkq)))
                return NULL;

        if (cnd_timedwait_abs(..., &timeout_tspec) != thrd_success)
                return NULL; /* timeout: poll(0) would exit here */
}
```
For `poll(0)`, `timeout_tspec` guarantees an immediate `ETIMEDOUT` from `cnd_timedwait_abs`, but only if the queue is empty. If internal ops keep the queue non-empty, the timeout is never checked.
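The control flow can be modeled outside librdkafka in a few lines of Python (a sketch of the loop's shape, not the real implementation): a feeder thread stands in for the broker threads enqueuing log ops, a 1 ms sleep stands in for a slow log callback, and the deadline is only examined when the queue is empty.

```python
import queue
import threading
import time

def poll0(q, per_op_timeout_check=False, op_cost_s=0.001):
    """Model of rd_kafka_q_pop_serve() with timeout=0: returns elapsed seconds."""
    deadline = time.monotonic()          # poll(0): the deadline is "now"
    start = time.monotonic()
    while True:
        try:
            q.get_nowait()               # TAILQ_FIRST + rd_kafka_q_deq0
        except queue.Empty:
            break                        # only an EMPTY queue reaches the timeout
        time.sleep(op_cost_s)            # rd_kafka_op_handle: slow log callback
        if per_op_timeout_check and time.monotonic() >= deadline:
            break                        # the proposed fix: check before retrying
        # unpatched: jump straight back to the dequeue ("goto retry")
    return time.monotonic() - start

def run(per_op_timeout_check):
    q = queue.Queue()
    for _ in range(100):                 # backlog accumulated between polls
        q.put("RD_KAFKA_OP_LOG")
    stop_at = time.monotonic() + 0.3     # feeder keeps logging for 300 ms
    def feeder():
        while time.monotonic() < stop_at:
            q.put("RD_KAFKA_OP_LOG")
            time.sleep(0.0005)           # ops arrive as fast as they drain
    threading.Thread(target=feeder, daemon=True).start()
    return poll0(q, per_op_timeout_check)

print(f"unpatched poll(0): {run(False)*1000:.0f} ms")  # blocks until the feeder stops
print(f"patched   poll(0): {run(True)*1000:.0f} ms")   # returns after one op
```

With the per-op check the model returns after a single handled op; without it, the call cannot return while ops keep arriving, which is exactly the starvation described above.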
Triggering conditions
All three must be true simultaneously:
- `debug` is enabled (especially `debug=all`): broker threads generate high-volume log output
- `log.queue=true`: routes log messages as `RD_KAFKA_OP_LOG` ops onto the polled queue
- `poll(0)` or a short timeout: trivially reproducible with timeout=0, but any timeout can starve if ops arrive faster than they're drained
Note on confluent-kafka-python: `log.queue=true` is set automatically when a logger is passed to `Consumer()` or `Producer()` (`confluent_kafka.c:2498-2502`). Any Python application using `logger=...` plus `debug` is susceptible.
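For illustration, the susceptible combination from Python is just the following (a configuration sketch; the broker address is a placeholder):

```python
import logging
from confluent_kafka import Consumer

log = logging.getLogger("rdkafka")

# Passing logger= makes confluent-kafka-python enable log.queue=true internally,
# so every debug line becomes an RD_KAFKA_OP_LOG op on the queue that poll() serves.
c = Consumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "example",
    "debug": "all",                          # high-volume log ops from broker threads
}, logger=log)
```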
Reproduction
A self-contained reproduction is provided (see below). It runs three variants back-to-back on a single-broker setup with 20 partitions:
| Test | poll(0) latency | Expected |
|---|---|---|
| control (no debug, no logger) | 0.0 ms | 0 ms |
| debug=all + logger | 1.3 ms | 0 ms |
| debug=all + logger + GIL contention (4 threads) | 1,840–62,518 ms | 0 ms |
The third variant simulates a real multi-threaded Python application where GIL contention slows down the log_cb callback. With only 1 broker, poll(0) blocks for up to 62.5 seconds. In our production environment (4 brokers), it blocked for over 2 hours.
Reproduction script
Requires Docker and confluent-kafka:
```shell
docker-compose up -d          # single KRaft broker on localhost:9092
pip install confluent-kafka
python repro.py 2>/dev/null
```
docker-compose.yml:
```yaml
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
```
repro.py:
"""
Repro: poll(0) blocks when debug=all + log.queue=true (via logger).
rd_kafka_q_pop_serve() handles internal ops (RD_KAFKA_OP_LOG) via `goto retry`
without checking the timeout. Under GIL contention, callbacks are slow enough
that new log ops arrive faster than they're drained — poll(0) never returns.
docker-compose up -d && pip install confluent-kafka && python repro.py
"""
import logging
import sys
import threading
import time
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
BROKER = "localhost:9092"
TOPIC = "repro-poll0-starvation"
PARTITIONS = 20
logging.basicConfig(level=logging.DEBUG, format="%(message)s", stream=sys.stderr)
log = logging.getLogger("rdkafka")
def burn_gil(stop):
while not stop.is_set():
sum(range(500))
def main():
AdminClient({"bootstrap.servers": BROKER}).create_topics(
[NewTopic(TOPIC, num_partitions=PARTITIONS, replication_factor=1)]
)
p = Producer({"bootstrap.servers": BROKER})
for i in range(500):
p.produce(TOPIC, value=b"x", partition=i % PARTITIONS)
p.flush(10)
print("produced 500 msgs")
for label, conf_extra, kwargs, n_threads in [
("control", {}, {}, 0),
("debug+logger", {"debug": "all", "log_level": 7}, {"logger": log}, 0),
("debug+logger+GIL", {"debug": "all", "log_level": 7}, {"logger": log}, 4),
]:
c = Consumer({
"bootstrap.servers": BROKER,
"group.id": f"repro-{label}-{int(time.time())}",
"auto.offset.reset": "earliest",
"fetch.wait.max.ms": 100,
**conf_extra,
}, **kwargs)
c.subscribe([TOPIC])
n = 0
t0 = time.monotonic()
while True:
msg = c.poll(1.0)
if msg is None:
if time.monotonic() - t0 > 10:
break
continue
if not msg.error():
n += 1
t0 = time.monotonic()
print(f"\n[{label}] consumed {n}, draining done")
stop = threading.Event()
for _ in range(n_threads):
threading.Thread(target=burn_gil, args=(stop,), daemon=True).start()
for rnd in range(5):
time.sleep(5)
t0 = time.monotonic()
c.poll(0)
ms = (time.monotonic() - t0) * 1000
print(f" round {rnd+1}: poll(0) = {ms:.1f} ms")
stop.set()
c.close()
if __name__ == "__main__":
main()Output (unpatched v2.13.0)
```
produced 500 msgs

[control] consumed 50, draining done
  round 1: poll(0) = 0.0 ms
  round 2: poll(0) = 0.0 ms
  round 3: poll(0) = 0.0 ms
  round 4: poll(0) = 0.0 ms
  round 5: poll(0) = 0.0 ms

[debug+logger] consumed 50, draining done
  round 1: poll(0) = 1.3 ms
  round 2: poll(0) = 1.3 ms
  round 3: poll(0) = 1.3 ms
  round 4: poll(0) = 1.3 ms
  round 5: poll(0) = 1.3 ms

[debug+logger+GIL] consumed 50, draining done
  round 1: poll(0) = 62517.8 ms   ← 62 seconds for a "non-blocking" call
  round 2: poll(0) = 5276.8 ms
  round 3: poll(0) = 125.1 ms
  round 4: poll(0) = 1766.0 ms
  round 5: poll(0) = 1282.3 ms
```
How it works
The script lets log ops accumulate in the internal queue for 5 seconds (by not calling poll), then calls poll(0) and measures how long it takes. The burn_gil threads create GIL contention, simulating a real multi-threaded Python application where log_cb execution is slowed by GIL acquisition. This causes new ops to be enqueued faster than they're drained, and the goto retry loop never reaches the timeout check.
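Treating the queue as a fluid makes the race concrete (a back-of-envelope model with hypothetical rates, not measured values): the unpatched loop only returns once the backlog reaches zero, so the blocking time diverges as the arrival rate approaches the drain rate.

```python
def unpatched_poll_duration(backlog_ops, arrival_rate, drain_rate):
    """Seconds until the retry loop finds the queue empty (fluid approximation).

    backlog_ops: log ops accumulated between polls; rates are in ops/second.
    Returns infinity when ops arrive at least as fast as the callback drains them.
    """
    if arrival_rate >= drain_rate:
        return float("inf")              # queue never empties: poll(0) never returns
    return backlog_ops / (drain_rate - arrival_rate)

# 5 s of accumulation at 200 ops/s, drained at 1000 ops/s once arrivals stop:
print(unpatched_poll_duration(1000, 0, 1000))     # 1.0
# GIL contention slows the callback until drain barely exceeds arrival:
print(unpatched_poll_duration(1000, 950, 1000))   # 20.0
# arrival matches drain: indefinite blocking, as observed in production
print(unpatched_poll_duration(1000, 1000, 1000))  # inf
```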
Proposed fix
Check the timeout after handling each internal operation, before jumping to retry:
```c
if (res == RD_KAFKA_OP_RES_HANDLED ||
    res == RD_KAFKA_OP_RES_KEEP) {
        if (rd_timeout_expired(rd_timeout_remains(abs_timeout))) {
                return NULL;
        }
        mtx_lock(&rkq->rkq_lock);
        is_locked = rd_true;
        goto retry;
}
```
Fix verified
We rebuilt librdkafka v2.13.0 with this one-line change and re-ran the reproduction:
| Test | Before fix | After fix |
|---|---|---|
| control | 0.0 ms | 0.0 ms |
| debug+logger | 1.3 ms | 0.1 ms |
| debug+logger+GIL | 1,840–62,518 ms | 19–69 ms |
```
[control] consumed 50, draining done
  round 1: poll(0) = 0.0 ms
  round 2: poll(0) = 0.0 ms
  round 3: poll(0) = 0.0 ms
  round 4: poll(0) = 0.0 ms
  round 5: poll(0) = 0.0 ms

[debug+logger] consumed 50, draining done
  round 1: poll(0) = 0.1 ms
  round 2: poll(0) = 0.1 ms
  round 3: poll(0) = 0.1 ms
  round 4: poll(0) = 0.1 ms
  round 5: poll(0) = 0.1 ms

[debug+logger+GIL] consumed 50, draining done
  round 1: poll(0) = 68.8 ms
  round 2: poll(0) = 18.9 ms
  round 3: poll(0) = 48.2 ms
  round 4: poll(0) = 29.5 ms
  round 5: poll(0) = 55.4 ms
```
The worst case drops from 62,518 ms to 69 ms (a 906x improvement). The remaining ~50 ms is the legitimate cost of processing the accumulated log ops, but the timeout is now actually respected.
Production observation
In our production case (4 broker connections, debug=all, log.queue=true via Python logger):
- Consumer consumed 16,566 / 17,686 events, then `poll(0)` blocked
- 351,745 debug log lines generated during the stuck period (2h 7min)
- Log rate: 39–241 lines/sec continuously from 4 broker threads
- Broker FETCH CorrId went from ~8,800 to ~51,200 during the hang
- Process had to be killed after 2+ hours
Checklist
- librdkafka version: v2.13.0 (verified identical pattern in v2.4.0 and current master)
- Apache Kafka version: 3.x (not version-specific)
- librdkafka client configuration: `debug=all`, `log.queue=true`, `log_level=7`, `log_cb` set
- Operating system: macOS (ARM64) and Linux (not OS-specific)
- Minimal reproduction provided with docker-compose + script
- Fix verified: patched build eliminates starvation
- Critical issue: causes indefinite blocking of the application thread