|
1 | 1 | import json |
2 | 2 | from abc import ABC, abstractmethod |
| 3 | +from io import BytesIO |
| 4 | + |
3 | 5 | import boto3 |
4 | 6 | import mimetypes |
5 | 7 | from typing import Any, Optional, Union, Tuple, Callable, TYPE_CHECKING |
@@ -288,6 +290,46 @@ def execute(self, **kwargs) -> "Dataset": |
288 | 290 | doc_extractor = self._doc_extractor if self._doc_extractor else self._to_document |
289 | 291 | return json_dataset.flat_map(doc_extractor, **self.resource_args) |
290 | 292 |
|
def local_source(self, **kwargs) -> list[Document]:
    """Read the configured JSON path(s) eagerly (without a distributed Dataset)
    and return the extracted Documents.

    Accepts ``self._paths`` as a single path string or a list of paths; each
    path may be a file or a directory (directories are walked recursively).
    Each JSON record is tagged with the path of the file it came from and then
    run through ``self._doc_extractor`` (falling back to ``self._to_document``).

    Returns:
        list[Document]: one list accumulated across all paths, in traversal order.
    """
    if isinstance(self._paths, str):
        paths = [self._paths]
    else:
        paths = self._paths

    documents: list[Document] = []

    def process_file(info):
        # FileSelector results include directory entries; only parse real files.
        if not info.is_file:
            return

        # NOTE(review): uses self._filesystem (set from the first inferred
        # filesystem below) rather than the loop-local `filesystem`; assumed
        # equivalent for a homogeneous path list — confirm if mixed
        # filesystems are ever passed.
        with self._filesystem.open_input_file(info.path) as file:
            import pyarrow.json as pyjson
            import pyarrow

            buffer: pyarrow.lib.Buffer = file.read_buffer()
            table = pyjson.read_json(BytesIO(buffer))
            rows = table.to_pylist()
            for row in rows:
                # BUG FIX: tag each row with the file it was read from.
                # Previously this assigned the enclosing loop variable `path`,
                # which is the *directory* path during recursive traversal,
                # so all rows in a tree got the same (wrong) path. For a
                # single-file path, info.path == path, so behavior is
                # unchanged in that case.
                row["path"] = info.path
            doc_extractor = self._doc_extractor if self._doc_extractor else self._to_document
            # Each extractor call yields a list; keep only the first Document,
            # matching the original behavior.
            docs = [doc_extractor(row)[0] for row in rows]
            documents.extend(docs)

    for orig_path in paths:
        from sycamore.utils.pyarrow import cross_check_infer_fs

        (filesystem, path) = cross_check_infer_fs(self._filesystem, orig_path)
        if self._filesystem is None:
            self._filesystem = filesystem

        path_info = filesystem.get_file_info(path)
        if path_info.is_file:
            process_file(path_info)
        else:
            # Directory: visit every entry beneath it recursively.
            for info in filesystem.get_file_info(FileSelector(path, recursive=True)):
                process_file(info)
    return documents
| 332 | + |
def format(self):
    """Name of the file format handled by this scan."""
    return "json"
293 | 335 |
|
@@ -321,5 +363,42 @@ def execute(self, **kwargs) -> "Dataset": |
321 | 363 | ) |
322 | 364 | return ds.flat_map(self.json_as_document, **self.resource_args) |
323 | 365 |
|
def local_source(self, **kwargs) -> list[Document]:
    """Eagerly read the configured JSONL path(s) and return one Document per record.

    ``self._paths`` may be a single path string or a list; directories are
    traversed recursively. Each line-delimited JSON record becomes a
    ``Document`` constructed directly from the parsed row dict.

    Returns:
        list[Document]: records from all paths, in traversal order.
    """
    paths = [self._paths] if isinstance(self._paths, str) else self._paths
    documents: list[Document] = []

    def read_one(info):
        # Selector output includes directories; only real files carry records.
        if not info.is_file:
            return

        import pyarrow
        import pyarrow.json as pyjson

        with self._filesystem.open_input_file(info.path) as handle:
            raw: pyarrow.lib.Buffer = handle.read_buffer()
            parsed = pyjson.read_json(BytesIO(raw))
            documents.extend(Document(record) for record in parsed.to_pylist())

    for orig_path in paths:
        from sycamore.utils.pyarrow import cross_check_infer_fs

        filesystem, path = cross_check_infer_fs(self._filesystem, orig_path)
        if self._filesystem is None:
            self._filesystem = filesystem

        top = filesystem.get_file_info(path)
        if top.is_file:
            read_one(top)
        else:
            # Walk the directory tree and parse every file found.
            for entry in filesystem.get_file_info(FileSelector(path, recursive=True)):
                read_one(entry)
    return documents
| 402 | + |
def format(self):
    """Name of the file format handled by this scan."""
    return "jsonl"
0 commit comments