Arrow.Stream not working with non-seekable I/O e.g. FIFOs and sockets #580

@Yuan-Ru-Lin

Description

I found that Arrow.Stream does not work with non-seekable I/O, which should be supported for streaming. Here are some MWEs.

Named pipes

mkfifo /tmp/arrow_pipe

# Producer
julia -e '
using Arrow
open("/tmp/arrow_pipe", "w") do io
    Arrow.write(io, (i = collect(1:10),); file=false)  # streaming
end
' &

# Consumer
julia -e '
using Arrow
open("/tmp/arrow_pipe", "r") do io
    for batch in Arrow.Stream(io)
        println(batch)
    end
end
'
# Result: no output, no error

rm /tmp/arrow_pipe

Sockets

using Arrow, Sockets

server = listen(9999)
t = @async begin  # keep the task handle so wait(t) below can surface its error
    conn = accept(server)
    Arrow.write(conn, (i = collect(1:10),); file=false)
    close(conn)
end

sock = connect(9999)

# This block hangs indefinitely. Press Ctrl-C to proceed.
for batch in Arrow.Stream(sock)
    println(batch)
end

wait(t)
# ERROR: TaskFailedException
#     nested task error: MethodError: no method matching position(::TCPSocket)

Unix domain sockets

using Arrow, Sockets

server = listen("/tmp/arrow.sock")
t = @async begin  # keep the task handle so wait(t) below can surface its error
    conn = accept(server)
    Arrow.write(conn, (i = collect(1:10),); file=false)
    close(conn)
end

sock = connect("/tmp/arrow.sock")

# This block hangs indefinitely. Press Ctrl-C to proceed.
for batch in Arrow.Stream(sock)
    println(batch)
end

wait(t)
# ERROR: TaskFailedException
#     nested task error: MethodError: no method matching position(::Base.PipeEndpoint)

The above demo should be reproducible with Arrow v2.8.0.

Diagnosis

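The named-pipe failure is caused by tobytes(io::IOStream) unconditionally using Mmap.mmap(io). For named pipes, filesize(io) returns 0, so Mmap.mmap(io) silently returns an empty UInt8[], and Arrow.Stream has no bytes to iterate over. That matches the observed behavior: no output and no error.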
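
One way to avoid the empty-buffer case can be sketched as follows. This is only an illustration, not the fix in Arrow.jl: bytes_or_read is a hypothetical helper name, and the sketch relies on the observation above that filesize(io) returns 0 for a named pipe.

```julia
using Mmap

# Hypothetical helper (not part of Arrow.jl): memory-map only when the stream
# reports a nonzero size; otherwise read until EOF, which also works for FIFOs.
function bytes_or_read(io::IOStream)
    if filesize(io) > 0
        return Mmap.mmap(io)   # regular file: map the contents as a Vector{UInt8}
    else
        return read(io)        # pipe or empty file: consume the stream until EOF
    end
end
```

The trade-off is that the fallback copies the whole stream into memory before any batch is parsed, so it restores correctness for pipes but is not incremental reading.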

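The two socket failures are caused by Base.write(io, msg, ...) calling position(io) to record block positions for the file format footer, even when writing the streaming format (file=false). TCPSocket and Base.PipeEndpoint do not implement position, so the writer task throws the MethodError shown above.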
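
The dependency on position can be illustrated with a small sketch that records block offsets only when the file format is requested. The function write_block! and the blocks vector are hypothetical stand-ins, not Arrow.jl internals; the point is only that the streaming path never needs to call position(io).

```julia
# Hypothetical sketch (not Arrow.jl's implementation): write one message and
# record its offset only when the file format needs it for the footer.
function write_block!(io::IO, payload::Vector{UInt8},
                      blocks::Vector{Tuple{Int,Int}}; file::Bool)
    # position(io) is only defined for seekable streams, so skip it when streaming
    start = file ? Int(position(io)) : -1
    n = write(io, payload)
    file && push!(blocks, (start, n))   # (offset, length) entry for the footer
    return n
end
```

With file=false the function touches only write, so it should work on any writable IO, including sockets.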

I will submit a PR soon.
