Apache Flink Connector for Lance Vector Database
Features • Quick Start • Documentation • Contributing
flink-connector-lance is a high-performance Apache Flink connector for Lance, a modern columnar data format optimized for machine learning workloads and vector search. This connector enables seamless integration between Flink's powerful stream/batch processing capabilities and Lance's efficient vector storage.
| Feature | Description |
|---|---|
| Source & Sink | Full read/write support for Lance datasets |
| Table API & SQL | Native Flink SQL DDL/DML support |
| Vector Search | KNN search with L2, Cosine, and Dot metrics |
| Index Building | IVF_PQ, IVF_HNSW, and IVF_FLAT indexes |
| Exactly-Once | Checkpoint-based exactly-once semantics |
| Predicate Pushdown | Filter pushdown for optimized reads |
| Catalog Support | Lance Catalog for metadata management |
- Java 8 or higher
- Apache Flink 1.16.x
- Maven 3.6+
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-lance</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
```

```sql
-- Create a Lance table
CREATE TABLE vectors (
    id BIGINT,
    content STRING,
    embedding ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/vectors',
    'write.batch-size' = '1024'
);

-- Insert data
INSERT INTO vectors VALUES
    (1, 'Hello World', ARRAY[0.1, 0.2, 0.3, 0.4]);

-- Query data
SELECT * FROM vectors WHERE id > 0;
```

```java
// Read from Lance
LanceSource source = LanceSource.builder()
    .path("/data/vectors")
    .batchSize(1024)
    .columns(Arrays.asList("id", "embedding"))
    .rowType(rowType)
    .build();

DataStream<RowData> stream = env.addSource(source);
```
```java
// Write to Lance
LanceSink sink = LanceSink.builder()
    .path("/data/output")
    .batchSize(1024)
    .writeMode(WriteMode.APPEND)
    .rowType(rowType)
    .build();

stream.addSink(sink);
```

```sql
CREATE CATALOG lance_catalog WITH (
    'type' = 'lance',
    'warehouse' = '/path/to/warehouse',
    'default-database' = 'default'
);

USE CATALOG lance_catalog;
```

| Option | Required | Default | Description |
|---|---|---|---|
| `connector` | Yes | - | Must be `lance` |
| `path` | Yes | - | Lance dataset path |
| `read.batch-size` | No | 1024 | Read batch size |
| `write.batch-size` | No | 1024 | Write batch size |
| `write.mode` | No | append | `append` or `overwrite` |
| `write.max-rows-per-file` | No | 1000000 | Max rows per file |
```java
LanceIndexBuilder builder = LanceIndexBuilder.builder()
    .datasetPath("/data/vectors")
    .columnName("embedding")
    .indexType(IndexType.IVF_PQ)
    .metricType(MetricType.L2)
    .numPartitions(256)
    .numSubVectors(16)
    .build();

builder.buildIndex();
```

| Index Type | Use Case | Parameters |
|---|---|---|
| IVF_PQ | Large-scale, memory-efficient | `num_partitions`, `num_sub_vectors`, `num_bits` |
| IVF_HNSW | High recall, fast search | `num_partitions`, `m`, `ef_construction` |
| IVF_FLAT | Small datasets, exact search | `num_partitions` |
| Scenario | Recommended Index | Reason |
|---|---|---|
| < 100K vectors | IVF_FLAT | High accuracy, acceptable performance |
| 100K - 10M vectors | IVF_PQ | Good balance of accuracy and memory |
| > 10M vectors | IVF_PQ (tuned) | Optimize num_partitions and num_sub_vectors |
| High recall required | IVF_HNSW | Best accuracy, higher memory usage |
| Memory constrained | IVF_PQ | Most memory efficient |
| Real-time search | IVF_HNSW | Fastest query latency |
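The selection table above can be codified as a small helper. This is a hypothetical sketch (`IndexAdvisor` is not part of the connector); the thresholds come directly from the table:

```java
// Hypothetical helper mirroring the index-selection table above.
// The thresholds (100K vectors) and priorities are taken from the table,
// not from the connector itself.
public class IndexAdvisor {
    enum IndexType { IVF_FLAT, IVF_PQ, IVF_HNSW }

    static IndexType recommend(long vectorCount, boolean highRecall, boolean memoryConstrained) {
        if (memoryConstrained) return IndexType.IVF_PQ;       // most memory efficient
        if (highRecall)        return IndexType.IVF_HNSW;     // best accuracy, higher memory
        if (vectorCount < 100_000) return IndexType.IVF_FLAT; // exact search is affordable
        return IndexType.IVF_PQ;                              // 100K+: accuracy/memory balance
    }

    public static void main(String[] args) {
        System.out.println(recommend(50_000, false, false));    // IVF_FLAT
        System.out.println(recommend(5_000_000, false, false)); // IVF_PQ
        System.out.println(recommend(1_000_000, true, false));  // IVF_HNSW
    }
}
```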
| Option | Default | Description |
|---|---|---|
| `index.type` | IVF_PQ | Index type: IVF_PQ, IVF_HNSW, IVF_FLAT |
| `index.column` | - | Vector column name for indexing |
| `index.num-partitions` | 256 | Number of IVF partitions |
| `index.num-sub-vectors` | 16 | PQ sub-vectors (IVF_PQ only) |
| `index.num-bits` | 8 | Bits per sub-vector (IVF_PQ only) |
| `index.m` | 16 | HNSW max connections (IVF_HNSW only) |
| `index.ef-construction` | 100 | HNSW build quality (IVF_HNSW only) |
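To get a feel for the PQ defaults above: product quantization stores each vector as `index.num-sub-vectors` codes of `index.num-bits` each, so the defaults (16 × 8 bits) compress a vector to 16 bytes regardless of its dimension. A back-of-envelope sketch, assuming a 128-dimensional float32 embedding as the example:

```java
// Rough PQ code-size arithmetic using the defaults from the table above.
// Illustrative only: it ignores IVF partition metadata and per-row overhead.
public class PqSizing {
    static long codeBytes(int numSubVectors, int numBits) {
        return (long) Math.ceil(numSubVectors * numBits / 8.0);
    }

    public static void main(String[] args) {
        int dim = 128;                // example embedding dimension (assumption)
        long raw = dim * 4L;          // raw float32 vector: 512 bytes
        long pq = codeBytes(16, 8);   // default PQ code: 16 bytes
        System.out.println(raw / pq); // prints 32 (about 32x smaller than raw)
    }
}
```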
```sql
-- Create table with IVF_PQ index
CREATE TABLE doc_embeddings (
    doc_id BIGINT,
    title STRING,
    embedding ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/embeddings',
    'index.type' = 'IVF_PQ',
    'index.column' = 'embedding',
    'index.num-partitions' = '256',
    'index.num-sub-vectors' = '16',
    'vector.metric' = 'COSINE'
);

-- Create table with IVF_HNSW index (high accuracy)
CREATE TABLE high_accuracy_vectors (
    id BIGINT,
    vector ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/ha_vectors',
    'index.type' = 'IVF_HNSW',
    'index.column' = 'vector',
    'index.num-partitions' = '128',
    'index.m' = '32',
    'index.ef-construction' = '200',
    'vector.metric' = 'L2'
);
```

```java
LanceVectorSearch search = LanceVectorSearch.builder()
    .datasetPath("/data/vectors")
    .columnName("embedding")
    .metricType(MetricType.COSINE)
    .nprobes(20)
    .build();

search.open();
List<SearchResult> results = search.search(queryVector, 10);
search.close();
```

| Metric | Description | Range |
|---|---|---|
| L2 | Euclidean distance | [0, ∞) |
| Cosine | Cosine similarity | [-1, 1] |
| Dot | Inner product | (-∞, ∞) |
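For reference, the three metrics in the table can be written out in plain Java to make the ranges concrete. This is purely illustrative; the connector delegates the actual distance computation to Lance:

```java
// Plain-Java definitions of the three metrics from the table above.
public class Metrics {
    // Euclidean distance: sqrt of summed squared differences, range [0, inf)
    static double l2(float[] a, float[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) { double d = a[i] - b[i]; s += d * d; }
        return Math.sqrt(s);
    }

    // Inner product, range (-inf, inf)
    static double dot(float[] a, float[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) s += a[i] * b[i];
        return s;
    }

    // Cosine similarity: dot product of normalized vectors, range [-1, 1]
    static double cosine(float[] a, float[] b) {
        return dot(a, b) / (Math.sqrt(dot(a, a)) * Math.sqrt(dot(b, b)));
    }

    public static void main(String[] args) {
        float[] x = {1f, 0f};
        float[] y = {0f, 1f};
        System.out.println(l2(x, y));     // sqrt(2), about 1.414
        System.out.println(dot(x, y));    // 0.0
        System.out.println(cosine(x, y)); // 0.0 (orthogonal vectors)
    }
}
```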
| Lance/Arrow Type | Flink Type |
|---|---|
| Int8/16/32/64 | TINYINT/SMALLINT/INT/BIGINT |
| Float32/64 | FLOAT/DOUBLE |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BYTES |
| Date32 | DATE |
| Timestamp | TIMESTAMP |
| `FixedSizeList<Float>` | `ARRAY<FLOAT>` |
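The mapping above can be expressed as a simple lookup, e.g. for validating a user-declared Flink schema against a Lance dataset. The entries mirror the table; this sketch is not the connector's actual converter (that lives in `LanceTypeConverter`):

```java
import java.util.HashMap;
import java.util.Map;

// Arrow-to-Flink type-name lookup, transcribed from the mapping table above.
public class TypeMapping {
    static final Map<String, String> ARROW_TO_FLINK = new HashMap<>();
    static {
        ARROW_TO_FLINK.put("Int8", "TINYINT");
        ARROW_TO_FLINK.put("Int16", "SMALLINT");
        ARROW_TO_FLINK.put("Int32", "INT");
        ARROW_TO_FLINK.put("Int64", "BIGINT");
        ARROW_TO_FLINK.put("Float32", "FLOAT");
        ARROW_TO_FLINK.put("Float64", "DOUBLE");
        ARROW_TO_FLINK.put("String", "STRING");
        ARROW_TO_FLINK.put("Boolean", "BOOLEAN");
        ARROW_TO_FLINK.put("Binary", "BYTES");
        ARROW_TO_FLINK.put("Date32", "DATE");
        ARROW_TO_FLINK.put("Timestamp", "TIMESTAMP");
        ARROW_TO_FLINK.put("FixedSizeList<Float>", "ARRAY<FLOAT>");
    }

    public static void main(String[] args) {
        // prints ARRAY<FLOAT>
        System.out.println(ARROW_TO_FLINK.get("FixedSizeList<Float>"));
    }
}
```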
```
flink-connector-lance/
├── src/main/java/org/apache/flink/connector/lance/
│   ├── LanceSource.java              # Source implementation
│   ├── LanceSink.java                # Sink with checkpointing
│   ├── LanceInputFormat.java         # Batch input format
│   ├── LanceIndexBuilder.java        # Vector index builder
│   ├── LanceVectorSearch.java        # KNN search
│   ├── config/
│   │   └── LanceOptions.java         # Configuration options
│   ├── converter/
│   │   ├── LanceTypeConverter.java
│   │   └── RowDataConverter.java
│   └── table/
│       ├── LanceDynamicTableFactory.java
│       ├── LanceDynamicTableSource.java
│       ├── LanceDynamicTableSink.java
│       ├── LanceCatalog.java
│       └── LanceVectorSearchFunction.java
└── src/test/java/                    # Unit & integration tests
```
```shell
# Build without tests
mvn clean package -DskipTests

# Build with tests
mvn clean package

# Run tests only
mvn test
```

Contributions are welcome! Please feel free to submit a Pull Request.

- Fork the repository
- Create your feature branch (`git checkout -b feature/AmazingFeature`)
- Commit your changes (`git commit -m 'Add some AmazingFeature'`)
- Push to the branch (`git push origin feature/AmazingFeature`)
- Open a Pull Request
- Streaming CDC support
- Delta Lake interoperability
- Distributed index building
- GPU acceleration support
- Python Table API support
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
- Apache Flink - Stateful computations over data streams
- LanceDB - Modern columnar data format for ML
- Apache Arrow - Cross-language development platform
Made with ❤️ by the Flink Connector Lance Contributors

