
Conversation

@zeroshade
Member

fixes #448

Rationale for this change

When dealing with unicode in JSON values, RecordFromJSON seems to have a significant performance slowdown due to an odd interaction of decoders with goccy/go-json. NewJSONReader doesn't exhibit the issue because it essentially creates a new decoder for each line/record by decoding into a RecordBuilder directly.

What changes are included in this PR?

Change RecordFromJSON to work more like NewJSONReader, decoding directly into a RecordBuilder, so that we sidestep the performance problem for large amounts of JSON.
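
Roughly, the new decode path looks like the following sketch (not the actual diff; it assumes RecordBuilder.UnmarshalJSON appends one row per decoded object, which is how NewJSONReader already uses it):

import (
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	json "github.com/goccy/go-json"
)

// recordFromJSONSketch streams a JSON array of objects straight into a
// RecordBuilder, one element at a time, instead of building intermediate arrays.
func recordFromJSONSketch(mem memory.Allocator, schema *arrow.Schema, r io.Reader) (arrow.Record, error) {
	bldr := array.NewRecordBuilder(mem, schema)
	defer bldr.Release()

	dec := json.NewDecoder(r)
	if _, err := dec.Token(); err != nil { // consume the opening '['
		return nil, err
	}
	for dec.More() {
		// Decode delegates to RecordBuilder.UnmarshalJSON, appending one row.
		if err := dec.Decode(bldr); err != nil {
			return nil, err
		}
	}
	if _, err := dec.Token(); err != nil { // consume the closing ']'
		return nil, err
	}
	return bldr.NewRecord(), nil
}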

Are these changes tested?

Yes, benchmarks are added to keep track of the performance of using RecordFromJSON vs NewJSONReader for the same data.

Are there any user-facing changes?

Only a performance improvement when JSON has large amounts of unicode data.

@zeroshade requested review from kou and lidavidm July 24, 2025 19:42
Member Author

@zeroshade left a comment


@caldempsey please give this a try and confirm that it solves your performance problem?

	arr, off, err := FromJSON(mem, st, r, opts...)
	if err != nil {
		return nil, off, err
	}
...
	var cfg fromJSONCfg
Member

Can we link to the issue here and/or summarize the performance issue so that it's clear why we're doing something rather unusual?

Member

Ah, basically we've inlined/specialized FromJSON here but with a RecordBuilder?

Member Author

Pretty much. The big difference between this and NewJSONReader is that this expects [ {record}, {record}, {record} ] whereas NewJSONReader expects NDJSON.
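
For a concrete illustration (a hypothetical single-field schema, not from this PR), the two input shapes and their entry points:

package main

import (
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()
	schema := arrow.NewSchema([]arrow.Field{{Name: "a", Type: arrow.PrimitiveTypes.Int64}}, nil)

	// RecordFromJSON expects a single JSON array of objects.
	rec, _, err := array.RecordFromJSON(mem, schema, strings.NewReader(`[{"a": 1}, {"a": 2}]`))
	if err == nil {
		fmt.Println("array form rows:", rec.NumRows())
		rec.Release()
	}

	// NewJSONReader expects NDJSON: one object per line.
	rdr := array.NewJSONReader(strings.NewReader("{\"a\": 1}\n{\"a\": 2}\n"), schema, array.WithAllocator(mem))
	defer rdr.Release()
	for rdr.Next() {
		fmt.Println("ndjson chunk rows:", rdr.Record().NumRows())
	}
}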

Member Author

I'll merge this and file an issue with goccy/go-json

@caldempsey

@zeroshade Hey! I'll try it tomorrow morning (UK/GMT) :)

@caldempsey

caldempsey commented Jul 25, 2025

Ran a test using data that's not quite production, but closer. The use case involves ingesting transcripts of audio files via Spark Connect. These transcripts can be quite large, up to 4MB each. We're streaming this data into a data lake using Spark Connect instead of traditional Spark jobs, which allows us to receive synchronous responses at the call site and support coordination patterns (GET/POST) using standard REST consumers (which is so much better for error handling). Hence the massive strings. Here are some benchmarks that compare your changes against Goccy's decode method.

The reason I chose Goccy's decode method for this test to identify the root cause is that, if you follow the call-site trail, it forces a call to Goccy's string decoder:

func (d *stringDecoder) DecodeStream(s *Stream, depth int64, p unsafe.Pointer) error {
	bytes, err := d.decodeStreamByte(s)
	if err != nil {
		return err
	}
	if bytes == nil {
		return nil
	}
	**(**string)(unsafe.Pointer(&p)) = *(*string)(unsafe.Pointer(&bytes))
	s.reset()
	return nil
}

In the 4MB test, with or without 'special' (more likely to be escaped) unicode characters:

4MB_large_payload_1000_records_non_az_chars:
  Records: 1000
  Target record size: 4000000 bytes
  JSON Array size: 3814.95 MB
  NDJSON size: 3814.95 MB
  RecordFromJSON:            20.802824917s (183.39 MB/s, 48 rec/s)
  JSONReader (chunked):      16.895239958s (225.80 MB/s, 59 rec/s)
  JSONReader (single chunk): 15.627604875s (244.12 MB/s, 64 rec/s)
  GoccyJSONDecoder:            8.260023459s (461.86 MB/s, 121 rec/s)
  // Goccy Decode DNF 

We can observe the slowness when it does finish: in the tiny payload test, Goccy's decoder takes a whopping 1.866923375s:

tiny payload:
  Records: 1000
  Target record size: 10240 bytes
  JSON Array size: 10.01 MB
  NDJSON size: 10.01 MB
  RecordFromJSON:              40.55425ms (246.84 MB/s, 24658 rec/s)
  JSONReader (chunked):       37.439792ms (267.37 MB/s, 26710 rec/s)
  JSONReader (single chunk):  36.667208ms (273.00 MB/s, 27272 rec/s)
  GoccyJSONUnmarshal:             15.728292ms (636.45 MB/s, 63580 rec/s)
  GoccyJSONDecoder:            1.866923375s (636.45 MB/s, 63580 rec/s) // phwor 
--- PASS: TestFileLabelsPayloads/tiny_payload (2.05s)
--- PASS: TestFileLabelsPayloads (2.05s)
PASS

So I think we've latched onto the root cause and can show RecordFromJSON now handles this gracefully.

package main_test

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	gojson "github.com/goccy/go-json"
	"github.com/google/uuid"
)

// FileLabel represents the structure from the Python benchmark
type FileLabel struct {
	FilePath    string  `json:"file_path"`
	SomeLabel   string  `json:"some_label"`
	Confidence  float64 `json:"confidence"`
	ModelName   string  `json:"model_name"`
	ProcessedAt string  `json:"processed_at"`
	BatchID     string  `json:"batch_id"`
	Metadata    string  `json:"metadata"` // JSON string
}

// MetadataContent represents the metadata structure
type MetadataContent struct {
	ProcessingTimeMs int    `json:"processing_time_ms"`
	Version          string `json:"version"`
	Padding          string `json:"padding"`
	RecordID         int    `json:"record_id"`
	ExtraField1      string `json:"extra_field_1"`
	ExtraField2      string `json:"extra_field_2"`
	ExtraField3      string `json:"extra_field_3"`
}

// makePadding creates a padding string of the specified size
func makePadding(size int, useNonAZChars bool) string {
	alphabet := "abcdefghijklmnopqrstuvwxyz"
	if useNonAZChars {
		alphabet = "世界你好"
	}
	repeats := size/len(alphabet) + 1
	return strings.Repeat(alphabet, repeats)[:size]
}

// genPayload generates numRecords FileLabel records of roughly recordSizeBytes each
func genPayload(numRecords int, recordSizeBytes int, useNonAZChars bool) []FileLabel {
	batchID := uuid.New().String()
	baseRecordSize := 200
	metadataSize := recordSizeBytes - baseRecordSize
	if metadataSize < 100 {
		metadataSize = 100
	}
	padding := makePadding(metadataSize, useNonAZChars)
	nowISO := time.Now().UTC().Format(time.RFC3339)

	payload := make([]FileLabel, numRecords)
	for i := 0; i < numRecords; i++ {
		metadata := MetadataContent{
			ProcessingTimeMs: 150 + (i % 100),
			Version:          "1.0",
			Padding:          padding,
			RecordID:         i,
			ExtraField1:      fmt.Sprintf("extra_value_%d", i),
			ExtraField2:      fmt.Sprintf("extra_value_%d", i*2),
			ExtraField3:      fmt.Sprintf("extra_value_%d", i*3),
		}

		metadataJSON, _ := json.Marshal(metadata)

		payload[i] = FileLabel{
			FilePath:    fmt.Sprintf("s3://test-bucket/batch-%s/file-%d.jpg", batchID, i),
			SomeLabel:   fmt.Sprintf("label_%d", i%10),
			Confidence:  0.85 + float64(i%15)/100.0,
			ModelName:   fmt.Sprintf("model_v%d", (i%5)+1),
			ProcessedAt: nowISO,
			BatchID:     batchID,
			Metadata:    string(metadataJSON),
		}
	}
	return payload
}

// payloadToNDJSON converts payload to newline-delimited JSON bytes
func payloadToNDJSON(payload []FileLabel) []byte {
	var buf bytes.Buffer
	encoder := json.NewEncoder(&buf)
	for _, record := range payload {
		encoder.Encode(record)
	}
	return buf.Bytes()
}

// payloadToJSONArray converts payload to JSON array format
func payloadToJSONArray(payload []FileLabel) []byte {
	data, _ := json.Marshal(payload)
	return data
}

// Define Arrow schema matching the FileLabel structure
func getFileLabelsSchema() *arrow.Schema {
	return arrow.NewSchema([]arrow.Field{
		{Name: "file_path", Type: arrow.BinaryTypes.String},
		{Name: "some_label", Type: arrow.BinaryTypes.String},
		{Name: "confidence", Type: arrow.PrimitiveTypes.Float64},
		{Name: "model_name", Type: arrow.BinaryTypes.String},
		{Name: "processed_at", Type: arrow.BinaryTypes.String},
		{Name: "batch_id", Type: arrow.BinaryTypes.String},
		{Name: "metadata", Type: arrow.BinaryTypes.String},
	}, nil)
}

func benchmarkRecordFromJSON(data []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()

	start := time.Now()
	record, _, err := array.RecordFromJSON(pool, schema, bytes.NewReader(data))
	duration := time.Since(start)

	if err != nil {
		return duration, 0, err
	}

	numRows := record.NumRows()
	record.Release()

	return duration, numRows, nil
}

func benchmarkGoccyJSONDecoder(jsonData []byte) (time.Duration, int, error) {
	start := time.Now()

	var records []FileLabel

	decoder := gojson.NewDecoder(bytes.NewReader(jsonData))
	if err := decoder.Decode(&records); err != nil {
		return 0, 0, err
	}

	return time.Since(start), len(records), nil
}

func benchmarkGoccyUnmarshal(jsonData []byte) (time.Duration, int, error) {
	start := time.Now()

	var records []FileLabel

	err := gojson.Unmarshal(jsonData, &records)

	return time.Since(start), len(records), err
}

func benchmarkJSONReader(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()

	start := time.Now()

	rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
		array.WithAllocator(pool))
	defer rdr.Release()

	var totalRows int64
	for rdr.Next() {
		rec := rdr.Record()
		totalRows += rec.NumRows()
	}

	if err := rdr.Err(); err != nil {
		return time.Since(start), totalRows, err
	}

	duration := time.Since(start)
	return duration, totalRows, nil
}

func benchmarkJSONReaderSingleChunk(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()

	start := time.Now()

	rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
		array.WithAllocator(pool),
		array.WithChunk(-1))
	defer rdr.Release()

	if !rdr.Next() {
		return time.Since(start), 0, fmt.Errorf("no record found")
	}

	rec := rdr.Record()
	numRows := rec.NumRows()

	duration := time.Since(start)
	return duration, numRows, nil
}

// BenchmarkScenario represents a test scenario
type BenchmarkScenario struct {
	Name            string
	NumRecords      int
	RecordSizeBytes int
	UseNonAZChars   bool
}

func TestFileLabelsPayloads(t *testing.T) {
	scenarios := []BenchmarkScenario{
		{
			Name:            "tiny payload",
			NumRecords:      1000,
			RecordSizeBytes: 10_240,
			UseNonAZChars:   false,
		},
		{
			Name:            "large_payload_50k_records",
			NumRecords:      50_000,
			RecordSizeBytes: 10_240,
			UseNonAZChars:   false,
		},
		{
			Name:            "4MB_large_payload_1000_records",
			NumRecords:      1000,
			RecordSizeBytes: 4000000,
			UseNonAZChars:   false,
		},
		{
			Name:            "large_payload_50k_records_non_az_chars",
			NumRecords:      50_000,
			RecordSizeBytes: 10_240,
			UseNonAZChars:   true,
		},
		{
			Name:            "4MB_large_payload_1000_records_non_az_chars",
			NumRecords:      1000,
			RecordSizeBytes: 4000000,
			UseNonAZChars:   true,
		},
	}

	schema := getFileLabelsSchema()

	fmt.Println("Spark Connect API Payload Benchmarks")
	fmt.Println("==================================")

	for _, scenario := range scenarios {
		t.Run(scenario.Name, func(t *testing.T) {
			// Generate payload
			payload := genPayload(scenario.NumRecords, scenario.RecordSizeBytes, scenario.UseNonAZChars)

			// Convert to both formats
			jsonArrayData := payloadToJSONArray(payload)
			ndjsonData := payloadToNDJSON(payload)

			jsonArraySizeMB := float64(len(jsonArrayData)) / (1024 * 1024)
			ndjsonSizeMB := float64(len(ndjsonData)) / (1024 * 1024)

			fmt.Printf("\n%s:\n", scenario.Name)
			fmt.Printf("  Records: %d\n", scenario.NumRecords)
			fmt.Printf("  Target record size: %d bytes\n", scenario.RecordSizeBytes)
			fmt.Printf("  JSON Array size: %.2f MB\n", jsonArraySizeMB)
			fmt.Printf("  NDJSON size: %.2f MB\n", ndjsonSizeMB)

			// Benchmark RecordFromJSON (expects JSON array)
			duration1, rows1, err1 := benchmarkRecordFromJSON(jsonArrayData, schema)
			if err1 != nil {
				t.Errorf("RecordFromJSON failed: %v", err1)
			} else {
				throughput1 := jsonArraySizeMB / duration1.Seconds()
				recordsPerSec1 := float64(rows1) / duration1.Seconds()
				fmt.Printf("  RecordFromJSON:            %12v (%.2f MB/s, %.0f rec/s)\n",
					duration1, throughput1, recordsPerSec1)
			}
			// Benchmark JSONReader with NDJSON (default chunking)
			duration2, rows2, err2 := benchmarkJSONReader(ndjsonData, schema)
			if err2 != nil {
				t.Errorf("JSONReader (chunked) failed: %v", err2)
			} else {
				throughput2 := ndjsonSizeMB / duration2.Seconds()
				recordsPerSec2 := float64(rows2) / duration2.Seconds()
				fmt.Printf("  JSONReader (chunked):      %12v (%.2f MB/s, %.0f rec/s)\n",
					duration2, throughput2, recordsPerSec2)
			}

			// Benchmark JSONReader with NDJSON (single chunk)
			duration3, rows3, err3 := benchmarkJSONReaderSingleChunk(ndjsonData, schema)
			if err3 != nil {
				t.Errorf("JSONReader (single chunk) failed: %v", err3)
			} else {
				throughput3 := ndjsonSizeMB / duration3.Seconds()
				recordsPerSec3 := float64(rows3) / duration3.Seconds()
				fmt.Printf("  JSONReader (single chunk): %12v (%.2f MB/s, %.0f rec/s)\n",
					duration3, throughput3, recordsPerSec3)
			}
			// Benchmark Goccy JSON deserialization
			duration4, rows4, err4 := benchmarkGoccyUnmarshal(jsonArrayData)
			if err4 != nil {
				t.Errorf("GoccyJSONUnmarshal failed: %v", err4)
			} else {
				throughput4 := jsonArraySizeMB / duration4.Seconds()
				recordsPerSec4 := float64(rows4) / duration4.Seconds()
				fmt.Printf("  GoccyJSONUnmarshal:            %12v (%.2f MB/s, %.0f rec/s)\n",
					duration4, throughput4, recordsPerSec4)
			}
			// Benchmark Goccy Decoder (expects JSON array)
			duration5, rows5, err5 := benchmarkGoccyJSONDecoder(jsonArrayData)
			if err5 != nil {
				t.Errorf("GoccyJSONDecoder failed: %v", err5)
			} else {
				throughput5 := jsonArraySizeMB / duration5.Seconds()
				recordsPerSec5 := float64(rows5) / duration5.Seconds()
				fmt.Printf("  GoccyJSONDecoder:            %12v (%.2f MB/s, %.0f rec/s)\n",
					duration5, throughput5, recordsPerSec5)
			}
		})
	}
}

So it looks like the root cause was determined. I think the only way this could be improved further would be some way of enabling concurrent record processing: there's not a lot of reason why we can't use multiple threads as long as we can stitch the response back together with the right indices.
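
Purely as a sketch of that idea (a hypothetical helper, assuming the NDJSON input has already been split into line-aligned chunks): run one JSONReader per chunk concurrently and collect the records back in chunk order.

import (
	"bytes"
	"sync"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

// parallelRecordsFromNDJSON is a hypothetical sketch: decode pre-split,
// line-aligned NDJSON chunks concurrently and return the records in chunk
// order. Cleanup of already-retained records on the error path is elided.
func parallelRecordsFromNDJSON(mem memory.Allocator, schema *arrow.Schema, chunks [][]byte) ([]arrow.Record, error) {
	out := make([]arrow.Record, len(chunks))
	errs := make([]error, len(chunks))

	var wg sync.WaitGroup
	for i, chunk := range chunks {
		wg.Add(1)
		go func(i int, chunk []byte) {
			defer wg.Done()
			rdr := array.NewJSONReader(bytes.NewReader(chunk), schema,
				array.WithAllocator(mem), array.WithChunk(-1))
			defer rdr.Release()
			if rdr.Next() {
				rec := rdr.Record()
				rec.Retain() // keep the record alive after the reader is released
				out[i] = rec
			}
			errs[i] = rdr.Err()
		}(i, chunk)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return out, nil // stitched back together with the right indices
}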

LGTM 🚀

@zeroshade merged commit a7d23a7 into apache:main Jul 25, 2025
20 checks passed
@zeroshade deleted the record-from-json-perf branch July 25, 2025 15:20
@zeroshade
Member Author

Turns out it's a known issue goccy/go-json#549 😦
