Skip to content
This repository was archived by the owner on Nov 20, 2020. It is now read-only.

Commit 65ebbee

Browse files
committed
fix #160
1 parent e001da3 commit 65ebbee

File tree

4 files changed

+30
-11
lines changed

4 files changed

+30
-11
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### 0.1.4 - 03.08.2017
2+
3+
* Fixed `FetchResponse` `MessageTooBigException` when a message set has been compressed. (#160)
4+
15
### 0.1.4-beta - 25.07.2017
26

37
* Fixed v0.10.1 protocol bug for `Offset` API.

src/kafunk/Compression.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ module internal Stream =
3131
use compStream = makeStream inputStream
3232
compStream.CopyTo(outputStream)
3333
let buf = Binary.Segment(outputStream.GetBuffer(), 0, int outputStream.Length)
34-
MessageSet.Read (messageVer, 0, 0s, buf.Count, BinaryZipper(buf))
34+
MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf))
3535

3636
[<Compile(Module)>]
3737
module GZip =
@@ -151,7 +151,7 @@ module Snappy =
151151

152152
let decompress (messageVer:ApiVersion) (m:Message) =
153153
let buf = CompressedMessage.decompress m.value
154-
MessageSet.Read (messageVer, 0, 0s, buf.Count, BinaryZipper(buf))
154+
MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf))
155155

156156
let compress (messageVer:int16) (compression:byte) (ms:MessageSet) =
157157
match compression with

src/kafunk/Protocol.fs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,9 @@ module Protocol =
434434
buf.WriteInt32 x.messageSize
435435
Message.Write (messageVer, x.message, buf)
436436

437-
static member internal Read (messageVer:ApiVersion, partition:Partition, ec:ErrorCode, messageSetSize:int, buf:BinaryZipper) =
437+
// NB: skipTooLarge=true is for scenarios where decompression is involved and a message set is being decoded from an individual message
438+
// which was itself too small.
439+
static member internal Read (messageVer:ApiVersion, partition:Partition, ec:ErrorCode, messageSetSize:int, skipTooLarge:bool, buf:BinaryZipper) =
438440
let mutable consumed = 0
439441
let arr = ResizeArray<_>()
440442
while consumed < messageSetSize && buf.Buffer.Count > 0 do
@@ -445,7 +447,20 @@ module Protocol =
445447
let (messageSize:MessageSize) = buf.ReadInt32 ()
446448
let messageSetRemainder = messageSetRemainder - 12 // (Offset + MessageSize)
447449
if messageSize > messageSetSize then
448-
raise (MessageTooBigException(sprintf "partition=%i offset=%i message_set_size=%i message_size=%i" partition offset messageSetSize messageSize))
450+
let errMsg = sprintf "partition=%i offset=%i message_set_size=%i message_size=%i consumed_bytes=%i consumed_count=%i"
451+
partition offset messageSetSize messageSize consumed arr.Count
452+
if not skipTooLarge then
453+
raise (MessageTooBigException(errMsg))
454+
else
455+
// let payload = Binary.toString buf.Buffer
456+
// printfn "|WARN|MessageTooBig|%s" errMsg
457+
// printfn "|WARN|MessageTooBig|payload=%s" payload
458+
// try
459+
// let message = Message.Read (messageVer,buf)
460+
// printfn "|WARN|MessageTooBig|payload=%s" (Binary.toString message.value)
461+
// with ex ->
462+
// printfn "ERROR DECODING MESSAGE|%O" ex
463+
()
449464
try
450465
if messageSetRemainder >= messageSize && buf.Buffer.Count >= messageSize then
451466
let message = Message.Read (messageVer,buf)
@@ -740,7 +755,7 @@ module Protocol =
740755
let errorCode = buf.ReadInt16 ()
741756
let hwo = buf.ReadInt64 ()
742757
let mss = buf.ReadInt32 ()
743-
let ms = MessageSet.Read (MessageVersions.fetchResMessage ver,partition,errorCode,mss,buf)
758+
let ms = MessageSet.Read (MessageVersions.fetchResMessage ver,partition,errorCode,mss,false,buf)
744759
ps.[j] <- partition, errorCode, hwo, mss, ms
745760
topics.[i] <- (t,ps)
746761
let res = FetchResponse(throttleTime, topics)

tests/kafunk.Tests/Consumer.fsx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ open FSharp.Control
55
open Kafunk
66
open System
77

8-
Log.MinLevel <- LogLevel.Trace
8+
//Log.MinLevel <- LogLevel.Trace
99
let Log = Log.create __SOURCE_FILE__
1010

1111
let argiDefault i def = fsi.CommandLineArgs |> Seq.tryItem i |> Option.getOr def
@@ -20,18 +20,18 @@ let go = async {
2020
let connConfig =
2121
let chanConfig =
2222
ChanConfig.create (
23-
requestTimeout = TimeSpan.FromSeconds 30.0,
24-
receiveBufferSize = 8192 * 20,
25-
sendBufferSize = 8192 * 10,
23+
requestTimeout = TimeSpan.FromSeconds 60.0,
24+
receiveBufferSize = 8192 * 50,
25+
sendBufferSize = 8192 * 50,
2626
connectRetryPolicy = ChanConfig.DefaultConnectRetryPolicy,
2727
requestRetryPolicy = ChanConfig.DefaultRequestRetryPolicy)
2828
KafkaConfig.create (
2929
[KafkaUri.parse host],
3030
//[KafkaUri.parse "localhost:9092" ; KafkaUri.parse "localhost:9093" ; KafkaUri.parse "localhost:9094"],
3131
tcpConfig = chanConfig,
3232
requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy,
33-
version = Versions.V_0_10_1,
34-
autoApiVersions = true)
33+
version = Versions.V_0_9_0,
34+
autoApiVersions = false)
3535
Kafka.connAsync connConfig
3636
let consumerConfig =
3737
ConsumerConfig.create (

0 commit comments

Comments
 (0)