Description
Version & Environment
Redpanda version:
rpk version: v25.3.4
Git ref: 4127e17813e52c16c853309ddb6bac05f69884f6
Build date: 2025 Dec 26 04 43 37 Fri
OS/Arch: linux/amd64
Client: rust-rdkafka 0.37.0
What went wrong?
When a topic is configured with message.timestamp.type=LogAppendTime, the LogAppendTimeMs field in the Kafka ProduceResponse always returns -1 instead of the broker's append timestamp. This causes Kafka clients (e.g., librdkafka) to incorrectly interpret the timestamp type as CreateTime.
The stored message correctly has LogAppendTime, but the produce response is incorrect.
What should have happened instead?
According to the Kafka protocol schema for ProduceResponse (https://github.com/redpanda-data/redpanda/blob/dev/src/v/kafka/protocol/schemata/produce_response.json), the LogAppendTimeMs field is:
"The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1. If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended."
When LogAppendTime is configured:
- LogAppendTimeMs should contain the broker's append timestamp (a positive value)
- Kafka clients should report timestamp type as LogAppendTime
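The sentinel rule quoted above can be sketched as a small classifier. This is a minimal illustration, assuming a client decides the timestamp type of a delivered message purely from the LogAppendTimeMs value in the response; `timestamp_type` and `classify_produce_timestamp` are hypothetical names, not librdkafka or Redpanda APIs.

```cpp
#include <cstdint>

// Hypothetical enum for illustration; not a librdkafka or Redpanda type.
enum class timestamp_type { create_time, log_append_time };

// Per the schema text: -1 means the topic uses CreateTime, any other value
// is the broker-local append time in milliseconds.
constexpr timestamp_type classify_produce_timestamp(int64_t log_append_time_ms) {
    return log_append_time_ms == -1 ? timestamp_type::create_time
                                    : timestamp_type::log_append_time;
}
```

Under this rule, a broker that always sends -1 makes every delivery receipt look like CreateTime, which matches the behavior reported here.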
How to reproduce the issue?
- Create a topic with LogAppendTime configuration:
rpk topic create test-topic -c message.timestamp.type=LogAppendTime
- Produce a message using any Kafka client (e.g., librdkafka/rust-rdkafka)
- Inspect the delivery receipt - the timestamp type will be reported as CreateTime
- Consume the same message and the output will show Timestamp Type: 1 (LogAppendTime)
rpk topic consume test-topic --num 1 --format 'Timestamp Type: %a{timestamp-type}\n'
Additional information
Root cause:
The issue is in partition_append() in redpanda/src/v/kafka/server/handlers/produce.cc (lines 141 to 180 in 6b88a8f):
    partition_produce_stages partition_append(
      model::partition_id id,
      partition_proxy partition,
      model::batch_identity bid,
      std::unique_ptr<model::record_batch> batch,
      int16_t acks,
      int32_t num_records,
      int64_t num_bytes,
      std::chrono::milliseconds timeout_ms) {
        auto stages = partition.replicate(
          bid, std::move(*batch), acks_to_replicate_options(acks, timeout_ms));
        return partition_produce_stages{
          .dispatched = std::move(stages.request_enqueued),
          .produced = stages.replicate_finished.then_wrapped(
            [partition = std::move(partition),
             id,
             num_records = num_records,
             num_bytes](ss::future<result<raft::replicate_result>> f) mutable {
                produce_response::partition p{.partition_index = id};
                try {
                    auto r = f.get();
                    if (r.has_value()) {
                        // have to subtract num_of_records - 1 as base_offset
                        // is inclusive
                        p.base_offset = model::offset(
                          r.value().last_offset - (num_records - 1));
                        p.error_code = error_code::none;
                        partition.probe().add_records_produced(num_records);
                        partition.probe().add_bytes_produced(num_bytes);
                        partition.probe().add_batches_produced(1);
                    } else {
                        p.error_code = map_produce_error_code(r.error());
                    }
                } catch (...) {
                    p.error_code = error_code::request_timed_out;
                }
                return p;
            }),
        };
    }
The log_append_time_ms field is never populated. Additionally, raft::replicate_result (in src/v/raft/replicate.h) only contains last_offset and last_term; it does not return the append timestamp from the storage layer.
Suggested Fix:
- Extend raft::replicate_result to include the append timestamp when LogAppendTime is configured
- In partition_append(), set p.log_append_time_ms based on the topic's timestamp type configuration:
  - If CreateTime: leave as -1
  - If LogAppendTime: set to the broker's append timestamp from the replicate result
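The suggested fix can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the actual Redpanda code: `replicate_result_sketch`, its `append_time` field, `partition_response_sketch`, and `make_partition_response` are hypothetical stand-ins for raft::replicate_result, the proposed new field, produce_response::partition, and the success branch of partition_append().

```cpp
#include <chrono>
#include <cstdint>
#include <optional>

// Hypothetical enum for illustration; the real code would read the
// topic's message.timestamp.type configuration.
enum class timestamp_type { create_time, log_append_time };

// Stand-in for raft::replicate_result plus one assumed new field.
struct replicate_result_sketch {
    int64_t last_offset;
    int64_t last_term;
    // Assumption: filled in by the storage/raft layer when the batch is
    // stamped with the broker's append time; empty otherwise.
    std::optional<std::chrono::milliseconds> append_time;
};

// Stand-in for the relevant fields of produce_response::partition.
struct partition_response_sketch {
    int64_t base_offset{-1};
    int64_t log_append_time_ms{-1}; // -1 signals CreateTime to clients
};

partition_response_sketch make_partition_response(
  const replicate_result_sketch& r,
  int32_t num_records,
  timestamp_type type) {
    partition_response_sketch p;
    // base_offset is inclusive, so subtract num_records - 1.
    p.base_offset = r.last_offset - (num_records - 1);
    // Only report an append time for LogAppendTime topics; otherwise
    // keep the -1 default.
    if (type == timestamp_type::log_append_time && r.append_time) {
        p.log_append_time_ms = r.append_time->count();
    }
    return p;
}
```

Keeping the field optional means a CreateTime topic, or a replicate path that does not yet supply a timestamp, still produces the -1 that the protocol expects.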