Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions crates/sparrow-logical/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "sparrow-logical"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
publish = false
description = """
Logical representation of Kaskada queries.
"""

[dependencies]
arrow-schema.workspace = true
derive_more.workspace = true
error-stack.workspace = true
hashbrown.workspace = true
itertools.workspace = true
serde.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-types = { path = "../sparrow-types" }
static_init.workspace = true
serde_yaml.workspace = true
uuid.workspace = true

[dev-dependencies]
insta.workspace = true

[lib]
doctest = false
25 changes: 25 additions & 0 deletions crates/sparrow-logical/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::{ExprRef, Grouping};
use arrow_schema::DataType;

use sparrow_types::DisplayFenlType;

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "internal error: {_0}")]
Internal(&'static str),
#[display(fmt = "invalid non-struct type: {}", "_0.display()")]
InvalidNonStructType(DataType),
#[display(fmt = "invalid non-string literal: {_0:?}")]
InvalidNonStringLiteral(ExprRef),
// TODO: Include nearest matches?
#[display(fmt = "invalid field name '{name}'")]
InvalidFieldName { name: String },
#[display(fmt = "invalid types")]
InvalidTypes,
#[display(fmt = "incompatible groupings {_0:?}")]
IncompatibleGroupings(Vec<Grouping>),
#[display(fmt = "invalid function: '{_0}'")]
InvalidFunction(String),
}

impl error_stack::Context for Error {}
223 changes: 223 additions & 0 deletions crates/sparrow-logical/src/expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use crate::{Error, Grouping};
use arrow_schema::{DataType, TimeUnit};
use sparrow_types::Types;
use std::borrow::Cow;
use std::sync::Arc;
use uuid::Uuid;

/// Represents an operation applied to 0 or more arguments.
#[derive(Debug)]
pub struct Expr {
/// The instruction being applied by this expression.
pub name: Cow<'static, str>,
/// Zero or more literal-valued arguments.
pub literal_args: Vec<Literal>,
/// Arguments to the expression.
pub args: Vec<ExprRef>,
/// The type produced by the expression.
pub result_type: DataType,
/// The grouping associated with the expression.
pub grouping: Grouping,
}

#[derive(Debug)]
pub enum Literal {
Null,
Bool(bool),
String(String),
Int64(i64),
UInt64(u64),
Float64(f64),
Timedelta { seconds: i64, nanos: i64 },
Uuid(Uuid),
}

impl Expr {
pub fn try_new(
name: Cow<'static, str>,
args: Vec<ExprRef>,
) -> error_stack::Result<Self, Error> {
let Types {
arguments: arg_types,
result: result_type,
} = crate::typecheck::typecheck(name.as_ref(), &args)?;

// If any of the types are different, we'll need to create new arguments.
let args = args
.into_iter()
.zip(arg_types)
.map(|(arg, arg_type)| arg.cast(arg_type))
.collect::<Result<Vec<_>, _>>()?;

let grouping = Grouping::from_args(&args)?;

Ok(Self {
name,
literal_args: vec![],
args,
result_type,
grouping,
})
}

/// Create a new literal node referencing a UUID.
///
/// This can be used for sources, UDFs, etc.
///
/// Generally, the `name` should identify the kind of thing being referenced (source, UDF, etc.)
/// and the `uuid` should identify the specific thing being referenced.
pub fn new_uuid(
name: &'static str,
uuid: Uuid,
result_type: DataType,
grouping: Grouping,
) -> Self {
Self {
name: Cow::Borrowed(name),
literal_args: vec![Literal::Uuid(uuid)],
args: vec![],
result_type,
grouping,
}
}

pub fn new_literal(literal: Literal) -> Self {
let result_type = match literal {
Literal::Null => DataType::Null,
Literal::Bool(_) => DataType::Boolean,
Literal::String(_) => DataType::Utf8,
Literal::Int64(_) => DataType::Int64,
Literal::UInt64(_) => DataType::UInt64,
Literal::Float64(_) => DataType::Float64,
Literal::Timedelta { .. } => DataType::Duration(TimeUnit::Nanosecond),
Literal::Uuid(_) => DataType::FixedSizeBinary(BYTES_IN_UUID),
};
Self {
name: Cow::Borrowed("literal"),
literal_args: vec![literal],
args: vec![],
result_type,
grouping: Grouping::Literal,
}
}

/// Create a new cast expression to the given type.
pub fn cast(self: Arc<Self>, data_type: DataType) -> error_stack::Result<Arc<Self>, Error> {
if self.result_type == data_type {
Ok(self)
} else {
let grouping = self.grouping.clone();
Ok(Arc::new(Expr {
name: Cow::Borrowed("cast"),
literal_args: vec![],
args: vec![self],
result_type: data_type,
grouping,
}))
}
}

/// If this expression is a literal, return the corresponding scalar value.
pub fn literal_opt(&self) -> Option<&Literal> {
if self.name == "literal" {
debug_assert_eq!(self.literal_args.len(), 1);
Some(&self.literal_args[0])
} else {
None
}
}

/// If this expression is a literal string, return it.
///
/// This returns `None` if:
/// 1. This expression is not a literal.
/// 2. This expression is not a string literal.
pub fn literal_str_opt(&self) -> Option<&str> {
self.literal_opt().and_then(|scalar| match scalar {
Literal::String(str) => Some(str.as_str()),
_ => None,
})
}
}

const BYTES_IN_UUID: i32 = (std::mem::size_of::<uuid::Bytes>() / std::mem::size_of::<u8>()) as i32;

/// Reference counted expression.
pub type ExprRef = Arc<Expr>;

#[cfg(test)]
mod tests {
use arrow_schema::Field;

use super::*;

#[test]
fn test_literal() {
let literal = Expr::new_literal(Literal::String("hello".to_owned()));
insta::assert_debug_snapshot!(literal);
}

#[test]
fn test_fieldref() {
let uuid = Uuid::from_u64_pair(42, 84);
let source = Arc::new(Expr::new_uuid(
"source",
uuid,
DataType::Struct(
vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Float64, false),
]
.into(),
),
Grouping::Literal,
));
let field = Expr::try_new(
"fieldref".into(),
vec![
source,
Arc::new(Expr::new_literal(Literal::String("a".to_owned()))),
],
)
.unwrap();
insta::assert_debug_snapshot!(field);
}

#[test]
fn test_function() {
let uuid = Uuid::from_u64_pair(42, 84);
let source = Arc::new(Expr::new_uuid(
"source",
uuid,
DataType::Struct(
vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float64, false),
]
.into(),
),
Grouping::Literal,
));
let a_i32 = Arc::new(
Expr::try_new(
"fieldref".into(),
vec![
source,
Arc::new(Expr::new_literal(Literal::String("a".to_owned()))),
],
)
.unwrap(),
);

// i32 + f64 literal => f64
let a_i32_plus_1 = Expr::try_new(
"add".into(),
vec![
a_i32.clone(),
Arc::new(Expr::new_literal(Literal::Float64(1.0))),
],
)
.unwrap();
insta::assert_debug_snapshot!(a_i32_plus_1);
}
}
6 changes: 6 additions & 0 deletions crates/sparrow-logical/src/functions.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- name: add
signature: "<T: number>(x: T, y: T) -> T"
- name: sub
signature: "<T: number>(x: T, y: T) -> T"
- name: eq
signature: "<T: any>(x: T, y: T) -> bool"
35 changes: 35 additions & 0 deletions crates/sparrow-logical/src/grouping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use itertools::Itertools;

use crate::{Error, ExprRef};

/// A wrapper around a u32 identifying a distinct grouping.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[repr(transparent)]
pub struct GroupId(u32);

/// The grouping associated with an expression.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum Grouping {
Literal,
Group(GroupId),
}

impl Grouping {
pub fn from_args(args: &[ExprRef]) -> error_stack::Result<Grouping, Error> {
let groupings = args
.iter()
.map(|arg| &arg.grouping)
.unique()
.filter(|g| **g == Grouping::Literal)
.cloned();

match groupings.at_most_one() {
Ok(None) => Ok(Grouping::Literal),
Ok(Some(grouping)) => Ok(grouping),
Err(groupings) => {
let groupings: Vec<_> = groupings.collect();
error_stack::bail!(Error::IncompatibleGroupings(groupings))
}
}
}
}
19 changes: 19 additions & 0 deletions crates/sparrow-logical/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#![warn(
rust_2018_idioms,
nonstandard_style,
future_incompatible,
clippy::mod_module_files,
clippy::print_stdout,
clippy::print_stderr,
clippy::undocumented_unsafe_blocks
)]

//! Logical execution plans for Kaskada queries.
mod error;
mod expr;
mod grouping;
mod typecheck;

pub use error::*;
pub use expr::*;
pub use grouping::*;
Loading