This repository was archived by the owner on Mar 3, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
ref: Introduce logical plan creation #756
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| [package] | ||
| name = "sparrow-logical" | ||
| version.workspace = true | ||
| authors.workspace = true | ||
| edition.workspace = true | ||
| license.workspace = true | ||
| publish = false | ||
| description = """ | ||
| Logical representation of Kaskada queries. | ||
| """ | ||
|
|
||
| [dependencies] | ||
| arrow-schema.workspace = true | ||
| derive_more.workspace = true | ||
| error-stack.workspace = true | ||
| hashbrown.workspace = true | ||
| itertools.workspace = true | ||
| serde.workspace = true | ||
| sparrow-arrow = { path = "../sparrow-arrow" } | ||
| sparrow-types = { path = "../sparrow-types" } | ||
| static_init.workspace = true | ||
| serde_yaml.workspace = true | ||
| uuid.workspace = true | ||
|
|
||
| [dev-dependencies] | ||
| insta.workspace = true | ||
|
|
||
| [lib] | ||
| doctest = false |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| use crate::{ExprRef, Grouping}; | ||
| use arrow_schema::DataType; | ||
|
|
||
| use sparrow_types::DisplayFenlType; | ||
|
|
||
/// Errors reported by this crate.
///
/// The user-facing text of each variant is generated by `derive_more::Display`
/// from the `#[display(...)]` attribute on that variant.
#[derive(derive_more::Display, Debug)]
pub enum Error {
    /// An internal error described by a static message.
    #[display(fmt = "internal error: {_0}")]
    Internal(&'static str),
    /// A type that is not a struct was encountered; the offending type is
    /// rendered through its `display()` method.
    // NOTE(review): presumably raised where a struct is required (e.g. field
    // access) — the raising site is not in this file; confirm.
    #[display(fmt = "invalid non-struct type: {}", "_0.display()")]
    InvalidNonStructType(DataType),
    /// An expression that is not a string literal was encountered; the
    /// offending expression is rendered with its `Debug` output.
    #[display(fmt = "invalid non-string literal: {_0:?}")]
    InvalidNonStringLiteral(ExprRef),
    /// The field name `name` was rejected as invalid.
    // TODO: Include nearest matches?
    #[display(fmt = "invalid field name '{name}'")]
    InvalidFieldName { name: String },
    /// The types involved were invalid; no further detail is carried.
    #[display(fmt = "invalid types")]
    InvalidTypes,
    /// More than one distinct grouping was found; carries the groupings that
    /// were collected when the conflict was detected.
    #[display(fmt = "incompatible groupings {_0:?}")]
    IncompatibleGroupings(Vec<Grouping>),
    /// The named function was rejected as invalid.
    #[display(fmt = "invalid function: '{_0}'")]
    InvalidFunction(String),
}

// Empty impl: opts `Error` in as an `error_stack` context so it can be the
// error type of `error_stack::Result<_, Error>`.
impl error_stack::Context for Error {}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,223 @@ | ||
| use crate::{Error, Grouping}; | ||
| use arrow_schema::{DataType, TimeUnit}; | ||
| use sparrow_types::Types; | ||
| use std::borrow::Cow; | ||
| use std::sync::Arc; | ||
| use uuid::Uuid; | ||
|
|
||
/// Represents an operation applied to 0 or more arguments.
///
/// All fields are public, so values can be built directly as well as through
/// the constructors on `Expr`; the constructors are what keep `result_type`
/// and `grouping` consistent with the arguments.
#[derive(Debug)]
pub struct Expr {
    /// The instruction being applied by this expression.
    ///
    /// `Expr::new_literal` uses `"literal"` and `Expr::cast` uses `"cast"`;
    /// other names are supplied by the caller.
    pub name: Cow<'static, str>,
    /// Zero or more literal-valued arguments.
    ///
    /// Holds exactly one entry for expressions built by `Expr::new_literal`
    /// and `Expr::new_uuid`; empty for those built by `Expr::try_new` and
    /// `Expr::cast`.
    pub literal_args: Vec<Literal>,
    /// Arguments to the expression, as shared references to other expressions.
    pub args: Vec<ExprRef>,
    /// The type produced by the expression.
    pub result_type: DataType,
    /// The grouping associated with the expression.
    pub grouping: Grouping,
}
|
|
||
/// A literal value that can be attached to an [`Expr`] via `literal_args`.
///
/// The Arrow type noted on each variant is the `result_type` that
/// `Expr::new_literal` assigns to an expression wrapping it.
#[derive(Debug)]
pub enum Literal {
    /// The null literal (`DataType::Null`).
    Null,
    /// A boolean (`DataType::Boolean`).
    Bool(bool),
    /// An owned string (`DataType::Utf8`).
    String(String),
    /// A signed 64-bit integer (`DataType::Int64`).
    Int64(i64),
    /// An unsigned 64-bit integer (`DataType::UInt64`).
    UInt64(u64),
    /// A 64-bit float (`DataType::Float64`).
    Float64(f64),
    /// A time delta (`DataType::Duration(TimeUnit::Nanosecond)`).
    // NOTE(review): how `seconds` and `nanos` combine (and whether `nanos` is
    // bounded to a sub-second range) is not shown in this file — confirm.
    Timedelta { seconds: i64, nanos: i64 },
    /// A UUID (`DataType::FixedSizeBinary` sized to the UUID byte length).
    Uuid(Uuid),
}
|
|
||
| impl Expr { | ||
| pub fn try_new( | ||
| name: Cow<'static, str>, | ||
| args: Vec<ExprRef>, | ||
| ) -> error_stack::Result<Self, Error> { | ||
| let Types { | ||
| arguments: arg_types, | ||
| result: result_type, | ||
| } = crate::typecheck::typecheck(name.as_ref(), &args)?; | ||
|
|
||
| // If any of the types are different, we'll need to create new arguments. | ||
| let args = args | ||
| .into_iter() | ||
| .zip(arg_types) | ||
| .map(|(arg, arg_type)| arg.cast(arg_type)) | ||
| .collect::<Result<Vec<_>, _>>()?; | ||
|
|
||
| let grouping = Grouping::from_args(&args)?; | ||
|
|
||
| Ok(Self { | ||
| name, | ||
| literal_args: vec![], | ||
| args, | ||
| result_type, | ||
| grouping, | ||
| }) | ||
| } | ||
|
|
||
| /// Create a new literal node referencing a UUID. | ||
| /// | ||
| /// This can be used for sources, UDFs, etc. | ||
| /// | ||
| /// Generally, the `name` should identify the kind of thing being referenced (source, UDF, etc.) | ||
| /// and the `uuid` should identify the specific thing being referenced. | ||
| pub fn new_uuid( | ||
| name: &'static str, | ||
| uuid: Uuid, | ||
| result_type: DataType, | ||
| grouping: Grouping, | ||
| ) -> Self { | ||
| Self { | ||
| name: Cow::Borrowed(name), | ||
| literal_args: vec![Literal::Uuid(uuid)], | ||
| args: vec![], | ||
| result_type, | ||
| grouping, | ||
| } | ||
| } | ||
|
|
||
| pub fn new_literal(literal: Literal) -> Self { | ||
| let result_type = match literal { | ||
| Literal::Null => DataType::Null, | ||
| Literal::Bool(_) => DataType::Boolean, | ||
| Literal::String(_) => DataType::Utf8, | ||
| Literal::Int64(_) => DataType::Int64, | ||
| Literal::UInt64(_) => DataType::UInt64, | ||
| Literal::Float64(_) => DataType::Float64, | ||
| Literal::Timedelta { .. } => DataType::Duration(TimeUnit::Nanosecond), | ||
| Literal::Uuid(_) => DataType::FixedSizeBinary(BYTES_IN_UUID), | ||
| }; | ||
| Self { | ||
| name: Cow::Borrowed("literal"), | ||
| literal_args: vec![literal], | ||
| args: vec![], | ||
| result_type, | ||
| grouping: Grouping::Literal, | ||
| } | ||
| } | ||
|
|
||
| /// Create a new cast expression to the given type. | ||
| pub fn cast(self: Arc<Self>, data_type: DataType) -> error_stack::Result<Arc<Self>, Error> { | ||
| if self.result_type == data_type { | ||
| Ok(self) | ||
| } else { | ||
| let grouping = self.grouping.clone(); | ||
| Ok(Arc::new(Expr { | ||
| name: Cow::Borrowed("cast"), | ||
| literal_args: vec![], | ||
| args: vec![self], | ||
| result_type: data_type, | ||
| grouping, | ||
| })) | ||
| } | ||
| } | ||
|
|
||
| /// If this expression is a literal, return the corresponding scalar value. | ||
| pub fn literal_opt(&self) -> Option<&Literal> { | ||
| if self.name == "literal" { | ||
| debug_assert_eq!(self.literal_args.len(), 1); | ||
| Some(&self.literal_args[0]) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
|
|
||
| /// If this expression is a literal string, return it. | ||
| /// | ||
| /// This returns `None` if: | ||
| /// 1. This expression is not a literal. | ||
| /// 2. This expression is not a string literal. | ||
| pub fn literal_str_opt(&self) -> Option<&str> { | ||
| self.literal_opt().and_then(|scalar| match scalar { | ||
| Literal::String(str) => Some(str.as_str()), | ||
| _ => None, | ||
| }) | ||
| } | ||
| } | ||
|
|
||
/// Size in bytes of a UUID (`uuid::Bytes`), computed at compile time.
///
/// Used as the width of the `DataType::FixedSizeBinary` type assigned to UUID
/// literals; it is an `i32` because that is what `FixedSizeBinary` is given.
const BYTES_IN_UUID: i32 = (std::mem::size_of::<uuid::Bytes>() / std::mem::size_of::<u8>()) as i32;
|
|
||
/// Reference counted expression.
///
/// Expressions hold their arguments as `ExprRef`s, so one expression can be
/// shared as an argument of several others by cloning the `Arc`.
pub type ExprRef = Arc<Expr>;
|
|
||
// Snapshot tests: each test builds an expression and compares its `Debug`
// output against a stored `insta` snapshot.
#[cfg(test)]
mod tests {
    use arrow_schema::Field;

    use super::*;

    /// Snapshot of a string literal expression.
    #[test]
    fn test_literal() {
        let literal = Expr::new_literal(Literal::String("hello".to_owned()));
        insta::assert_debug_snapshot!(literal);
    }

    /// Snapshot of a `fieldref` to field `a` of a struct-typed source.
    #[test]
    fn test_fieldref() {
        // Source producing the struct { a: Int64 (nullable), b: Float64 (non-nullable) }.
        let uuid = Uuid::from_u64_pair(42, 84);
        let source = Arc::new(Expr::new_uuid(
            "source",
            uuid,
            DataType::Struct(
                vec![
                    Field::new("a", DataType::Int64, true),
                    Field::new("b", DataType::Float64, false),
                ]
                .into(),
            ),
            Grouping::Literal,
        ));
        // The field to access is passed as a string literal argument.
        let field = Expr::try_new(
            "fieldref".into(),
            vec![
                source,
                Arc::new(Expr::new_literal(Literal::String("a".to_owned()))),
            ],
        )
        .unwrap();
        insta::assert_debug_snapshot!(field);
    }

    /// Snapshot of `add` applied to an Int32 field and a Float64 literal.
    #[test]
    fn test_function() {
        // Same shape as the source in `test_fieldref`, except `a` is Int32 so
        // that the two operands of `add` below have different types.
        let uuid = Uuid::from_u64_pair(42, 84);
        let source = Arc::new(Expr::new_uuid(
            "source",
            uuid,
            DataType::Struct(
                vec![
                    Field::new("a", DataType::Int32, true),
                    Field::new("b", DataType::Float64, false),
                ]
                .into(),
            ),
            Grouping::Literal,
        ));
        let a_i32 = Arc::new(
            Expr::try_new(
                "fieldref".into(),
                vec![
                    source,
                    Arc::new(Expr::new_literal(Literal::String("a".to_owned()))),
                ],
            )
            .unwrap(),
        );

        // i32 + f64 literal => f64
        let a_i32_plus_1 = Expr::try_new(
            "add".into(),
            vec![
                a_i32.clone(),
                Arc::new(Expr::new_literal(Literal::Float64(1.0))),
            ],
        )
        .unwrap();
        insta::assert_debug_snapshot!(a_i32_plus_1);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| - name: add | ||
| signature: "<T: number>(x: T, y: T) -> T" | ||
| - name: sub | ||
| signature: "<T: number>(x: T, y: T) -> T" | ||
| - name: eq | ||
| signature: "<T: any>(x: T, y: T) -> bool" | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| use itertools::Itertools; | ||
|
|
||
| use crate::{Error, ExprRef}; | ||
|
|
||
/// A wrapper around a u32 identifying a distinct grouping.
///
/// The inner value is private, and the type is `#[repr(transparent)]`, so it
/// has the same layout as the `u32` it wraps.
// NOTE(review): no constructor is visible in this part of the file — confirm
// how ids are expected to be created.
#[derive(Debug, Eq, PartialEq, Clone, Copy, Hash, Ord, PartialOrd)]
#[repr(transparent)]
pub struct GroupId(u32);
|
|
||
/// The grouping associated with an expression.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum Grouping {
    /// The grouping of literal expressions: the value assigned by
    /// `Expr::new_literal`, which does not refer to any particular group.
    Literal,
    /// The expression belongs to the group identified by the given id.
    Group(GroupId),
}
|
|
||
| impl Grouping { | ||
| pub fn from_args(args: &[ExprRef]) -> error_stack::Result<Grouping, Error> { | ||
| let groupings = args | ||
| .iter() | ||
| .map(|arg| &arg.grouping) | ||
| .unique() | ||
| .filter(|g| **g == Grouping::Literal) | ||
| .cloned(); | ||
|
|
||
| match groupings.at_most_one() { | ||
| Ok(None) => Ok(Grouping::Literal), | ||
| Ok(Some(grouping)) => Ok(grouping), | ||
| Err(groupings) => { | ||
| let groupings: Vec<_> = groupings.collect(); | ||
| error_stack::bail!(Error::IncompatibleGroupings(groupings)) | ||
| } | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| #![warn( | ||
| rust_2018_idioms, | ||
| nonstandard_style, | ||
| future_incompatible, | ||
| clippy::mod_module_files, | ||
| clippy::print_stdout, | ||
| clippy::print_stderr, | ||
| clippy::undocumented_unsafe_blocks | ||
| )] | ||
|
|
||
| //! Logical execution plans for Kaskada queries. | ||
| mod error; | ||
| mod expr; | ||
| mod grouping; | ||
| mod typecheck; | ||
|
|
||
| pub use error::*; | ||
| pub use expr::*; | ||
| pub use grouping::*; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.