Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/sparrow-sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ sparrow-interfaces = { path = "../sparrow-interfaces" }
tracing.workspace = true

[dev-dependencies]
tokio.workspace = true
arrow-select.workspace = true
static_init.workspace = true

[lib]
bench = false
Expand Down
194 changes: 194 additions & 0 deletions crates/sparrow-sources/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ impl InMemoryBatches {
/// Add a batch, merging it into the in-memory version.
///
/// Publishes the new batch to the subscribers.
///
/// Note: This assumes the batch has been prepared, and will likely panic if not.
pub async fn add_batch(&self, batch: RecordBatch) -> error_stack::Result<(), SourceError> {
if batch.num_rows() == 0 {
return Ok(());
Expand Down Expand Up @@ -210,6 +212,7 @@ impl InMemoryBatches {
}
}
}
.boxed()
}

/// Retrieve the current in-memory batch.
Expand All @@ -222,3 +225,194 @@ impl InMemoryBatches {
}
}
}

#[cfg(test)]
mod tests {
use arrow_array::{
types::{ArrowPrimitiveType, TimestampNanosecondType},
ArrayRef, Int32Array, TimestampNanosecondArray, UInt64Array,
};
use arrow_schema::{Field, Schema};

use super::*;

#[static_init::dynamic]
static SCHEMA: Schema = {
Schema::new(vec![
Field::new("_time", TimestampNanosecondType::DATA_TYPE, false),
Field::new("_subsort", DataType::UInt64, false),
Field::new("_key_hash", DataType::UInt64, false),
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::UInt64, true),
])
};

fn batch1() -> RecordBatch {
let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0, 1, 2]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 2]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![None, Some(1), Some(8)]));
let schema = Arc::new(SCHEMA.clone());
RecordBatch::try_new(schema.clone(), vec![time, subsort, key, a, b]).unwrap()
}

fn batch2() -> RecordBatch {
let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![3, 4, 5]));
let subsort: ArrayRef = Arc::new(UInt64Array::from(vec![0, 1, 2]));
let key: ArrayRef = Arc::new(UInt64Array::from(vec![0, 0, 0]));
let a: ArrayRef = Arc::new(Int32Array::from(vec![3, 4, 5]));
let b: ArrayRef = Arc::new(UInt64Array::from(vec![Some(10), None, None]));
let schema = Arc::new(SCHEMA.clone());
RecordBatch::try_new(schema.clone(), vec![time, subsort, key, a, b]).unwrap()
}

#[tokio::test]
async fn test_subscribe_to_batches() {
let batch1 = batch1();
let batch2 = batch2();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(true, schema);

let mut s1 = in_mem.subscribe();
let mut s2 = in_mem.subscribe();

// Add the first batch
in_mem.add_batch(batch1.clone()).await.unwrap();

let b1_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s1);
let b1_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s2);

// Add the second batch
in_mem.add_batch(batch2.clone()).await.unwrap();
let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);
let b2_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s2);
}

#[tokio::test]
async fn test_subscribe_to_multiple_batches() {
// Sends multiple batches before reading
let batch1 = batch1();
let batch2 = batch2();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(true, schema.clone());

let mut s1 = in_mem.subscribe();
let mut s2 = in_mem.subscribe();

// Add both batches before reading
in_mem.add_batch(batch1.clone()).await.unwrap();
in_mem.add_batch(batch2.clone()).await.unwrap();

let b1_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s1);
let b1_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s2);

let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);
let b2_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s2);
}

#[tokio::test]
async fn test_late_subscription_receives_merged_batch() {
// Verify later subscription gets the full merged batch
let batch1 = batch1();
let batch2 = batch2();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(true, schema.clone());

// Subscribe first stream
let mut s1 = in_mem.subscribe();

// Send both batches. In-memory should have merged them.
in_mem.add_batch(batch1.clone()).await.unwrap();
in_mem.add_batch(batch2.clone()).await.unwrap();

// Subscribe second stream
let mut s2 = in_mem.subscribe();

let b1_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s1);

let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);

let merged_batch =
arrow_select::concat::concat_batches(&batch1.schema(), &[batch1, batch2]).unwrap();

let b1_s2 = s2.next().await.unwrap().unwrap();
assert_eq!(merged_batch, b1_s2);
}

#[tokio::test]
async fn test_reads_current_batch() {
let batch1 = batch1();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(true, schema);
in_mem.add_batch(batch1.clone()).await.unwrap();

let received = in_mem.current().unwrap();
assert_eq!(batch1, received);

let batch2 = batch2();
in_mem.add_batch(batch2.clone()).await.unwrap();

let received = in_mem.current().unwrap();
let expected =
arrow_select::concat::concat_batches(&batch1.schema(), &[batch1, batch2]).unwrap();
assert_eq!(received, expected);
}

#[tokio::test]
async fn test_non_queryable_reads_empty_current_batch() {
let batch1 = batch1();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(false, schema);
in_mem.add_batch(batch1.clone()).await.unwrap();

assert!(in_mem.current().is_none());
}

#[tokio::test]
async fn test_non_queryable_subscription() {
let batch1 = batch1();
let batch2 = batch2();
let schema = Arc::new(SCHEMA.clone());

let in_mem = InMemoryBatches::new(false, schema.clone());

// Subscribe first stream
let mut s1 = in_mem.subscribe();

// Send both batches
in_mem.add_batch(batch1.clone()).await.unwrap();

// Subscribe second stream
let mut s2 = in_mem.subscribe();

// Send the second batch
in_mem.add_batch(batch2.clone()).await.unwrap();

let b1_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch1, b1_s1);

let b2_s1 = s1.next().await.unwrap().unwrap();
assert_eq!(batch2, b2_s1);

// Second subscription should only see second batch
let b1_s2 = s2.next().await.unwrap().unwrap();
println!("read 3");
assert_eq!(batch2, b1_s2);
}
}