fix(arrow/array): Fix RecordFromJSON perf #449
Conversation
zeroshade left a comment:
@caldempsey please give this a try and confirm that it solves your performance problem?
The review comments below are on this excerpt of the diff:

```go
arr, off, err := FromJSON(mem, st, r, opts...)
if err != nil {
	return nil, off, err
var cfg fromJSONCfg
```
Can we link to the issue here and/or summarize the performance issue, so that it's clear why we're doing something rather unusual?
Ah, basically we've inlined/specialized FromJSON here but with a RecordBuilder?
Pretty much. The big difference between this and NewJSONReader is that this expects [ {record}, {record}, {record} ] whereas NewJSONReader expects NDJSON.
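For illustration only (this example is not part of the PR), here is a minimal, hypothetical program showing the two input shapes side by side, assuming a one-column Int64 schema:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
	}, nil)

	// RecordFromJSON takes a single JSON array of objects: [ {record}, {record}, ... ]
	rec, _, err := array.RecordFromJSON(mem, schema, strings.NewReader(`[{"x": 1}, {"x": 2}]`))
	if err != nil {
		panic(err)
	}
	defer rec.Release()
	fmt.Println(rec.NumRows()) // 2

	// NewJSONReader takes newline-delimited JSON: one object per line
	rdr := array.NewJSONReader(strings.NewReader("{\"x\": 1}\n{\"x\": 2}\n"), schema,
		array.WithAllocator(mem))
	defer rdr.Release()
	for rdr.Next() {
		fmt.Println(rdr.Record().NumRows())
	}
	if err := rdr.Err(); err != nil {
		panic(err)
	}
}
```

Both entry points take an io.Reader, so the practical difference for callers is whether the payload is one JSON array or one object per line.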
I'll merge this and file an issue with goccy/go-json
@zeroshade Hey! I'll try it tomorrow morning (UK/GMT) :)
Ran a test using data that's not quite production, but closer. The use case involves ingesting transcripts of audio files via Spark Connect. These transcripts can be quite large, up to 4 MB each. We're streaming this data into a data lake using Spark Connect instead of traditional Spark jobs, which lets us receive synchronous responses at the call site and support coordination patterns (GET/POST) with standard REST consumers (which is so much better for error handling). Hence the massive strings. Here are some benchmarks that compare your changes against Goccy's decode method. The reason I chose Goccy's decode method for this test, to identify the root cause, is that if you follow the call-site trail it forces a call to Goccy's string decoder:

```go
func (d *stringDecoder) DecodeStream(s *Stream, depth int64, p unsafe.Pointer) error {
bytes, err := d.decodeStreamByte(s)
if err != nil {
return err
}
if bytes == nil {
return nil
}
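	// reinterpret the decoded []byte header as a string header (no copy) and store it at the destination pointer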
**(**string)(unsafe.Pointer(&p)) = *(*string)(unsafe.Pointer(&bytes))
s.reset()
return nil
}
```

In the 4 MB test, with or without 'special' (i.e. more likely to be escaped) unicode characters: [benchmark results not shown]. We can observe the slowness if we let it finish in the tiny payload test, with a whopping [result not shown]. So I think we've latched onto the root cause and can show RecordFromJSON is now dealing with it gracefully.

```go
package main_test
import (
"bytes"
"encoding/json"
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
gojson "github.com/goccy/go-json"
"github.com/google/uuid"
"strings"
"testing"
"time"
)
// FileLabel represents the structure from the Python benchmark
type FileLabel struct {
FilePath string `json:"file_path"`
SomeLabel string `json:"some_label"`
Confidence float64 `json:"confidence"`
ModelName string `json:"model_name"`
ProcessedAt string `json:"processed_at"`
BatchID string `json:"batch_id"`
Metadata string `json:"metadata"` // JSON string
}
// MetadataContent represents the metadata structure
type MetadataContent struct {
ProcessingTimeMs int `json:"processing_time_ms"`
Version string `json:"version"`
Padding string `json:"padding"`
RecordID int `json:"record_id"`
ExtraField1 string `json:"extra_field_1"`
ExtraField2 string `json:"extra_field_2"`
ExtraField3 string `json:"extra_field_3"`
}
// makePadding creates a padding string of the specified size
func makePadding(size int, useNonAZChars bool) string {
alphabet := "abcdefghijklmnopqrstuvwxyz"
if useNonAZChars {
alphabet = "世界你好"
}
repeats := size/len(alphabet) + 1
return strings.Repeat(alphabet, repeats)[:size]
}
// genPayload
func genPayload(numRecords int, recordSizeBytes int, useNonAZChars bool) []FileLabel {
batchID := uuid.New().String()
baseRecordSize := 200
metadataSize := recordSizeBytes - baseRecordSize
if metadataSize < 100 {
metadataSize = 100
}
padding := makePadding(metadataSize, useNonAZChars)
nowISO := time.Now().UTC().Format(time.RFC3339)
payload := make([]FileLabel, numRecords)
for i := 0; i < numRecords; i++ {
metadata := MetadataContent{
ProcessingTimeMs: 150 + (i % 100),
Version: "1.0",
Padding: padding,
RecordID: i,
ExtraField1: fmt.Sprintf("extra_value_%d", i),
ExtraField2: fmt.Sprintf("extra_value_%d", i*2),
ExtraField3: fmt.Sprintf("extra_value_%d", i*3),
}
metadataJSON, _ := json.Marshal(metadata)
payload[i] = FileLabel{
FilePath: fmt.Sprintf("s3://test-bucket/batch-%s/file-%d.jpg", batchID, i),
SomeLabel: fmt.Sprintf("label_%d", i%10),
Confidence: 0.85 + float64(i%15)/100.0,
ModelName: fmt.Sprintf("model_v%d", (i%5)+1),
ProcessedAt: nowISO,
BatchID: batchID,
Metadata: string(metadataJSON),
}
}
return payload
}
// payloadToNDJSON converts payload to newline-delimited JSON bytes
func payloadToNDJSON(payload []FileLabel) []byte {
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
for _, record := range payload {
encoder.Encode(record)
}
return buf.Bytes()
}
// payloadToJSONArray converts payload to JSON array format
func payloadToJSONArray(payload []FileLabel) []byte {
data, _ := json.Marshal(payload)
return data
}
// Define Arrow schema matching the FileLabel structure
func getFileLabelsSchema() *arrow.Schema {
return arrow.NewSchema([]arrow.Field{
{Name: "file_path", Type: arrow.BinaryTypes.String},
{Name: "some_label", Type: arrow.BinaryTypes.String},
{Name: "confidence", Type: arrow.PrimitiveTypes.Float64},
{Name: "model_name", Type: arrow.BinaryTypes.String},
{Name: "processed_at", Type: arrow.BinaryTypes.String},
{Name: "batch_id", Type: arrow.BinaryTypes.String},
{Name: "metadata", Type: arrow.BinaryTypes.String},
}, nil)
}
func benchmarkRecordFromJSON(data []byte, schema *arrow.Schema) (time.Duration, int64, error) {
pool := memory.NewGoAllocator()
start := time.Now()
record, _, err := array.RecordFromJSON(pool, schema, bytes.NewReader(data))
duration := time.Since(start)
if err != nil {
return duration, 0, err
}
numRows := record.NumRows()
record.Release()
return duration, numRows, nil
}
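// benchmarkGoccyJSONDecoder decodes the JSON array via goccy's streaming Decoder,
// the code path that reaches stringDecoder.DecodeStream shown above.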
func benchmarkGoccyJSONDecoder(jsonData []byte) (time.Duration, int, error) {
start := time.Now()
var records []FileLabel
decoder := gojson.NewDecoder(bytes.NewReader(jsonData))
if err := decoder.Decode(&records); err != nil {
return 0, 0, err
}
return time.Since(start), len(records), nil
}
func benchmarkGoccyUnmarshal(jsonData []byte) (time.Duration, int, error) {
start := time.Now()
var records []FileLabel
err := gojson.Unmarshal(jsonData, &records)
return time.Since(start), len(records), err
}
func benchmarkJSONReader(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
pool := memory.NewGoAllocator()
start := time.Now()
rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
array.WithAllocator(pool))
defer rdr.Release()
var totalRows int64
for rdr.Next() {
rec := rdr.Record()
totalRows += rec.NumRows()
}
if err := rdr.Err(); err != nil {
return time.Since(start), totalRows, err
}
duration := time.Since(start)
return duration, totalRows, nil
}
func benchmarkJSONReaderSingleChunk(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
pool := memory.NewGoAllocator()
start := time.Now()
rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
array.WithAllocator(pool),
array.WithChunk(-1))
defer rdr.Release()
if !rdr.Next() {
return time.Since(start), 0, fmt.Errorf("no record found")
}
rec := rdr.Record()
numRows := rec.NumRows()
duration := time.Since(start)
return duration, numRows, nil
}
// BenchmarkScenario represents a test scenario
type BenchmarkScenario struct {
Name string
NumRecords int
RecordSizeBytes int
UseNonAZChars bool
}
func TestFileLabelsPayloads(t *testing.T) {
scenarios := []BenchmarkScenario{
{
Name: "tiny payload",
NumRecords: 1000,
RecordSizeBytes: 10_240,
UseNonAZChars: false,
},
{
Name: "large_payload_50k_records",
NumRecords: 50_000,
RecordSizeBytes: 10_240,
UseNonAZChars: false,
},
{
Name: "4MB_large_payload_1000_records",
NumRecords: 1000,
RecordSizeBytes: 4000000,
UseNonAZChars: false,
},
{
Name: "large_payload_50k_records_non_az_chars",
NumRecords: 50_000,
RecordSizeBytes: 10_240,
UseNonAZChars: true,
},
{
Name: "4MB_large_payload_1000_records_non_az_chars",
NumRecords: 1000,
RecordSizeBytes: 4000000,
UseNonAZChars: true,
},
}
schema := getFileLabelsSchema()
fmt.Println("Spark Connect API Payload Benchmarks")
fmt.Println("==================================")
for _, scenario := range scenarios {
t.Run(scenario.Name, func(t *testing.T) {
// Generate payload
payload := genPayload(scenario.NumRecords, scenario.RecordSizeBytes, scenario.UseNonAZChars)
// Convert to both formats
jsonArrayData := payloadToJSONArray(payload)
ndjsonData := payloadToNDJSON(payload)
jsonArraySizeMB := float64(len(jsonArrayData)) / (1024 * 1024)
ndjsonSizeMB := float64(len(ndjsonData)) / (1024 * 1024)
fmt.Printf("\n%s:\n", scenario.Name)
fmt.Printf(" Records: %d\n", scenario.NumRecords)
fmt.Printf(" Target record size: %d bytes\n", scenario.RecordSizeBytes)
fmt.Printf(" JSON Array size: %.2f MB\n", jsonArraySizeMB)
fmt.Printf(" NDJSON size: %.2f MB\n", ndjsonSizeMB)
// Benchmark RecordFromJSON (expects JSON array)
duration1, rows1, err1 := benchmarkRecordFromJSON(jsonArrayData, schema)
if err1 != nil {
t.Errorf("RecordFromJSON failed: %v", err1)
} else {
throughput1 := jsonArraySizeMB / duration1.Seconds()
recordsPerSec1 := float64(rows1) / duration1.Seconds()
fmt.Printf(" RecordFromJSON: %12v (%.2f MB/s, %.0f rec/s)\n",
duration1, throughput1, recordsPerSec1)
}
// Benchmark JSONReader with NDJSON (default chunking)
duration2, rows2, err2 := benchmarkJSONReader(ndjsonData, schema)
if err2 != nil {
t.Errorf("JSONReader (chunked) failed: %v", err2)
} else {
throughput2 := ndjsonSizeMB / duration2.Seconds()
recordsPerSec2 := float64(rows2) / duration2.Seconds()
fmt.Printf(" JSONReader (chunked): %12v (%.2f MB/s, %.0f rec/s)\n",
duration2, throughput2, recordsPerSec2)
}
// Benchmark JSONReader with NDJSON (single chunk)
duration3, rows3, err3 := benchmarkJSONReaderSingleChunk(ndjsonData, schema)
if err3 != nil {
t.Errorf("JSONReader (single chunk) failed: %v", err3)
} else {
throughput3 := ndjsonSizeMB / duration3.Seconds()
recordsPerSec3 := float64(rows3) / duration3.Seconds()
fmt.Printf(" JSONReader (single chunk): %12v (%.2f MB/s, %.0f rec/s)\n",
duration3, throughput3, recordsPerSec3)
}
// Benchmark Goccy JSON deserialization
duration4, rows4, err4 := benchmarkGoccyUnmarshal(jsonArrayData)
if err4 != nil {
t.Errorf("GoccyJSONUnmarshal failed: %v", err1)
} else {
throughput4 := jsonArraySizeMB / duration4.Seconds()
recordsPerSec4 := float64(rows4) / duration4.Seconds()
fmt.Printf(" GoccyJSONUnmarshal: %12v (%.2f MB/s, %.0f rec/s)\n",
duration4, throughput4, recordsPerSec4)
}
// Benchmark Goccy Decoder (expects JSON array)
duration5, rows5, err5 := benchmarkGoccyJSONDecoder(jsonArrayData)
if err5 != nil {
t.Errorf("GoccyJSONDecoder failed: %v", err1)
} else {
throughput5 := jsonArraySizeMB / duration5.Seconds()
recordsPerSec5 := float64(rows5) / duration5.Seconds()
fmt.Printf(" GoccyJSONDecoder: %12v (%.2f MB/s, %.0f rec/s)\n",
duration5, throughput5, recordsPerSec5)
}
})
}
}
```

So it looks like the root cause was determined. I think the only way this could be improved would be to have some way of enabling concurrent record processing: there's not a lot of reason why we can't use multiple threads, as long as we can stitch the response back together with the right indices (a rough sketch of that idea follows). LGTM 🚀
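A hypothetical sketch of that idea (not part of this PR, purely illustrative): pre-split the NDJSON payload into shards, decode each shard with its own NewJSONReader on a separate goroutine, and keep the resulting records in shard order so they can be stitched back together. The decodeShardsConcurrently helper and the shard contents are made up for the example, and error handling is kept minimal.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// decodeShardsConcurrently decodes each NDJSON shard into its own record on a
// separate goroutine and returns the records in shard (index) order.
func decodeShardsConcurrently(mem memory.Allocator, schema *arrow.Schema, shards [][]byte) ([]arrow.Record, error) {
	recs := make([]arrow.Record, len(shards))
	errs := make([]error, len(shards))

	var wg sync.WaitGroup
	for i, shard := range shards {
		wg.Add(1)
		go func(i int, shard []byte) {
			defer wg.Done()
			rdr := array.NewJSONReader(bytes.NewReader(shard), schema,
				array.WithAllocator(mem), array.WithChunk(-1))
			defer rdr.Release()
			if rdr.Next() {
				rec := rdr.Record()
				rec.Retain() // keep the record alive after the reader is released
				recs[i] = rec
			}
			errs[i] = rdr.Err()
		}(i, shard)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err // sketch: leaks already-decoded records on error
		}
	}
	return recs, nil
}

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
	}, nil)
	shards := [][]byte{
		[]byte("{\"x\": 1}\n{\"x\": 2}\n"),
		[]byte("{\"x\": 3}\n"),
	}
	recs, err := decodeShardsConcurrently(mem, schema, shards)
	if err != nil {
		panic(err)
	}
	for i, rec := range recs {
		fmt.Println(i, rec.NumRows())
		rec.Release()
	}
}
```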
Turns out it's a known issue: goccy/go-json#549 😦
fixes #448
Rationale for this change
When dealing with unicode in JSON values, RecordFromJSON seems to have a significant performance slow-down due to an odd interaction of decoders with goccy/go-json. NewJSONReader doesn't exhibit the issue because it essentially creates a new Decoder for each line/record, decoding into a RecordBuilder directly.
What changes are included in this PR?
Change RecordFromJSON to work closer to NewJSONReader in how it decodes directly into a RecordBuilder, so that we side-step the performance problem for large amounts of JSON (a rough sketch of the approach is shown below).
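An illustrative sketch of what "decoding directly into a RecordBuilder" looks like, not the actual diff; it assumes RecordBuilder's UnmarshalJSON appends one row per decoded object (which is what NewJSONReader relies on), and it uses encoding/json for brevity where the library internally uses goccy/go-json:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// recordFromJSONArray is a hypothetical helper showing the shape of the fix:
// decode each element of a JSON array straight into a RecordBuilder instead of
// building an intermediate struct array first.
func recordFromJSONArray(mem memory.Allocator, schema *arrow.Schema, data []byte) (arrow.Record, error) {
	bldr := array.NewRecordBuilder(mem, schema)
	defer bldr.Release()

	dec := json.NewDecoder(bytes.NewReader(data))

	// consume the opening '[' of the array
	tok, err := dec.Token()
	if err != nil {
		return nil, err
	}
	if delim, ok := tok.(json.Delim); !ok || delim != '[' {
		return nil, fmt.Errorf("expected JSON array, got %v", tok)
	}

	// decode each {record} object directly into the builder (one row per object)
	for dec.More() {
		if err := dec.Decode(bldr); err != nil {
			return nil, err
		}
	}
	return bldr.NewRecord(), nil
}

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
	}, nil)
	rec, err := recordFromJSONArray(mem, schema, []byte(`[{"x": 1}, {"x": 2}]`))
	if err != nil {
		panic(err)
	}
	defer rec.Release()
	fmt.Println(rec.NumRows()) // 2
}
```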
Are these changes tested?
Yes, benchmarks are added to keep track of the performance of using RecordFromJSON vs NewJSONReader for the same data.
Are there any user-facing changes?
Only a performance improvement when JSON has large amounts of unicode data.