Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9d24f31
Make error code bounds checking look nicer
MarkLindblad Jan 24, 2025
d6f2ce4
Rename `_set_stream` to `_should_stream`
MarkLindblad Jan 24, 2025
82ff70d
Add comment in docstring about how cancellation can fail if the speci…
MarkLindblad Jan 24, 2025
57b2f1f
Make `partition_file_async_list` example output more concise
MarkLindblad Jan 24, 2025
5b7080a
Refactor shared header code into function, add `User-Agent`
MarkLindblad Jan 27, 2025
f14be07
Remove `test_partition_file_async`
MarkLindblad Jan 27, 2025
2cb94f8
Remove example from `partition_file_async_list` docstring
MarkLindblad Jan 27, 2025
6968d85
Keep all async examples in aryn-sdk README.md, point there from docst…
MarkLindblad Jan 27, 2025
56afd3a
Consolidate url rewriting into one function
MarkLindblad Jan 27, 2025
796c442
Add comment making behavior more obvious
MarkLindblad Jan 27, 2025
5c9e7a9
Remove dependence on `importlib`
MarkLindblad Jan 28, 2025
ec8f348
Make `_convert_sync_to_async_url` use more readable, fix linting
MarkLindblad Jan 28, 2025
ba11fb9
Simplify UX of `partition_file_async_list`
MarkLindblad Jan 28, 2025
80fca40
Filter out non-DocParse jobs from `partition_file_async_list`
MarkLindblad Jan 28, 2025
4a1d50c
Make `test_multiple_partition_file_async` more robust
MarkLindblad Jan 28, 2025
a49f7e9
Make `_convert_sync_to_async_url`'s `truncate` a keyword argument
MarkLindblad Jan 29, 2025
0f1cdb0
Improve performance of `partition_file_async_list`
MarkLindblad Jan 29, 2025
a7b7c44
Fix return description in docstring for `partition_file_async_result`
MarkLindblad Jan 29, 2025
a69af61
Improve return type in docstring of `partition_file_async_result`
MarkLindblad Jan 29, 2025
c069558
Remove repetitive sentence in docstring for `partition_file_async_res…
MarkLindblad Jan 29, 2025
7fba9f0
Fix `aryn-sdk` notebook example `ArynPartitionerPython.ipynb`
MarkLindblad Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/aryn-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,10 @@ from aryn_sdk.partition import partition_file_async_submit, partition_file_async

partition_file_async_cancel(job_id)
```

#### List pending jobs

```
from aryn_sdk.partition import partition_file_async_list
partition_file_async_list()
```
174 changes: 56 additions & 118 deletions lib/aryn-sdk/aryn_sdk/partition/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
_logger.setLevel(logging.INFO)
_logger.addHandler(logging.StreamHandler(sys.stderr))

g_version = "0.1.11"


class PartitionError(Exception):
def __init__(self, message: str, status_code: int) -> None:
Expand Down Expand Up @@ -206,12 +208,10 @@ def _partition_file_inner(
_logger.debug(f"{options_str}")

files: Mapping = {"options": options_str.encode("utf-8"), "pdf": file}
headers = {"Authorization": "Bearer {}".format(aryn_config.api_key())}
if webhook_url:
headers["X-Aryn-Webhook"] = webhook_url
resp = requests.post(docparse_url, files=files, headers=headers, stream=_set_stream(), verify=ssl_verify)
headers = _generate_headers(aryn_config.api_key(), webhook_url)
resp = requests.post(docparse_url, files=files, headers=headers, stream=_should_stream(), verify=ssl_verify)

if resp.status_code < 200 or resp.status_code >= 300:
if resp.status_code < 200 or resp.status_code > 299:
raise requests.exceptions.HTTPError(
f"Error: status_code: {resp.status_code}, reason: {resp.text}", response=resp
)
Expand Down Expand Up @@ -270,7 +270,14 @@ def _process_config(aryn_api_key: Optional[str] = None, aryn_config: Optional[Ar
return aryn_config


def _set_stream() -> bool:
def _generate_headers(aryn_api_key: str, webhook_url: Optional[str] = None) -> dict[str, str]:
headers = {"Authorization": f"Bearer {aryn_api_key}", "User-Agent": f"aryn-sdk/{g_version}"}
if webhook_url:
headers["X-Aryn-Webhook"] = webhook_url
return headers


def _should_stream() -> bool:
# Workaround for vcr. See https://github.com/aryn-ai/sycamore/issues/958
stream = True
if "vcr" in sys.modules:
Expand Down Expand Up @@ -354,6 +361,7 @@ def partition_file_async_submit(
Set the `docparse_url` argument to the url of the synchronous endpoint, and this function will automatically
change it to the async endpoint as long as `async_submit_url` is not set.

For examples of usage see README.md

Args:
Includes All Arguments `partition_file` accepts plus those below:
Expand All @@ -365,64 +373,17 @@ def partition_file_async_submit(
Returns:
A dictionary containing the key "job_id" the value of which can be used with the `partition_file_async_result`
function to get the results and check the status of the async job.

Single Job Example:
.. code-block:: python

import time
from aryn_sdk.partition import partition_file_async_submit, partition_file_async_result

with open("my-favorite-pdf.pdf", "rb") as f:
response = partition_file_async_submit(
f,
use_ocr=True,
extract_table_structure=True,
)

job_id = response["job_id"]

# Poll for the results
while True:
result = partition_file_async_result(job_id)
if result["status"] != "pending":
break
time.sleep(5)

Multi-Job Example:
.. code-block:: python

import logging
import time
from aryn_sdk.partition import partition_file_async_submit, partition_file_async_result

files = [open("file1.pdf", "rb"), open("file2.docx", "rb")]
job_ids = [None] * len(files)
for i, f in enumerate(files):
try:
job_ids[i] = partition_file_async_submit(f)["job_id"]
except Exception as e:
logging.warning(f"Failed to submit {f}: {e}")

results = [None] * len(files)
for i, job_id in enumerate(job_ids):
while True:
result = partition_file_async_result(job_id)
if result["status"] != "pending":
break
time.sleep(5)
results[i] = result

"""

if async_submit_url:
docparse_url = async_submit_url
elif not aps_url and not docparse_url:
docparse_url = _convert_sync_to_async_submit_url(ARYN_DOCPARSE_URL)
docparse_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL, "/submit", truncate=False)
else:
if aps_url:
aps_url = _convert_sync_to_async_submit_url(aps_url)
aps_url = _convert_sync_to_async_url(aps_url, "/submit", truncate=False)
if docparse_url:
docparse_url = _convert_sync_to_async_submit_url(docparse_url)
docparse_url = _convert_sync_to_async_url(docparse_url, "/submit", truncate=False)

return _partition_file_inner(
file=file,
Expand All @@ -445,12 +406,17 @@ def partition_file_async_submit(
)


def _convert_sync_to_async_submit_url(url: str) -> str:
def _convert_sync_to_async_url(url: str, prefix: str, *, truncate: bool) -> str:
parsed_url = urlparse(url)
assert parsed_url.path.startswith("/v1/")
if parsed_url.path.startswith("/v1/async/submit"):
return url
return urlunparse((*parsed_url[:2], f"/v1/async/submit{parsed_url.path[3:]}", *parsed_url[3:]))
ary = list(parsed_url)
if truncate:
ary[2] = f"/v1/async{prefix}" # path
else:
ary[2] = f"/v1/async{prefix}{parsed_url.path[3:]}" # path
return urlunparse(ary)


def partition_file_async_result(
Expand All @@ -464,26 +430,23 @@ def partition_file_async_result(
"""
Get the results of an asynchronous partitioning job by job_id. Meant to be used with `partition_file_async_submit`.

Returns:
A dict containing "status", "status_code", and also "result" which is "status" is "done". "status" can be
"done", "pending", "error", or "no_such_job".
For examples of usage see README.md

Unlike `partition_file`, this function does not raise an Exception if the partitioning failed. Note the
value corresponding to the "result" key of the returned dict contains what would have been the return value of
`partition_file` had the partitioning been done synchronously.
Returns:
A dict containing "status" and "status_code". When "status" is "done", the returned dict also contains "result"
which contains what would have been returned had `partition_file` been called directly. "status" can be "done",
"pending", "error", or "no_such_job".

Example:
See the examples in the docstring for `partition_file_async_submit` for a full example of how to use this
function.
Unlike `partition_file`, this function does not raise an Exception if the partitioning failed.
"""
if not async_result_url:
async_result_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL)
async_result_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL, "/result", truncate=True)

aryn_config = _process_config(aryn_api_key, aryn_config)

specific_job_url = f"{async_result_url.rstrip('/')}/{job_id}"
http_header = {"Authorization": f"Bearer {aryn_config.api_key()}"}
response = requests.get(specific_job_url, headers=http_header, stream=_set_stream(), verify=ssl_verify)
headers = _generate_headers(aryn_config.api_key())
response = requests.get(specific_job_url, headers=headers, stream=_should_stream(), verify=ssl_verify)

if response.status_code == 200:
return {"status": "done", "status_code": response.status_code, "result": response.json()}
Expand All @@ -495,14 +458,6 @@ def partition_file_async_result(
return {"status": "error", "status_code": response.status_code}


def _convert_sync_to_async_url(url: str, prefix: str = "/result") -> str:
parsed_url = urlparse(url)
assert parsed_url.path.startswith("/v1/")
if parsed_url.path.startswith(f"/v1/async{prefix}"):
return url
return urlunparse((*parsed_url[:2], f"/v1/async{prefix}", *parsed_url[3:]))


def partition_file_async_cancel(
job_id: str,
*,
Expand All @@ -517,27 +472,19 @@ def partition_file_async_cancel(
Returns:
A bool indicating whether the job was successfully cancelled by this request.

Example:
.. code-block:: python

from aryn_sdk.partition import partition_file_async_submit, partition_file_async_cancel
job_id = partition_file_async_submit(
"path/to/file.pdf",
use_ocr=True,
extract_table_structure=True,
extract_images=True,
)["job_id"]
A job can only be successfully cancelled once. A return value of false can mean the job was already cancelled,
the job is already done, or there was no job with the given job_id.

partition_file_async_cancel(job_id)
For an example of usage see README.md
"""
if not async_cancel_url:
async_cancel_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL, "/cancel")
async_cancel_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL, "/cancel", truncate=True)

aryn_config = _process_config(aryn_api_key, aryn_config)

specific_job_url = f"{async_cancel_url.rstrip('/')}/{job_id}"
http_header = {"Authorization": f"Bearer {aryn_config.api_key()}"}
response = requests.post(specific_job_url, headers=http_header, stream=_set_stream(), verify=ssl_verify)
headers = _generate_headers(aryn_config.api_key())
response = requests.post(specific_job_url, headers=headers, stream=_should_stream(), verify=ssl_verify)
if response.status_code == 200:
return True
elif response.status_code == 404:
Expand All @@ -554,44 +501,35 @@ def partition_file_async_list(
async_list_url: Optional[str] = None,
) -> dict[str, Any]:
"""
List pending async jobs.
List pending async jobs. For an example of usage see README.md
Comment thread
MarkLindblad marked this conversation as resolved.

Returns:
A dict containing "jobs" which is a dict containing jobs with their job_id as their key and their value is a
dict containing the keys "path" and "state".
A dict like the one below which maps job_ids to a dict containing details of the respective job.

{
"jobs": {
"aryn:j-sc0v0lglkauo774pioflp4l": {
"path": "/v1/document/partition",
"state": "run"
},
"aryn:j-0eorfmvhaf9skaxm0sagrrl": {
"path": "/v1/document/partition",
"state": "run"
},
"aryn:j-b9xp7ny0eejvqvbazjhg8rn": {
"path": "/v1/document/partition",
"state": "run"
}
"aryn:j-sc0v0lglkauo774pioflp4l": {
"state": "run"
},
"aryn:j-b9xp7ny0eejvqvbazjhg8rn": {
"state": "run"
}
}

Example:
.. code-block:: python

from aryn_sdk.partition import partition_file_async_list
partition_file_async_list()
"""
if not async_list_url:
async_list_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL, "/list")
async_list_url = _convert_sync_to_async_url(ARYN_DOCPARSE_URL, "/list", truncate=True)

aryn_config = _process_config(aryn_api_key, aryn_config)

http_header = {"Authorization": f"Bearer {aryn_config.api_key()}"}
response = requests.get(async_list_url, headers=http_header, stream=_set_stream(), verify=ssl_verify)
headers = _generate_headers(aryn_config.api_key())
response = requests.get(async_list_url, headers=headers, stream=_should_stream(), verify=ssl_verify)

return response.json()
all_jobs = response.json()["jobs"]
result = {}
for job_id in all_jobs.keys():
if all_jobs[job_id]["path"] == "/v1/document/partition":
del all_jobs[job_id]["path"]
result[job_id] = all_jobs[job_id]
return result


# Heavily adapted from lib/sycamore/data/table.py::Table.to_csv()
Expand Down
23 changes: 3 additions & 20 deletions lib/aryn-sdk/aryn_sdk/test/test_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,24 +238,6 @@ def check_nonstandard_url(
call_partition_file(nonstandard_async_url_example)


def test_partition_file_async():
with open(RESOURCE_DIR / "pdfs" / "3m_table.pdf", "rb") as f:
job_id = partition_file_async_submit(f)["job_id"]

start = time.time()
while True:
actual_result = partition_file_async_result(job_id)
if actual_result["status"] != "pending" or time.time() - start >= ASYNC_TIMEOUT:
break
time.sleep(1)
assert actual_result["status"] == "done"

with open(RESOURCE_DIR / "json" / "3m_output.json", "rb") as f:
expected_result = json.load(f)

assert expected_result["elements"] == actual_result["result"]["elements"]


def test_partition_file_async_with_unsupported_file_format():
with open(RESOURCE_DIR / "image" / "unsupported-format-test-document-image.heic", "rb") as f:
job_id = partition_file_async_submit(f)["job_id"]
Expand All @@ -278,7 +260,6 @@ def test_multiple_partition_file_async():

before = partition_file_async_list()
logging.info(f"List before:\n{json.dumps(before, indent=4)}")
assert len(before["jobs"]) == 0

for i in range(num_jobs):
logging.info(f"Submitting job {i + 1}/{num_jobs}")
Expand All @@ -288,7 +269,9 @@ def test_multiple_partition_file_async():

after = partition_file_async_list()
logging.info(f"List after:\n{json.dumps(after, indent=4)}")
assert len(after["jobs"]) == num_jobs
assert len(after) >= num_jobs
for job_id in job_ids:
assert job_id in after

for i, job_id in enumerate(job_ids):
logging.info(f"Polling job ({job_id}) {i + 1}/{num_jobs}")
Expand Down
2 changes: 1 addition & 1 deletion notebooks/ArynPartitionerPython.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
"source": [
"def partition_filepath(filelocation, api_key=None, **options):\n",
" with open(filelocation, \"rb\") as f:\n",
" return partition_file(f, api_key, **options)"
" return partition_file(f, aryn_api_key=api_key, **options)"
]
},
{
Expand Down