lance-format/lance-flink

Flink Connector Lance

Apache Flink Connector for Lance Vector Database

Features • Quick Start • Documentation • Contributing


📖 Overview

flink-connector-lance is an Apache Flink connector for Lance, a columnar data format optimized for machine learning workloads and vector search. It lets Flink batch and streaming jobs read and write Lance datasets, build vector indexes, and run KNN searches against them.

✨ Features

| Feature | Description |
| --- | --- |
| 🔄 Source & Sink | Full read/write support for Lance datasets |
| 📊 Table API & SQL | Native Flink SQL DDL/DML support |
| 🔍 Vector Search | KNN search with L2, Cosine, Dot metrics |
| 📇 Index Building | IVF_PQ, IVF_HNSW, IVF_FLAT indexes |
| ✅ Exactly-Once | Checkpoint-based exactly-once semantics |
| 🎯 Predicate Pushdown | Filter pushdown for optimized reads |
| 📁 Catalog Support | Lance Catalog for metadata management |

🚀 Quick Start

Prerequisites

  • Java 8 or higher
  • Apache Flink 1.16.x
  • Maven 3.6+

Maven Dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-lance</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

Flink SQL Example

-- Create a Lance table
CREATE TABLE vectors (
    id BIGINT,
    content STRING,
    embedding ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/vectors',
    'write.batch-size' = '1024'
);

-- Insert data
INSERT INTO vectors VALUES 
    (1, 'Hello World', ARRAY[0.1, 0.2, 0.3, 0.4]);

-- Query data
SELECT * FROM vectors WHERE id > 0;

DataStream API Example

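import java.util.Arrays;
import org.apache.flink.connector.lance.*;  // LanceSource, LanceSink; WriteMode's package is not shown in this README
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Row schema for the columns being read (assumed to match the dataset)
RowType rowType = RowType.of(
    new LogicalType[] {new BigIntType(), new ArrayType(new FloatType())},
    new String[] {"id", "embedding"});

// Read from Lance
LanceSource source = LanceSource.builder()
    .path("/data/vectors")
    .batchSize(1024)
    .columns(Arrays.asList("id", "embedding"))
    .rowType(rowType)
    .build();

DataStream<RowData> stream = env.addSource(source);

// Write to Lance
LanceSink sink = LanceSink.builder()
    .path("/data/output")
    .batchSize(1024)
    .writeMode(WriteMode.APPEND)
    .rowType(rowType)
    .build();

stream.addSink(sink);

// Submit the job (the job name is arbitrary)
env.execute("lance-copy");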

📚 Documentation

Table API / SQL

Create Catalog

CREATE CATALOG lance_catalog WITH (
    'type' = 'lance',
    'warehouse' = '/path/to/warehouse',
    'default-database' = 'default'
);

USE CATALOG lance_catalog;

Table Options

| Option | Required | Default | Description |
| --- | --- | --- | --- |
| connector | ✅ | - | Must be lance |
| path | ✅ | - | Lance dataset path |
| read.batch-size | ❌ | 1024 | Read batch size |
| write.batch-size | ❌ | 1024 | Write batch size |
| write.mode | ❌ | append | append or overwrite |
| write.max-rows-per-file | ❌ | 1000000 | Max rows per file |
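
The option table can be read as a small resolution step: start from the documented defaults, overlay the options given in the `WITH` clause, then reject anything missing or invalid. The following is a minimal sketch in plain Java, not the connector's actual `LanceOptions` class. The class name `TableOptionsSketch`, its `resolve` method, and the rule that sizes must be positive are assumptions made for illustration; only the option keys and default values come from the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper; mirrors the option table, not the connector's real code. */
final class TableOptionsSketch {

    /** Returns the user options overlaid on the documented defaults, after validation. */
    static Map<String, String> resolve(Map<String, String> userOptions) {
        Map<String, String> resolved = new LinkedHashMap<>();
        // Defaults taken from the option table.
        resolved.put("read.batch-size", "1024");
        resolved.put("write.batch-size", "1024");
        resolved.put("write.mode", "append");
        resolved.put("write.max-rows-per-file", "1000000");
        resolved.putAll(userOptions);

        // 'connector' and 'path' are the two required options.
        if (!"lance".equals(resolved.get("connector"))) {
            throw new IllegalArgumentException("'connector' must be 'lance'");
        }
        String path = resolved.get("path");
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("'path' is required");
        }
        String mode = resolved.get("write.mode");
        if (!"append".equals(mode) && !"overwrite".equals(mode)) {
            throw new IllegalArgumentException("'write.mode' must be append or overwrite");
        }
        // Assumption: sizes must be positive integers (the table states no bounds).
        String[] sizeKeys = {"read.batch-size", "write.batch-size", "write.max-rows-per-file"};
        for (String key : sizeKeys) {
            if (Long.parseLong(resolved.get(key)) <= 0) {
                throw new IllegalArgumentException("'" + key + "' must be positive");
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("connector", "lance");
        user.put("path", "/data/vectors");
        user.put("write.mode", "overwrite");
        System.out.println(resolve(user));
    }
}
```

Options that are not supplied keep their defaults, so a table declared with only `connector` and `path` reads and writes in batches of 1024 rows in append mode.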

Vector Index Building

LanceIndexBuilder builder = LanceIndexBuilder.builder()
    .datasetPath("/data/vectors")
    .columnName("embedding")
    .indexType(IndexType.IVF_PQ)
    .metricType(MetricType.L2)
    .numPartitions(256)
    .numSubVectors(16)
    .build();

builder.buildIndex();

Supported Index Types

| Index Type | Use Case | Parameters |
| --- | --- | --- |
| IVF_PQ | Large-scale, memory-efficient | num_partitions, num_sub_vectors, num_bits |
| IVF_HNSW | High recall, fast search | num_partitions, m, ef_construction |
| IVF_FLAT | Small datasets, exact search | num_partitions |

📊 Index Selection Guide

| Scenario | Recommended Index | Reason |
| --- | --- | --- |
| < 100K vectors | IVF_FLAT | High accuracy, acceptable performance |
| 100K - 10M vectors | IVF_PQ | Good balance of accuracy and memory |
| > 10M vectors | IVF_PQ (tuned) | Optimize num_partitions and num_sub_vectors |
| High recall required | IVF_HNSW | Best accuracy, higher memory usage |
| Memory constrained | IVF_PQ | Most memory efficient |
| Real-time search | IVF_HNSW | Fastest query latency |
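
The guide mixes size-based rows with requirement-based rows, so more than one row can apply to the same dataset. One way to make the choice explicit is a small decision function. The sketch below is plain Java and purely illustrative: `IndexSelectionSketch`, its `IndexChoice` enum, and in particular the precedence order (memory constraint first, then recall or latency requirements, then dataset size) are assumptions, since the guide does not rank its rows.

```java
/** Hypothetical helper that encodes the selection guide as a decision rule. */
final class IndexSelectionSketch {

    /** Local stand-in for the index types named in the guide. */
    enum IndexChoice { IVF_FLAT, IVF_PQ, IVF_HNSW }

    /**
     * Assumed precedence (not stated in the guide): a memory constraint wins,
     * then a high-recall or low-latency requirement, then dataset size.
     */
    static IndexChoice recommend(long numVectors,
                                 boolean memoryConstrained,
                                 boolean highRecallOrRealTime) {
        if (numVectors < 0) {
            throw new IllegalArgumentException("numVectors must be non-negative");
        }
        if (memoryConstrained) {
            return IndexChoice.IVF_PQ;      // "Memory constrained" row
        }
        if (highRecallOrRealTime) {
            return IndexChoice.IVF_HNSW;    // "High recall" and "Real-time search" rows
        }
        if (numVectors < 100_000L) {
            return IndexChoice.IVF_FLAT;    // "< 100K vectors" row
        }
        return IndexChoice.IVF_PQ;          // 100K and above
    }

    /** Above 10M vectors the guide suggests tuning num_partitions and num_sub_vectors. */
    static boolean needsTuning(long numVectors) {
        return numVectors > 10_000_000L;
    }

    public static void main(String[] args) {
        System.out.println(recommend(50_000L, false, false));
        System.out.println(recommend(2_000_000L, false, true));
        System.out.println(needsTuning(25_000_000L));
    }
}
```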

⚙️ Index Configuration Options

| Option | Default | Description |
| --- | --- | --- |
| index.type | IVF_PQ | Index type: IVF_PQ, IVF_HNSW, IVF_FLAT |
| index.column | - | Vector column name for indexing |
| index.num-partitions | 256 | Number of IVF partitions |
| index.num-sub-vectors | 16 | PQ sub-vectors (IVF_PQ only) |
| index.num-bits | 8 | Bits per sub-vector (IVF_PQ only) |
| index.m | 16 | HNSW max connections (IVF_HNSW only) |
| index.ef-construction | 100 | HNSW build quality (IVF_HNSW only) |
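
To see why IVF_PQ is described as memory efficient, it helps to work out the size of a product-quantized code. In standard product quantization each vector is stored as `num-sub-vectors` codes of `num-bits` bits each, so the defaults of 16 and 8 give 16 bytes per vector, against 3072 bytes for a raw 768-dimensional float32 vector. The sketch below does that arithmetic in plain Java. `PqSizingSketch` is a hypothetical name, the 768 dimension is only an example, and the divisibility check reflects the usual PQ requirement, which is assumed rather than confirmed for this connector. Codebook and IVF overhead are ignored.

```java
/** Hypothetical sizing helper for product quantization; not part of the connector. */
final class PqSizingSketch {

    /** Bytes needed for one PQ code: numSubVectors codes of numBits bits, rounded up. */
    static long codeBytesPerVector(int numSubVectors, int numBits) {
        if (numSubVectors <= 0 || numBits <= 0) {
            throw new IllegalArgumentException("numSubVectors and numBits must be positive");
        }
        return ((long) numSubVectors * numBits + 7) / 8;
    }

    /** Bytes needed for one uncompressed float32 vector. */
    static long rawBytesPerVector(int dimension) {
        if (dimension <= 0) {
            throw new IllegalArgumentException("dimension must be positive");
        }
        return 4L * dimension;
    }

    /** Standard PQ splits a vector into equal sub-vectors, so the dimension must divide evenly. */
    static void checkDivisible(int dimension, int numSubVectors) {
        if (numSubVectors <= 0 || dimension % numSubVectors != 0) {
            throw new IllegalArgumentException(
                "dimension " + dimension + " is not divisible by " + numSubVectors);
        }
    }

    public static void main(String[] args) {
        int dimension = 768;      // example dimension, not from the README
        int numSubVectors = 16;   // documented default
        int numBits = 8;          // documented default
        checkDivisible(dimension, numSubVectors);
        System.out.println("raw bytes:  " + rawBytesPerVector(dimension));
        System.out.println("code bytes: " + codeBytesPerVector(numSubVectors, numBits));
    }
}
```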

🔧 Index Building SQL Example

-- Create table with IVF_PQ index
CREATE TABLE doc_embeddings (
    doc_id BIGINT,
    title STRING,
    embedding ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/embeddings',
    'index.type' = 'IVF_PQ',
    'index.column' = 'embedding',
    'index.num-partitions' = '256',
    'index.num-sub-vectors' = '16',
    'vector.metric' = 'COSINE'
);

-- Create table with IVF_HNSW index (high accuracy)
CREATE TABLE high_accuracy_vectors (
    id BIGINT,
    vector ARRAY<FLOAT>
) WITH (
    'connector' = 'lance',
    'path' = '/data/ha_vectors',
    'index.type' = 'IVF_HNSW',
    'index.column' = 'vector',
    'index.num-partitions' = '128',
    'index.m' = '32',
    'index.ef-construction' = '200',
    'vector.metric' = 'L2'
);

Vector Search

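LanceVectorSearch search = LanceVectorSearch.builder()
    .datasetPath("/data/vectors")
    .columnName("embedding")
    .metricType(MetricType.COSINE)
    .nprobes(20)  // number of IVF partitions probed per query
    .build();

search.open();
try {
    // queryVector is the query embedding; 10 is the number of neighbors to return
    List<SearchResult> results = search.search(queryVector, 10);
} finally {
    search.close();  // always close, even if the search throws
}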
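
When tuning `nprobes` or comparing index types, an exact baseline is useful for measuring recall. The sketch below is a brute-force top-k search over in-memory vectors in plain Java, ranked by squared L2 distance. It does not call the connector; `ExactKnnSketch` and its methods are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical exact (brute-force) KNN baseline; not part of the connector. */
final class ExactKnnSketch {

    /** Squared Euclidean distance; ranks neighbors the same way as L2 distance. */
    static double squaredL2(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("dimension mismatch");
        }
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = (double) a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** Returns the indices of the k nearest vectors to the query, nearest first. */
    static List<Integer> topK(float[][] vectors, float[] query, int k) {
        if (k <= 0) {
            throw new IllegalArgumentException("k must be positive");
        }
        // Max-heap on distance: the root is the worst of the best k seen so far.
        PriorityQueue<double[]> heap = new PriorityQueue<>(
            Comparator.comparingDouble((double[] e) -> e[0]).reversed());
        for (int i = 0; i < vectors.length; i++) {
            heap.add(new double[] {squaredL2(vectors[i], query), i});
            if (heap.size() > k) {
                heap.poll();  // drop the current farthest candidate
            }
        }
        // The heap yields farthest first, so prepend to get nearest first.
        List<Integer> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            result.add(0, (int) heap.poll()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        float[][] vectors = {{0f, 0f}, {1f, 0f}, {5f, 5f}, {0.5f, 0f}};
        System.out.println(topK(vectors, new float[] {0f, 0f}, 2));
    }
}
```

Recall at k for an approximate search can then be estimated as the fraction of these exact neighbors that also appear in the approximate result.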

Distance Metrics

| Metric | Description | Range |
| --- | --- | --- |
| L2 | Euclidean distance | [0, ∞) |
| Cosine | Cosine similarity | [-1, 1] |
| Dot | Inner product | (-∞, ∞) |
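
For reference, the three metrics can be computed directly from their textbook definitions. The sketch below is plain Java with hypothetical names (`DistanceMetricsSketch` and its methods) and is not taken from the connector. Some engines report squared L2 or a cosine distance (1 minus the similarity) instead; this README does not say which convention the connector's search results use, so treat the values here as the definitions behind the table rather than the connector's exact output.

```java
/** Hypothetical reference implementations of the metrics in the table. */
final class DistanceMetricsSketch {

    /** Euclidean distance: square root of the summed squared differences, never negative. */
    static double l2(float[] a, float[] b) {
        requireSameLength(a, b);
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = (double) a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    /** Inner product: sum of element-wise products, unbounded in both directions. */
    static double dot(float[] a, float[] b) {
        requireSameLength(a, b);
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += (double) a[i] * b[i];
        }
        return sum;
    }

    /** Cosine similarity: inner product divided by the product of the norms, in [-1, 1]. */
    static double cosineSimilarity(float[] a, float[] b) {
        double normA = Math.sqrt(dot(a, a));
        double normB = Math.sqrt(dot(b, b));
        if (normA == 0.0 || normB == 0.0) {
            throw new IllegalArgumentException("cosine similarity is undefined for a zero vector");
        }
        return dot(a, b) / (normA * normB);
    }

    private static void requireSameLength(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("dimension mismatch");
        }
    }

    public static void main(String[] args) {
        float[] a = {1f, 0f};
        float[] b = {0f, 1f};
        System.out.println("l2 = " + l2(a, b));
        System.out.println("dot = " + dot(a, b));
        System.out.println("cosine = " + cosineSimilarity(a, b));
    }
}
```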

Type Mapping

| Lance/Arrow Type | Flink Type |
| --- | --- |
| Int8/16/32/64 | TINYINT/SMALLINT/INT/BIGINT |
| Float32/64 | FLOAT/DOUBLE |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BYTES |
| Date32 | DATE |
| Timestamp | TIMESTAMP |
| FixedSizeList<Float> | ARRAY<FLOAT> |
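
The mapping above is a one-to-one lookup, which the following plain Java sketch spells out with each combined row expanded into individual entries. It is not the connector's `LanceTypeConverter`; `TypeMappingSketch` is a hypothetical name, and the string keys simply reuse the type names as written in the table. Types absent from the table are rejected rather than guessed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical lookup that mirrors the type mapping table. */
final class TypeMappingSketch {

    private static final Map<String, String> ARROW_TO_FLINK = new LinkedHashMap<>();

    static {
        ARROW_TO_FLINK.put("Int8", "TINYINT");
        ARROW_TO_FLINK.put("Int16", "SMALLINT");
        ARROW_TO_FLINK.put("Int32", "INT");
        ARROW_TO_FLINK.put("Int64", "BIGINT");
        ARROW_TO_FLINK.put("Float32", "FLOAT");
        ARROW_TO_FLINK.put("Float64", "DOUBLE");
        ARROW_TO_FLINK.put("String", "STRING");
        ARROW_TO_FLINK.put("Boolean", "BOOLEAN");
        ARROW_TO_FLINK.put("Binary", "BYTES");
        ARROW_TO_FLINK.put("Date32", "DATE");
        ARROW_TO_FLINK.put("Timestamp", "TIMESTAMP");
        ARROW_TO_FLINK.put("FixedSizeList<Float>", "ARRAY<FLOAT>");
    }

    /** Returns the Flink SQL type for a documented Lance/Arrow type name. */
    static String toFlinkType(String arrowType) {
        String flinkType = ARROW_TO_FLINK.get(arrowType);
        if (flinkType == null) {
            throw new IllegalArgumentException("No documented mapping for: " + arrowType);
        }
        return flinkType;
    }

    public static void main(String[] args) {
        System.out.println(toFlinkType("Int64"));
        System.out.println(toFlinkType("FixedSizeList<Float>"));
    }
}
```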

🏗️ Project Structure

flink-connector-lance/
├── src/main/java/org/apache/flink/connector/lance/
│   ├── LanceSource.java           # Source implementation
│   ├── LanceSink.java             # Sink with checkpointing
│   ├── LanceInputFormat.java      # Batch input format
│   ├── LanceIndexBuilder.java     # Vector index builder
│   ├── LanceVectorSearch.java     # KNN search
│   ├── config/
│   │   └── LanceOptions.java      # Configuration options
│   ├── converter/
│   │   ├── LanceTypeConverter.java
│   │   └── RowDataConverter.java
│   └── table/
│       ├── LanceDynamicTableFactory.java
│       ├── LanceDynamicTableSource.java
│       ├── LanceDynamicTableSink.java
│       ├── LanceCatalog.java
│       └── LanceVectorSearchFunction.java
└── src/test/java/                 # Unit & integration tests

🔧 Build

# Build without tests
mvn clean package -DskipTests

# Build with tests
mvn clean package

# Run tests only
mvn test

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

📋 Roadmap

  • Streaming CDC support
  • Delta Lake interoperability
  • Distributed index building
  • GPU acceleration support
  • Python Table API support

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

🙏 Acknowledgments


Made with ❤️ by the Flink Connector Lance Contributors
