Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions lib/aryn-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ png_str = convert_image_element(image_elts[2], format="PNG", b64encode=True)

### Async Aryn DocParse

#### Single Job Example
#### Single Task Example
```python
import time
from aryn_sdk.partition import partition_file_async_submit, partition_file_async_result
Expand All @@ -111,67 +111,67 @@ with open("my-favorite-pdf.pdf", "rb") as f:
extract_table_structure=True,
)

job_id = response["job_id"]
task_id = response["task_id"]

# Poll for the results
while True:
result = partition_file_async_result(job_id)
result = partition_file_async_result(task_id)
if result["status"] != "pending":
break
time.sleep(5)
```

Optionally, you can also set a webhook for Aryn to call when your job is completed:
Optionally, you can also set a webhook for Aryn to call when your task is completed:

```python
partition_file_async_submit("path/to/my/file.docx", webhook_url="https://example.com/alert")
```

Aryn will POST a request containing a body like the one below:
```json
{"done": [{"job_id": "aryn:j-47gpd3604e5tz79z1jro5fc"}]}
{"done": [{"task_id": "aryn:j-47gpd3604e5tz79z1jro5fc"}]}
```

#### Multi-Job Example
#### Multi-Task Example

```python
import logging
import time
from aryn_sdk.partition import partition_file_async_submit, partition_file_async_result

files = [open("file1.pdf", "rb"), open("file2.docx", "rb")]
job_ids = [None] * len(files)
task_ids = [None] * len(files)
for i, f in enumerate(files):
try:
job_ids[i] = partition_file_async_submit(f)["job_id"]
task_ids[i] = partition_file_async_submit(f)["task_id"]
except Exception as e:
logging.warning(f"Failed to submit {f}: {e}")

results = [None] * len(files)
for i, job_id in enumerate(job_ids):
for i, task_id in enumerate(task_ids):
while True:
result = partition_file_async_result(job_id)
result = partition_file_async_result(task_id)
if result["status"] != "pending":
break
time.sleep(5)
results[i] = result
```

#### Cancelling an async job
#### Cancelling an async task

```python
from aryn_sdk.partition import partition_file_async_submit, partition_file_async_cancel
job_id = partition_file_async_submit(
task_id = partition_file_async_submit(
"path/to/file.pdf",
use_ocr=True,
extract_table_structure=True,
extract_images=True,
)["job_id"]
)["task_id"]

partition_file_async_cancel(job_id)
partition_file_async_cancel(task_id)
```

#### List pending jobs
#### List pending tasks

```
from aryn_sdk.partition import partition_file_async_list
Expand Down
55 changes: 28 additions & 27 deletions lib/aryn-sdk/aryn_sdk/partition/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ def partition_file_async_submit(
Submits a file to be partitioned asynchronously. Meant to be used in tandem with `partition_file_async_result`.

`partition_file_async_submit` takes the same arguments as `partition_file`, and in addition it accepts a str
`webhook_url` argument which is a URL Aryn will send a POST request to when the job stops and an str
`async_submit_url` argument that can be used to override where the job is submitted to.
`webhook_url` argument which is a URL Aryn will send a POST request to when the task stops and a str
`async_submit_url` argument that can be used to override where the task is submitted to.

Set the `docparse_url` argument to the url of the synchronous endpoint, and this function will automatically
change it to the async endpoint as long as `async_submit_url` is not set.
Expand All @@ -366,13 +366,13 @@ def partition_file_async_submit(
Args:
Includes All Arguments `partition_file` accepts plus those below:
...
webhook_url: A URL to send a POST request to when the job is done. The resulting POST request will have a
body like: {"done": [{"job_id": "aryn:j-47gpd3604e5tz79z1jro5fc"}]}
async_submit_url: When set, this will override the endpoint the job is submitted to.
webhook_url: A URL to send a POST request to when the task is done. The resulting POST request will have a
body like: {"done": [{"task_id": "aryn:j-47gpd3604e5tz79z1jro5fc"}]}
async_submit_url: When set, this will override the endpoint the task is submitted to.

Returns:
A dictionary containing the key "job_id" the value of which can be used with the `partition_file_async_result`
function to get the results and check the status of the async job.
A dictionary containing the key "task_id", the value of which can be used with the `partition_file_async_result`
function to get the results and check the status of the async task.
"""

if async_submit_url:
Expand Down Expand Up @@ -420,22 +420,23 @@ def _convert_sync_to_async_url(url: str, prefix: str, *, truncate: bool) -> str:


def partition_file_async_result(
job_id: str,
task_id: str,
*,
aryn_api_key: Optional[str] = None,
aryn_config: Optional[ArynConfig] = None,
ssl_verify: bool = True,
async_result_url: Optional[str] = None,
) -> dict[str, Any]:
"""
Get the results of an asynchronous partitioning job by job_id. Meant to be used with `partition_file_async_submit`.
Get the results of an asynchronous partitioning task by task_id. Meant to be used with
`partition_file_async_submit`.

For examples of usage see README.md

Returns:
A dict containing "status" and "status_code". When "status" is "done", the returned dict also contains "result"
which contains what would have been returned had `partition_file` been called directly. "status" can be "done",
"pending", "error", or "no_such_job".
"pending", "error", or "no_such_task".

Unlike `partition_file`, this function does not raise an Exception if the partitioning failed.
"""
Expand All @@ -444,36 +445,36 @@ def partition_file_async_result(

aryn_config = _process_config(aryn_api_key, aryn_config)

specific_job_url = f"{async_result_url.rstrip('/')}/{job_id}"
specific_task_url = f"{async_result_url.rstrip('/')}/{task_id}"
headers = _generate_headers(aryn_config.api_key())
response = requests.get(specific_job_url, headers=headers, stream=_should_stream(), verify=ssl_verify)
response = requests.get(specific_task_url, headers=headers, stream=_should_stream(), verify=ssl_verify)

if response.status_code == 200:
return {"status": "done", "status_code": response.status_code, "result": response.json()}
elif response.status_code == 202:
return {"status": "pending", "status_code": response.status_code}
elif response.status_code == 404:
return {"status": "no_such_job", "status_code": response.status_code}
return {"status": "no_such_task", "status_code": response.status_code}
else:
return {"status": "error", "status_code": response.status_code}


def partition_file_async_cancel(
job_id: str,
task_id: str,
*,
aryn_api_key: Optional[str] = None,
aryn_config: Optional[ArynConfig] = None,
ssl_verify: bool = True,
async_cancel_url: Optional[str] = None,
) -> bool:
"""
Cancel an asynchronous partitioning job by job_id. Meant to be used with `partition_file_async_submit`.
Cancel an asynchronous partitioning task by task_id. Meant to be used with `partition_file_async_submit`.

Returns:
A bool indicating whether the job was successfully cancelled by this request.
A bool indicating whether the task was successfully cancelled by this request.

A job can only be successfully cancelled once. A return value of false can mean the job was already cancelled,
the job is already done, or there was no job with the given job_id.
A task can only be successfully cancelled once. A return value of false can mean the task was already cancelled,
the task is already done, or there was no task with the given task_id.

For an example of usage see README.md
"""
Expand All @@ -482,9 +483,9 @@ def partition_file_async_cancel(

aryn_config = _process_config(aryn_api_key, aryn_config)

specific_job_url = f"{async_cancel_url.rstrip('/')}/{job_id}"
specific_task_url = f"{async_cancel_url.rstrip('/')}/{task_id}"
headers = _generate_headers(aryn_config.api_key())
response = requests.post(specific_job_url, headers=headers, stream=_should_stream(), verify=ssl_verify)
response = requests.post(specific_task_url, headers=headers, stream=_should_stream(), verify=ssl_verify)
if response.status_code == 200:
return True
elif response.status_code == 404:
Expand All @@ -501,10 +502,10 @@ def partition_file_async_list(
async_list_url: Optional[str] = None,
) -> dict[str, Any]:
"""
List pending async jobs. For an example of usage see README.md
List pending async tasks. For an example of usage see README.md

Returns:
A dict like the one below which maps job_ids to a dict containing details of the respective job.
A dict like the one below which maps task_ids to a dict containing details of the respective task.

{
"aryn:j-sc0v0lglkauo774pioflp4l": {
Expand All @@ -523,12 +524,12 @@ def partition_file_async_list(
headers = _generate_headers(aryn_config.api_key())
response = requests.get(async_list_url, headers=headers, stream=_should_stream(), verify=ssl_verify)

all_jobs = response.json()["jobs"]
all_tasks = response.json()["tasks"]
result = {}
for job_id in all_jobs.keys():
if all_jobs[job_id]["path"] == "/v1/document/partition":
del all_jobs[job_id]["path"]
result[job_id] = all_jobs[job_id]
for task_id in all_tasks.keys():
if all_tasks[task_id]["path"] == "/v1/document/partition":
del all_tasks[task_id]["path"]
result[task_id] = all_tasks[task_id]
return result


Expand Down
50 changes: 25 additions & 25 deletions lib/aryn-sdk/aryn_sdk/test/test_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ def test_convert_img():
assert png_str == real_str


def test_invalid_job_id():
def test_invalid_task_id():
response = partition_file_async_result("INVALID_JOB_ID")
assert response["status"] == "no_such_job"
assert response["status"] == "no_such_task"


def test_partition_file_async_submit(mocker):
data = b'{"job_id": "1234"}'
data = b'{"task_id": "1234"}'
expected_response = json.loads(data.decode())

mocked_response = mocker.Mock()
Expand Down Expand Up @@ -240,11 +240,11 @@ def check_nonstandard_url(

def test_partition_file_async_with_unsupported_file_format():
with open(RESOURCE_DIR / "image" / "unsupported-format-test-document-image.heic", "rb") as f:
job_id = partition_file_async_submit(f)["job_id"]
task_id = partition_file_async_submit(f)["task_id"]

start = time.time()
while True:
actual_result = partition_file_async_result(job_id)
actual_result = partition_file_async_result(task_id)
if actual_result["status"] != "pending" or time.time() - start >= ASYNC_TIMEOUT:
break
time.sleep(1)
Expand All @@ -255,57 +255,57 @@ def test_partition_file_async_with_unsupported_file_format():


def test_multiple_partition_file_async():
num_jobs = 4
job_ids = []
num_tasks = 4
task_ids = []

before = partition_file_async_list()
logging.info(f"List before:\n{json.dumps(before, indent=4)}")

for i in range(num_jobs):
logging.info(f"Submitting job {i + 1}/{num_jobs}")
job_id = partition_file_async_submit(RESOURCE_DIR / "pdfs" / "FR-2002-05-03-TRUNCATED-40.pdf")["job_id"]
logging.info(f"\tJob ID: {job_id}")
job_ids.append(job_id)
for i in range(num_tasks):
logging.info(f"Submitting task {i + 1}/{num_tasks}")
task_id = partition_file_async_submit(RESOURCE_DIR / "pdfs" / "FR-2002-05-03-TRUNCATED-40.pdf")["task_id"]
logging.info(f"\tTask ID: {task_id}")
task_ids.append(task_id)

after = partition_file_async_list()
logging.info(f"List after:\n{json.dumps(after, indent=4)}")
assert len(after) >= num_jobs
for job_id in job_ids:
assert job_id in after
assert len(after) >= num_tasks
for task_id in task_ids:
assert task_id in after

for i, job_id in enumerate(job_ids):
logging.info(f"Polling job ({job_id}) {i + 1}/{num_jobs}")
for i, task_id in enumerate(task_ids):
logging.info(f"Polling task ({task_id}) {i + 1}/{num_tasks}")
start = time.time()
while True:
actual_result = partition_file_async_result(job_id)
actual_result = partition_file_async_result(task_id)
if actual_result["status"] != "pending" or time.time() - start >= ASYNC_TIMEOUT:
break
time.sleep(1)
logging.info(f"\tContinuing to Poll Job {job_id} ({i + 1}/{num_jobs})")
logging.info(f"\tContinuing to Poll Task {task_id} ({i + 1}/{num_tasks})")
assert actual_result["status"] == "done"
assert len(actual_result["result"]["elements"]) > 1000


def test_partition_file_async_cancel():
with open(RESOURCE_DIR / "pdfs" / "FR-2002-05-03-TRUNCATED-40.pdf", "rb") as f:
job_id = partition_file_async_submit(f)["job_id"]
task_id = partition_file_async_submit(f)["task_id"]

before_cancel_result = partition_file_async_result(job_id)
before_cancel_result = partition_file_async_result(task_id)
assert before_cancel_result["status"] == "pending"
assert partition_file_async_cancel(job_id)
assert partition_file_async_cancel(task_id)

# Cancellation is not reflected in the result immediately
for _ in range(10):
time.sleep(0.1)
after_cancel_result = partition_file_async_result(job_id)
after_cancel_result = partition_file_async_result(task_id)
if after_cancel_result["status"] != "pending":
break
assert after_cancel_result["status"] == "pending"
assert after_cancel_result["status"] == "no_such_job"
assert after_cancel_result["status"] == "no_such_task"


def test_smoke_webhook(mocker):
data = b'{"job_id": "1234"}'
data = b'{"task_id": "1234"}'

webhook_url = "TEST"

Expand Down