Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions lib/sycamore/sycamore/connectors/file/file_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

import boto3
import mimetypes
from typing import Any, Optional, Union, Tuple, Callable, TYPE_CHECKING
from typing import Any, Optional, Union, Tuple, Callable, TYPE_CHECKING, cast
import logging

from functools import partial
from pyarrow._fs import FileInfo
from pyarrow.fs import FileSystem, FileSelector
from sycamore.data import Document, mkdocid
from sycamore.plan_nodes import Scan
from sycamore.utils.time_trace import timetrace
from sycamore.materialize import RayPathParser

if TYPE_CHECKING:
from ray.data import Dataset
Expand Down Expand Up @@ -186,6 +188,7 @@ def __init__(
self._binary_format = binary_format
self._metadata_provider = metadata_provider
self._filter_paths_by_extension = filter_paths_by_extension
self._path_filter = None

@timetrace("readBinary")
def _to_document(self, dict: dict[str, Any]) -> dict[str, bytes]:
Expand All @@ -202,7 +205,10 @@ def _to_document(self, dict: dict[str, Any]) -> dict[str, bytes]:
document.properties["filetype"] = self._file_mime_type()
if self._metadata_provider:
document.properties.update(self._metadata_provider.get_metadata(dict["path"]))
if self._path_filter is not None:
from sycamore.materialize import docid_from_path

document = docid_from_path(document)
return {"doc": document.serialize()}

def _file_mime_type(self):
Expand All @@ -218,23 +224,44 @@ def execute(self, **kwargs) -> "Dataset":
file_extensions = [self.format()] if self._filter_paths_by_extension else None

from ray.data import read_binary_files

files = read_binary_files(
self._paths,
include_paths=True,
filesystem=self._filesystem,
override_num_blocks=self.override_num_blocks,
ray_remote_args=self.resource_args,
file_extensions=file_extensions,
)

from ray.data.datasource import PathPartitionFilter, PathPartitionParser

# TODO: Consider refactoring to use kwargs = self._get_read_args() for better extensibility
# when adding new read arguments in the future
partition_filter: Optional[Callable[[dict[str, str]], bool]] = None
if self._path_filter is not None:
partition_filter = PathPartitionFilter(
cast(PathPartitionParser, RayPathParser()), partial(self._path_filter, read=True)
)
shuffle = None if partition_filter is None else "files"

try:
files = read_binary_files(
self._paths,
include_paths=True,
filesystem=self._filesystem,
override_num_blocks=self.override_num_blocks,
ray_remote_args=self.resource_args,
file_extensions=file_extensions,
partition_filter=partition_filter,
shuffle=shuffle,
)
except ValueError as e:

from ray.data import from_items

if self._path_filter is not None and "No input files found to read." in str(e):
return from_items(items=[])
raise
return files.map(self._to_document, **self.resource_args)

def process_file(self, info) -> list[Document]:
if not info.is_file:
return []
if self._filter_paths_by_extension and not info.path.endswith(self.format()):
return []
if self._path_filter is not None and not self._path_filter(info.path, True):
return []

assert self._filesystem
with self._filesystem.open_input_file(info.path) as file:
Expand All @@ -251,6 +278,10 @@ def process_file(self, info) -> list[Document]:
document.properties["path"] = "s3://" + info.path
if self._metadata_provider:
document.properties.update(self._metadata_provider.get_metadata(info.path))
if self._path_filter is not None:
from sycamore.materialize import docid_from_path

document = docid_from_path(document)
return [document]

def format(self):
Expand Down
4 changes: 2 additions & 2 deletions lib/sycamore/sycamore/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ def wrapper(*args, **kwargs):
"""
this let's you handle decorator usage like:
@context_params OR
@context_params() OR
@context_params("template") OR
@context_params() OR
@context_params("template") OR
@context_params("template1", "template2")
"""
if len(names) == 1 and callable(names[0]):
Expand Down
8 changes: 8 additions & 0 deletions lib/sycamore/sycamore/data/docid.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,11 @@ def nybbles_to_uuid(nybbles: list[int]) -> str:
rv += "-"
rv += alpha16[nybble]
return rv


def path_to_sha256_docid(path: str) -> str:
from hashlib import sha256

path_hash = sha256(path.encode("utf-8")).hexdigest()
doc_id = f"path-sha256-{path_hash}"
return doc_id
48 changes: 44 additions & 4 deletions lib/sycamore/sycamore/docset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1575,7 +1575,10 @@ def materialize(

from sycamore.materialize import Materialize

return DocSet(self.context, Materialize(self.plan, self.context, path=path, source_mode=source_mode))
return DocSet(
self.context,
Materialize(self.plan, self.context, path=path, source_mode=source_mode),
)

def clear_materialize(self, path: Optional[Union[Path, str]] = None, *, clear_non_local=False) -> None:
"""
Expand All @@ -1595,10 +1598,47 @@ def clear_materialize(self, path: Optional[Union[Path, str]] = None, *, clear_no

def execute(self, **kwargs) -> None:
"""
Execute the pipeline, discard the results. Useful for side effects.
"""

Execute the pipeline and discard the results. This method is primarily used for pipelines that produce
side effects, such as materializing data to disk.

Reliability mode is automatically enabled when:
- The pipeline ends with a Materialize node and the start of the pipeline is a read node.
- A MaterializeReadReliability rule is present in the context's rewrite rules

# Standard execution
ctx = sycamore.init()
ds = ctx.read....
ds.execute() # Runs without reliability guarantees

# Reliable execution with materialize read

ctx = sycamore.init()
ctx.rewrite_rules.append(MaterializeReadReliability(max_batch=200, max_retries=20))
ds = ctx.read.materialize()\
... \
.materialize()
ds.execute() # Runs with batching, retries, and progress tracking


# Reliable execution with binary read

ctx = sycamore.init()
ctx.rewrite_rules.append(MaterializeReadReliability(max_batch=200, max_retries=20))
ds = ctx.read.binary()\
... \
.materialize()
ds.execute() # Runs with batching, retries, and progress tracking

For more details, see the MaterializeReadReliability class.

"""
from sycamore.executor import Execution
from sycamore.materialize import MaterializeReadReliability

for doc in Execution(self.context).execute_iter(self.plan, **kwargs):
if MaterializeReadReliability.maybe_execute_reliably(self):
pass

else:
for doc in Execution(self.context).execute_iter(self.plan, **kwargs):
pass
Loading