Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561"
CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2"
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720"
SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
Expand All @@ -41,6 +42,7 @@ BitIntegers = "0.2"
CodecLz4 = "0.4"
CodecZstd = "0.7"
DataAPI = "1"
LoggingExtras = "0.4"
FilePathsBase = "0.9"
PooledArrays = "0.5, 1.0"
SentinelArrays = "1"
Expand Down
26 changes: 1 addition & 25 deletions src/Arrow.jl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module Arrow

using Base.Iterators
using Mmap
using LoggingExtras
import Dates
using DataAPI, Tables, SentinelArrays, PooledArrays, CodecLz4, CodecZstd, TimeZones, BitIntegers, WorkerUtilities

Expand All @@ -51,31 +52,6 @@ export ArrowTypes
using Base: @propagate_inbounds
import Base: ==

"""
    DEBUG_LEVEL

Module-wide debug verbosity. A `@debug level msg` call prints when
`DEBUG_LEVEL[] >= level`. The initial value is `0`.
"""
const DEBUG_LEVEL = Ref(0)

"""
    setdebug!(level::Integer)

Set the module-wide debug verbosity stored in `DEBUG_LEVEL` to `level`.
Returns `nothing`.
"""
function setdebug!(level::Integer)
    DEBUG_LEVEL[] = level
    return
end

"""
    withdebug(f, level)

Call `f()` with the debug verbosity temporarily set to `level` and return its
result. The previous verbosity is restored afterwards, even if `f` throws.
"""
function withdebug(f, level)
    previous = DEBUG_LEVEL[]
    try
        setdebug!(level)
        return f()
    finally
        # always restore, so an exception in `f` cannot leave verbosity raised
        setdebug!(previous)
    end
end

"""
    @debug level msg

Print `msg`, prefixed with `DEBUG: <file>:<line>` of the call site, when
`DEBUG_LEVEL[] >= level`. `msg` is only evaluated when the message is printed.
"""
macro debug(level, msg)
    file = QuoteNode(__source__.file)
    line = QuoteNode(__source__.line)
    # Escape only the caller-supplied expressions. `DEBUG_LEVEL`, `println` and
    # `string` must resolve in this module, not in whatever module expands the macro.
    return quote
        if DEBUG_LEVEL[] >= $(esc(level))
            println(string("DEBUG: ", $file, ":", $line, " ", $(esc(msg))))
        end
    end
end

const FILE_FORMAT_MAGIC_BYTES = b"ARROW1"
const CONTINUATION_INDICATOR_BYTES = 0xffffffff

Expand Down
2 changes: 1 addition & 1 deletion src/append.jl
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ function append(io::IO, source, arrow_schema, compress, largelists, denseunions,
@error "error writing arrow data on partition = $(errorref[][3])" exception=(errorref[][1], errorref[][2])
error("fatal error writing arrow data")
end
@debug 1 "processing table partition i = $i"
@debugv 1 "processing table partition i = $i"
tbl_cols = Tables.columns(tbl)
tbl_schema = Tables.schema(tbl_cols)

Expand Down
12 changes: 6 additions & 6 deletions src/arraytypes/arraytypes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ getmetadata(x::ArrowVector) = x.metadata
# Deleting elements is rejected for every `ArrowVector` subtype: this method
# unconditionally throws an `ArgumentError` naming the concrete type `T`.
function Base.deleteat!(x::T, inds) where {T <: ArrowVector}
    throw(ArgumentError("`$T` does not support `deleteat!`; arrow data is by nature immutable"))
end

function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=getmetadata(x); compression::Union{Nothing, Vector{LZ4FrameCompressor}, LZ4FrameCompressor, Vector{ZstdCompressor}, ZstdCompressor}=nothing, kw...)
@debug 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
@debug 3 x
@debugv 2 "converting top-level column to arrow format: col = $(typeof(x)), compression = $compression, kw = $(kw.data)"
@debugv 3 x
A = arrowvector(x, i, 0, 0, de, ded, meta; compression=compression, kw...)
if compression isa LZ4FrameCompressor
A = compress(Meta.CompressionTypes.LZ4_FRAME, compression, A)
Expand All @@ -44,8 +44,8 @@ function toarrowvector(x, i=1, de=Dict{Int64, Any}(), ded=DictEncoding[], meta=g
elseif compression isa Vector{ZstdCompressor}
A = compress(Meta.CompressionTypes.ZSTD, compression[Threads.threadid()], A)
end
@debug 2 "converted top-level column to arrow format: $(typeof(A))"
@debug 3 A
@debugv 2 "converted top-level column to arrow format: $(typeof(A))"
@debugv 3 A
return A
end

Expand Down Expand Up @@ -104,7 +104,7 @@ compress(Z::Meta.CompressionType, comp, v::NullVector) =

# Register the field node for a `NullVector` column.
# Only `fieldnodes` is modified: no entry is pushed to `fieldbuffers`, `alignment`
# is not used, and `bufferoffset` is returned unchanged.
function makenodesbuffers!(col::NullVector, fieldnodes, fieldbuffers, bufferoffset, alignment)
# both FieldNode arguments are length(col); the log line below reads them back as `length` and `null_count`
push!(fieldnodes, FieldNode(length(col), length(col)))
# NOTE(review): the adjacent `@debug` / `@debugv` lines carry the same message and look like
# the before/after pair of a diff hunk — confirm only one of them is meant to remain.
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
@debugv 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
return bufferoffset
end

Expand Down Expand Up @@ -187,7 +187,7 @@ end

function writebitmap(io, col::ArrowVector, alignment)
v = col.validity
@debug 1 "writing validity bitmap: nc = $(v.nc), n = $(cld(v.ℓ, 8))"
@debugv 1 "writing validity bitmap: nc = $(v.nc), n = $(cld(v.ℓ, 8))"
v.nc == 0 && return 0
n = Base.write(io, view(v.bytes, v.pos:(v.pos + cld(v.ℓ, 8) - 1)))
return n + writezeros(io, paddinglength(n, alignment))
Expand Down
10 changes: 5 additions & 5 deletions src/arraytypes/bool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,22 @@ function makenodesbuffers!(col::BoolVector, fieldnodes, fieldbuffers, bufferoffs
len = length(col)
nc = nullcount(col)
push!(fieldnodes, FieldNode(len, nc))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
@debugv 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
# validity bitmap
blen = nc == 0 ? 0 : bitpackedbytes(len, alignment)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
# adjust buffer offset, make primitive array buffer
bufferoffset += blen
blen = bitpackedbytes(len, alignment)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
return bufferoffset + blen
end

function writebuffer(io, col::BoolVector, alignment)
@debug 1 "writebuffer: col = $(typeof(col))"
@debug 2 col
@debugv 1 "writebuffer: col = $(typeof(col))"
@debugv 2 col
writebitmap(io, col, alignment)
n = Base.write(io, view(col.arrow, col.pos:(col.pos + cld(col.ℓ, 8) - 1)))
return n + writezeros(io, paddinglength(n, alignment))
Expand Down
12 changes: 6 additions & 6 deletions src/arraytypes/compressed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ compress(Z::Meta.CompressionType, comp, v::ValidityBitmap) =

function makenodesbuffers!(col::Compressed, fieldnodes, fieldbuffers, bufferoffset, alignment)
push!(fieldnodes, FieldNode(col.len, col.nullcount))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
@debugv 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
for buffer in col.buffers
blen = length(buffer.data) == 0 ? 0 : 8 + length(buffer.data)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += padding(blen, alignment)
end
for child in col.children
Expand All @@ -69,16 +69,16 @@ end
# Write a `CompressedBuffer` to `io`.
# For a non-empty buffer, `b.uncompressedlength` is written first, followed by `b.data`;
# the return value is the sum of the two `Base.write` results.
# For an empty buffer nothing is written and 0 is returned.
function writearray(io, b::CompressedBuffer)
if length(b.data) > 0
# length prefix goes out before the data itself
n = Base.write(io, b.uncompressedlength)
# NOTE(review): each `@debug` line below is repeated as `@debugv` with the same arguments;
# this looks like the before/after pair of a diff hunk — confirm only one set is meant to remain.
@debug 1 "writing compressed buffer: uncompressedlength = $(b.uncompressedlength), n = $(length(b.data))"
@debug 2 b.data
@debugv 1 "writing compressed buffer: uncompressedlength = $(b.uncompressedlength), n = $(length(b.data))"
@debugv 2 b.data
return n + Base.write(io, b.data)
end
return 0
end

function writebuffer(io, col::Compressed, alignment)
@debug 1 "writebuffer: col = $(typeof(col))"
@debug 2 col
@debugv 1 "writebuffer: col = $(typeof(col))"
@debugv 2 col
for buffer in col.buffers
n = writearray(io, buffer)
writezeros(io, paddinglength(n, alignment))
Expand Down
12 changes: 6 additions & 6 deletions src/arraytypes/dictencoding.jl
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,16 @@ function makenodesbuffers!(col::DictEncoded{T, S}, fieldnodes, fieldbuffers, buf
len = length(col)
nc = nullcount(col)
push!(fieldnodes, FieldNode(len, nc))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
@debugv 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
# validity bitmap
blen = nc == 0 ? 0 : bitpackedbytes(len, alignment)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += blen
# indices
blen = sizeof(S) * len
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += padding(blen, alignment)
return bufferoffset
end
Expand All @@ -308,12 +308,12 @@ DataAPI.refarray(x::DictEncoded{T, S}) where {T, S} = x.indices .+ one(S)
# `DataAPI.refpool` for a `DictEncoded` column: returns a copy of `x.encoding.data`,
# so the caller never receives the encoding's own array.
function DataAPI.refpool(x::DictEncoded)
    return copy(x.encoding.data)
end

# Write a `DictEncoded` column to `io`.
# Steps, in order: the validity bitmap via `writebitmap`, then `col.indices` via
# `writearray`, then zero padding sized by `paddinglength(n, alignment)`.
# Returns `nothing`.
function writebuffer(io, col::DictEncoded, alignment)
# NOTE(review): every `@debug` line in this function is repeated as `@debugv` with the same
# arguments; this looks like the before/after pair of a diff hunk — confirm only one set is meant to remain.
@debug 1 "writebuffer: col = $(typeof(col))"
@debug 2 col
@debugv 1 "writebuffer: col = $(typeof(col))"
@debugv 2 col
writebitmap(io, col, alignment)
# write indices
n = writearray(io, col.indices)
@debug 1 "writing array: col = $(typeof(col.indices)), n = $n, padded = $(padding(n, alignment))"
@debugv 1 "writing array: col = $(typeof(col.indices)), n = $n, padded = $(padding(n, alignment))"
# pad after the indices; the pad size is derived from the write result `n` and `alignment`
writezeros(io, paddinglength(n, alignment))
return
end
12 changes: 6 additions & 6 deletions src/arraytypes/fixedsizelist.jl
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,16 @@ function makenodesbuffers!(col::FixedSizeList{T, A}, fieldnodes, fieldbuffers, b
len = length(col)
nc = nullcount(col)
push!(fieldnodes, FieldNode(len, nc))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
@debugv 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
# validity bitmap
blen = nc == 0 ? 0 : bitpackedbytes(len, alignment)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += blen
if eltype(A) === UInt8
blen = ArrowTypes.getsize(ArrowTypes.ArrowKind(Base.nonmissingtype(T))) * len
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += padding(blen, alignment)
else
bufferoffset = makenodesbuffers!(col.data, fieldnodes, fieldbuffers, bufferoffset, alignment)
Expand All @@ -157,13 +157,13 @@ function makenodesbuffers!(col::FixedSizeList{T, A}, fieldnodes, fieldbuffers, b
end

function writebuffer(io, col::FixedSizeList{T, A}, alignment) where {T, A}
@debug 1 "writebuffer: col = $(typeof(col))"
@debug 2 col
@debugv 1 "writebuffer: col = $(typeof(col))"
@debugv 2 col
writebitmap(io, col, alignment)
# write values array
if eltype(A) === UInt8
n = writearray(io, UInt8, col.data)
@debug 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n, alignment))"
@debugv 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n, alignment))"
writezeros(io, paddinglength(n, alignment))
else
writebuffer(io, col.data, alignment)
Expand Down
16 changes: 8 additions & 8 deletions src/arraytypes/map.jl
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,21 @@ function makenodesbuffers!(col::Union{Map{T, O, A}, List{T, O, A}}, fieldnodes,
len = length(col)
nc = nullcount(col)
push!(fieldnodes, FieldNode(len, nc))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
@debugv 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
# validity bitmap
blen = nc == 0 ? 0 : bitpackedbytes(len, alignment)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
# adjust buffer offset, make array buffer
bufferoffset += blen
blen = sizeof(O) * (len + 1)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += padding(blen, alignment)
if eltype(A) == UInt8
blen = length(col.data)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
bufferoffset += padding(blen, alignment)
else
bufferoffset = makenodesbuffers!(col.data, fieldnodes, fieldbuffers, bufferoffset, alignment)
Expand All @@ -102,17 +102,17 @@ function makenodesbuffers!(col::Union{Map{T, O, A}, List{T, O, A}}, fieldnodes,
end

function writebuffer(io, col::Union{Map{T, O, A}, List{T, O, A}}, alignment) where {T, O, A}
@debug 1 "writebuffer: col = $(typeof(col))"
@debug 2 col
@debugv 1 "writebuffer: col = $(typeof(col))"
@debugv 2 col
writebitmap(io, col, alignment)
# write offsets
n = writearray(io, O, col.offsets.offsets)
@debug 1 "writing array: col = $(typeof(col.offsets.offsets)), n = $n, padded = $(padding(n, alignment))"
@debugv 1 "writing array: col = $(typeof(col.offsets.offsets)), n = $n, padded = $(padding(n, alignment))"
writezeros(io, paddinglength(n, alignment))
# write values array
if eltype(A) == UInt8
n = writearray(io, UInt8, col.data)
@debug 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n, alignment))"
@debugv 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n, alignment))"
writezeros(io, paddinglength(n, alignment))
else
writebuffer(io, col.data, alignment)
Expand Down
12 changes: 6 additions & 6 deletions src/arraytypes/primitive.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,25 @@ function makenodesbuffers!(col::Primitive{T}, fieldnodes, fieldbuffers, bufferof
len = length(col)
nc = nullcount(col)
push!(fieldnodes, FieldNode(len, nc))
@debug 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
@debugv 1 "made field node: nodeidx = $(length(fieldnodes)), col = $(typeof(col)), len = $(fieldnodes[end].length), nc = $(fieldnodes[end].null_count)"
# validity bitmap
blen = nc == 0 ? 0 : bitpackedbytes(len, alignment)
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
# adjust buffer offset, make primitive array buffer
bufferoffset += blen
blen = len * sizeof(Base.nonmissingtype(T))
push!(fieldbuffers, Buffer(bufferoffset, blen))
@debug 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
@debugv 1 "made field buffer: bufferidx = $(length(fieldbuffers)), offset = $(fieldbuffers[end].offset), len = $(fieldbuffers[end].length), padded = $(padding(fieldbuffers[end].length, alignment))"
return bufferoffset + padding(blen, alignment)
end

# Write a `Primitive{T}` column to `io`.
# Steps, in order: the validity bitmap via `writebitmap`, then `col.data` via
# `writearray` with element type `Base.nonmissingtype(T)`, then zero padding sized by
# `paddinglength(n, alignment)`.
# Returns `nothing`.
function writebuffer(io, col::Primitive{T}, alignment) where {T}
# NOTE(review): every `@debug` line in this function is repeated as `@debugv` with the same
# arguments; this looks like the before/after pair of a diff hunk — confirm only one set is meant to remain.
@debug 1 "writebuffer: col = $(typeof(col))"
@debug 2 col
@debugv 1 "writebuffer: col = $(typeof(col))"
@debugv 2 col
writebitmap(io, col, alignment)
# `Missing` is stripped from the element type passed to writearray
n = writearray(io, Base.nonmissingtype(T), col.data)
@debug 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n, alignment))"
@debugv 1 "writing array: col = $(typeof(col.data)), n = $n, padded = $(padding(n, alignment))"
# pad after the data; the pad size is derived from the write result `n` and `alignment`
writezeros(io, paddinglength(n, alignment))
return
end
Loading