Skip to content

Commit f9ead28

Browse files
committed
Add tests
1 parent e2484cb commit f9ead28

File tree

2 files changed

+240
-6
lines changed

2 files changed

+240
-6
lines changed

rust/blockstore/src/arrow/blockfile.rs

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1466,6 +1466,131 @@ mod tests {
14661466
}
14671467
}
14681468

1469+
#[tokio::test]
1470+
async fn test_splitting_with_custom_blocksize() {
1471+
let tmp_dir = tempfile::tempdir().unwrap();
1472+
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
1473+
let block_cache = new_cache_for_test();
1474+
let sparse_index_cache = new_cache_for_test();
1475+
let blockfile_provider = ArrowBlockfileProvider::new(
1476+
storage,
1477+
TEST_MAX_BLOCK_SIZE_BYTES,
1478+
block_cache,
1479+
sparse_index_cache,
1480+
);
1481+
let prefix_path = String::from("");
1482+
let custom_block_size = 100 * 1024 * 1024; // 100 MiB
1483+
let writer = blockfile_provider
1484+
.write::<&str, Vec<u32>>(
1485+
BlockfileWriterOptions::new(prefix_path.clone())
1486+
.max_block_size_bytes(custom_block_size),
1487+
)
1488+
.await
1489+
.unwrap();
1490+
let id_1 = writer.id();
1491+
1492+
let n = 1200;
1493+
for i in 0..n {
1494+
let key = format!("{:04}", i);
1495+
let value = vec![i];
1496+
writer.set("key", key.as_str(), value).await.unwrap();
1497+
}
1498+
1499+
let flusher = writer.commit::<&str, Vec<u32>>().await.unwrap();
1500+
flusher.flush::<&str, Vec<u32>>().await.unwrap();
1501+
1502+
let read_options = BlockfileReaderOptions::new(id_1, prefix_path.clone());
1503+
let reader = blockfile_provider
1504+
.read::<&str, &[u32]>(read_options)
1505+
.await
1506+
.unwrap();
1507+
1508+
for i in 0..n {
1509+
let key = format!("{:04}", i);
1510+
let value = reader.get("key", &key).await.unwrap().unwrap();
1511+
assert_eq!(value, [i]);
1512+
}
1513+
1514+
// Sparse index should have 1 block
1515+
match &reader {
1516+
crate::BlockfileReader::ArrowBlockfileReader(reader) => {
1517+
assert_eq!(reader.root.sparse_index.len(), 1);
1518+
assert!(reader.root.sparse_index.is_valid());
1519+
}
1520+
_ => panic!("Unexpected reader type"),
1521+
}
1522+
1523+
// Add 5 new entries to the first block
1524+
let writer = blockfile_provider
1525+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()).fork(id_1))
1526+
.await
1527+
.unwrap();
1528+
let id_2 = writer.id();
1529+
for i in 0..5 {
1530+
let key = format!("{:05}", i);
1531+
let value = vec![i];
1532+
writer.set("key", key.as_str(), value).await.unwrap();
1533+
}
1534+
1535+
let flusher = writer.commit::<&str, Vec<u32>>().await.unwrap();
1536+
flusher.flush::<&str, Vec<u32>>().await.unwrap();
1537+
1538+
let read_options = BlockfileReaderOptions::new(id_2, prefix_path.clone());
1539+
let reader = blockfile_provider
1540+
.read::<&str, &[u32]>(read_options)
1541+
.await
1542+
.unwrap();
1543+
for i in 0..5 {
1544+
let key = format!("{:05}", i);
1545+
println!("Getting key: {}", key);
1546+
let value = reader.get("key", &key).await.unwrap().unwrap();
1547+
assert_eq!(value, [i]);
1548+
}
1549+
1550+
// Sparse index should still have 1 block
1551+
match &reader {
1552+
crate::BlockfileReader::ArrowBlockfileReader(reader) => {
1553+
assert_eq!(reader.root.sparse_index.len(), 1);
1554+
assert!(reader.root.sparse_index.is_valid());
1555+
}
1556+
_ => panic!("Unexpected reader type"),
1557+
}
1558+
1559+
// Add 1200 more entries; the sparse index should still have 1 block
1560+
let writer = blockfile_provider
1561+
.write::<&str, Vec<u32>>(BlockfileWriterOptions::new(prefix_path.clone()).fork(id_2))
1562+
.await
1563+
.unwrap();
1564+
let id_3 = writer.id();
1565+
for i in n..n * 2 {
1566+
let key = format!("{:04}", i);
1567+
let value = vec![i];
1568+
writer.set("key", key.as_str(), value).await.unwrap();
1569+
}
1570+
let flusher = writer.commit::<&str, Vec<u32>>().await.unwrap();
1571+
flusher.flush::<&str, Vec<u32>>().await.unwrap();
1572+
1573+
let read_options = BlockfileReaderOptions::new(id_3, prefix_path);
1574+
let reader = blockfile_provider
1575+
.read::<&str, &[u32]>(read_options)
1576+
.await
1577+
.unwrap();
1578+
for i in n..n * 2 {
1579+
let key = format!("{:04}", i);
1580+
let value = reader.get("key", &key).await.unwrap().unwrap();
1581+
assert_eq!(value, [i]);
1582+
}
1583+
1584+
// Sparse index should still have 1 block
1585+
match &reader {
1586+
crate::BlockfileReader::ArrowBlockfileReader(reader) => {
1587+
assert_eq!(reader.root.sparse_index.len(), 1);
1588+
assert!(reader.root.sparse_index.is_valid());
1589+
}
1590+
_ => panic!("Unexpected reader type"),
1591+
}
1592+
}
1593+
14691594
#[tokio::test]
14701595
async fn test_splitting_boundary() {
14711596
let tmp_dir = tempfile::tempdir().unwrap();
@@ -2058,7 +2183,7 @@ mod tests {
20582183
let block_cache = new_cache_for_test();
20592184
let root_cache = new_cache_for_test();
20602185
let root_manager = RootManager::new(storage.clone(), root_cache);
2061-
let block_manager = BlockManager::new(storage.clone(), 8 * 1024 * 1024, block_cache);
2186+
let block_manager = BlockManager::new(storage.clone(), 16384, block_cache);
20622187

20632188
// Manually create a v1 blockfile with no counts
20642189
let initial_block = block_manager.create::<&str, String, UnorderedBlockDelta>();

rust/blockstore/src/arrow/root.rs

Lines changed: 114 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::{
2121
use thiserror::Error;
2222
use uuid::Uuid;
2323

24-
pub(super) const CURRENT_VERSION: Version = Version::V1_1;
24+
pub(super) const CURRENT_VERSION: Version = Version::V1_2;
2525

2626
// ================
2727
// Version
@@ -558,8 +558,9 @@ mod test {
558558
let bytes = root_writer
559559
.to_bytes::<&str>()
560560
.expect("To be able to serialize");
561+
// Pass a different max_block_size_bytes; the reader should use the value stored in the metadata instead.
561562
let root_reader =
562-
RootReader::from_bytes::<&str>(&bytes, prefix_path, bf_id, max_block_size_bytes)
563+
RootReader::from_bytes::<&str>(&bytes, prefix_path, bf_id, 4 * 1024 * 1024)
563564
.expect("To be able to deserialize");
564565

565566
// Check that the sparse index is the same
@@ -593,6 +594,112 @@ mod test {
593594

594595
assert_eq!(root_writer.version, root_reader.version);
595596
assert_eq!(root_writer.id, root_reader.id);
597+
assert_eq!(
598+
root_writer.max_block_size_bytes,
599+
root_reader.max_block_size_bytes
600+
);
601+
}
602+
603+
#[test]
604+
fn test_to_from_bytes_fork() {
605+
let block_ids = [
606+
Uuid::new_v4(),
607+
Uuid::new_v4(),
608+
Uuid::new_v4(),
609+
Uuid::new_v4(),
610+
];
611+
let sparse_index = SparseIndexWriter::new(block_ids[0]);
612+
613+
let bf_id = Uuid::new_v4();
614+
let prefix_path = "";
615+
let max_block_size_bytes = 8 * 1024 * 1024; // 8 MiB
616+
let root_writer = RootWriter::new(
617+
CURRENT_VERSION,
618+
bf_id,
619+
sparse_index,
620+
prefix_path.to_string(),
621+
max_block_size_bytes,
622+
);
623+
624+
root_writer
625+
.sparse_index
626+
.add_block(CompositeKey::new("prefix".to_string(), "a"), block_ids[1])
627+
.expect("No error");
628+
root_writer
629+
.sparse_index
630+
.add_block(CompositeKey::new("prefix".to_string(), "b"), block_ids[2])
631+
.expect("No error");
632+
root_writer
633+
.sparse_index
634+
.add_block(CompositeKey::new("prefix".to_string(), "c"), block_ids[3])
635+
.expect("No error");
636+
637+
root_writer
638+
.sparse_index
639+
.set_count(block_ids[0], 1)
640+
.expect("Set count should succeed");
641+
root_writer
642+
.sparse_index
643+
.set_count(block_ids[1], 2)
644+
.expect("Set count should succeed");
645+
root_writer
646+
.sparse_index
647+
.set_count(block_ids[2], 3)
648+
.expect("Set count should succeed");
649+
root_writer
650+
.sparse_index
651+
.set_count(block_ids[3], 4)
652+
.expect("Set count should succeed");
653+
654+
let bytes = root_writer
655+
.to_bytes::<&str>()
656+
.expect("To be able to serialize");
657+
// Pass a different max_block_size_bytes; the reader should use the value stored in the metadata instead.
658+
let root_reader =
659+
RootReader::from_bytes::<&str>(&bytes, prefix_path, bf_id, 4 * 1024 * 1024)
660+
.expect("To be able to deserialize");
661+
let fork_root_writer = root_reader.fork(Uuid::new_v4());
662+
663+
// Check that the sparse index is the same
664+
assert_eq!(
665+
root_writer.sparse_index.len(),
666+
fork_root_writer.sparse_index.len()
667+
);
668+
669+
// Check that the block mapping is the same
670+
for (key, value) in root_writer.sparse_index.data.lock().forward.iter() {
671+
assert_eq!(
672+
fork_root_writer
673+
.sparse_index
674+
.data
675+
.lock()
676+
.forward
677+
.get(key)
678+
.unwrap(),
679+
value
680+
);
681+
}
682+
683+
// Check that counts are the same
684+
let writer_data = &root_writer.sparse_index.data.lock();
685+
for (key, _) in writer_data.forward.iter() {
686+
assert_eq!(
687+
fork_root_writer
688+
.sparse_index
689+
.data
690+
.lock()
691+
.counts
692+
.get(key)
693+
.unwrap(),
694+
writer_data.counts.get(key).unwrap()
695+
);
696+
}
697+
698+
assert_eq!(root_writer.version, fork_root_writer.version);
699+
assert_eq!(
700+
root_writer.max_block_size_bytes,
701+
fork_root_writer.max_block_size_bytes
702+
);
596703
}
597704

598705
#[test]
@@ -614,13 +721,13 @@ mod test {
614721
let prefix_path = "";
615722

616723
let bf_id = Uuid::new_v4();
617-
let max_block_size_bytes = 8 * 1024 * 1024; // 8 MiB
724+
let writer_max_block_size_bytes = 8 * 1024 * 1024; // 8 MiB
618725
let root_writer = RootWriter::new(
619726
Version::V1,
620727
bf_id,
621728
sparse_index,
622729
prefix_path.to_string(),
623-
max_block_size_bytes,
730+
writer_max_block_size_bytes,
624731
);
625732
root_writer
626733
.sparse_index
@@ -641,8 +748,9 @@ mod test {
641748
.to_bytes::<&str>()
642749
.expect("To be able to serialize");
643750

751+
let reader_block_size_bytes = 4 * 1024 * 1024; // 4 MiB
644752
let root_reader =
645-
RootReader::from_bytes::<&str>(&bytes, prefix_path, bf_id, max_block_size_bytes)
753+
RootReader::from_bytes::<&str>(&bytes, prefix_path, bf_id, reader_block_size_bytes)
646754
.expect("To be able to deserialize");
647755

648756
// Check the version is still v1
@@ -661,5 +769,6 @@ mod test {
661769
0
662770
);
663771
}
772+
assert_eq!(root_reader.max_block_size_bytes, reader_block_size_bytes);
664773
}
665774
}

0 commit comments

Comments
 (0)