I found that Arrow.Stream does not work with non-seekable I/O, which should be supported for streaming. Here are some MWEs.
Named pipes
mkfifo /tmp/arrow_pipe
# Producer
julia -e '
using Arrow
open("/tmp/arrow_pipe", "w") do io
Arrow.write(io, (i = collect(1:10),); file=false) # streaming
end
' &
# Consumer
julia -e '
using Arrow
open("/tmp/arrow_pipe", "r") do io
for batch in Arrow.Stream(io)
println(batch)
end
end
'
# Result: no output, no error
rm /tmp/arrow_pipe
Sockets
using Arrow, Sockets
server = listen(9999)
t = @async begin
conn = accept(server)
Arrow.write(conn, (i = collect(1:10),); file=false)
close(conn)
end
sock = connect(9999)
# This block hangs indefinitely. Press Ctrl-C to proceed.
for batch in Arrow.Stream(sock)
println(batch)
end
wait(t)
# ERROR: TaskFailedException
# nested task error: MethodError: no method matching position(::TCPSocket)
Unix domain sockets
using Arrow, Sockets
server = listen("/tmp/arrow.sock")
t = @async begin
conn = accept(server)
Arrow.write(conn, (i = collect(1:10),); file=false)
close(conn)
end
sock = connect("/tmp/arrow.sock")
# This block hangs indefinitely. Press Ctrl-C to proceed.
for batch in Arrow.Stream(sock)
println(batch)
end
wait(t)
# ERROR: TaskFailedException
# nested task error: MethodError: no method matching position(::Base.PipeEndpoint)
The above demo should be reproducible with Arrow v2.8.0.
Diagnosis
The first issue is caused by tobytes(io::IOStream) unconditionally using Mmap.mmap(io). For named pipes, filesize(io) returns 0, so Mmap.mmap(io) silently returns an empty UInt8[], and the consumer sees no batches and no error.
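To illustrate the idea, here is a minimal sketch of a size-aware byte loader. It is not the fix I plan to submit and not Arrow.jl code: load_bytes is a hypothetical name, and the zero-size check is only an assumption about how a non-mappable stream could be detected.

```julia
using Mmap

# Hypothetical helper, not part of Arrow.jl: choose how to load bytes
# depending on whether the stream reports a nonzero size.
function load_bytes(io::IOStream)
    if filesize(io) > 0
        # Regular file with known size: map it without copying.
        return Mmap.mmap(io)
    else
        # Size reported as 0 (e.g. a named pipe): fall back to reading until EOF.
        return read(io)
    end
end
```

With a regular file this keeps the mmap path, while a stream that reports size 0 is read sequentially instead of being mapped to an empty array.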
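The socket failures are caused by Base.write(io, msg, ...) calling position(io) to record block positions for the file format footer, even when writing streaming format (file=false). Neither TCPSocket nor Base.PipeEndpoint defines position, hence the MethodError in the writer task.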
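As a rough sketch of the kind of guard that would avoid this, the snippet below only queries the offset when the stream type defines position. block_offset is a hypothetical helper for illustration, not Arrow.jl API, and an actual fix might simply skip the call when file=false.

```julia
using Sockets

# Hypothetical helper, not Arrow.jl API: return the current offset for stream
# types that define `position`, and `nothing` for those that do not.
function block_offset(io::IO)
    if hasmethod(position, Tuple{typeof(io)})
        return position(io)
    end
    return nothing
end

# Seekable stream: the offset advances as bytes are written.
buf = IOBuffer()
write(buf, UInt8[1, 2, 3])
offset = block_offset(buf)
```

For a TCPSocket or Base.PipeEndpoint the same call would return nothing instead of throwing, which is enough for the streaming format since it has no footer to fill in.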
I will submit a PR soon.