
Commit 8af228f

Authored by Michele Dolfi

docs(examples): processing parquet file of images (#2641)

* add example processing parquet file of images
* vlm using vllm api
* use openvino and add more docs
* add default input file
* change default to standard for running in CI
* use simple rapidocr without openvino in the CI example

Signed-off-by: Michele Dolfi <[email protected]>

1 parent da4c2e9 commit 8af228f

File tree

3 files changed: +185 −0 lines changed
docs/examples/data/vidore_v3_hr-slice.parquet

4.88 MB
Binary file not shown.

docs/examples/parquet_images.py

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
# %% [markdown]
# What this example does
# - Run a batch conversion on a parquet file with an image column.
#
# Requirements
# - Python 3.9+
# - Install Docling: `pip install docling`
#
# How to run
# - `python docs/examples/parquet_images.py FILE`
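# - Optional flags (taken from the defaults in the code below), e.g.
#   `python docs/examples/parquet_images.py FILE --pipeline vlm --doc-size 192 --batch-size 64`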
#
# The parquet file should be in a format similar to the ViDoRe V3 datasets:
# https://huggingface.co/collections/vidore/vidore-benchmark-v3
#
# For example:
# - https://huggingface.co/datasets/vidore/vidore_v3_hr/blob/main/corpus/test-00000-of-00001.parquet
#
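# A minimal sketch for fetching that file locally (not part of this example;
# assumes `huggingface_hub` is installed):
# ```python
# from huggingface_hub import hf_hub_download
#
# path = hf_hub_download(
#     repo_id="vidore/vidore_v3_hr",
#     repo_type="dataset",
#     filename="corpus/test-00000-of-00001.parquet",
# )
# ```
#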
# ### Start models with vllm
# ```console
# vllm serve ibm-granite/granite-docling-258M \
#     --host 127.0.0.1 --port 8000 \
#     --max-num-seqs 512 \
#     --max-num-batched-tokens 8192 \
#     --enable-chunked-prefill \
#     --gpu-memory-utilization 0.9
# ```
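# Once the server is up, a quick sanity check (assumes `curl` is available):
# ```console
# curl http://127.0.0.1:8000/v1/models
# ```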
# %%

import io
import time
from pathlib import Path
from typing import Annotated, Literal

import pyarrow.parquet as pq
import typer
from PIL import Image

from docling.datamodel import vlm_model_specs
from docling.datamodel.base_models import ConversionStatus, DocumentStream, InputFormat
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
    PipelineOptions,
    RapidOcrOptions,
    VlmPipelineOptions,
)
from docling.datamodel.pipeline_options_vlm_model import ApiVlmOptions, ResponseFormat
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter, ImageFormatOption
from docling.pipeline.base_pipeline import ConvertPipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.pipeline.vlm_pipeline import VlmPipeline


def process_document(
    images: list[Image.Image], chunk_idx: int, doc_converter: DocumentConverter
):
    """Pack a chunk of images into one multi-page TIFF and convert it with Docling."""

    print(f"\n--- Processing chunk {chunk_idx} with {len(images)} images ---")

    # Convert images to mode RGB (TIFF pages must match)
    rgb_images = [im.convert("RGB") for im in images]

    # First image is the base frame
    first = rgb_images[0]
    rest = rgb_images[1:]

    # Create multi-page TIFF using PIL frames
    buf = io.BytesIO()
    first.save(
        buf,
        format="TIFF",
        save_all=True,
        append_images=rest,
        compression="tiff_deflate",  # good compression, optional
    )
    buf.seek(0)

    # Docling conversion
    doc_stream = DocumentStream(name=f"doc_{chunk_idx}.tiff", stream=buf)

    start_time = time.time()
    conv_result = doc_converter.convert(doc_stream)
    runtime = time.time() - start_time

    assert conv_result.status == ConversionStatus.SUCCESS

    pages = len(conv_result.pages)
    print(
        f"Chunk {chunk_idx} converted in {runtime:.2f} sec ({pages / runtime:.2f} pages/s)."
    )


def run(
    filename: Annotated[Path, typer.Argument()] = Path(
        "docs/examples/data/vidore_v3_hr-slice.parquet"
    ),
    doc_size: int = 192,
    batch_size: int = 64,
    pipeline: Literal["standard", "vlm"] = "standard",
):
    if pipeline == "standard":
        pipeline_cls: type[ConvertPipeline] = StandardPdfPipeline
        pipeline_options: PipelineOptions = PdfPipelineOptions(
            # ocr_options=RapidOcrOptions(backend="openvino"),
            ocr_batch_size=batch_size,
            layout_batch_size=batch_size,
            table_batch_size=4,
        )
    elif pipeline == "vlm":
        settings.perf.page_batch_size = batch_size
        pipeline_cls = VlmPipeline
        vlm_options = ApiVlmOptions(
            url="http://localhost:8000/v1/chat/completions",
            params=dict(
                model=vlm_model_specs.GRANITEDOCLING_TRANSFORMERS.repo_id,
                max_tokens=4096,
                skip_special_tokens=True,
            ),
            prompt=vlm_model_specs.GRANITEDOCLING_TRANSFORMERS.prompt,
            timeout=90,
            scale=1.0,
            temperature=0.0,
            concurrency=batch_size,
            stop_strings=["</doctag>", "<|end_of_text|>"],
            response_format=ResponseFormat.DOCTAGS,
        )
        pipeline_options = VlmPipelineOptions(
            vlm_options=vlm_options,
            enable_remote_services=True,  # required when using a remote inference service.
        )
    else:
        raise RuntimeError(f"Pipeline {pipeline} not available.")

    doc_converter = DocumentConverter(
        format_options={
            InputFormat.IMAGE: ImageFormatOption(
                pipeline_cls=pipeline_cls,
                pipeline_options=pipeline_options,
            )
        }
    )

    start_time = time.time()
    doc_converter.initialize_pipeline(InputFormat.IMAGE)
    init_runtime = time.time() - start_time
    print(f"Pipeline initialized in {init_runtime:.2f} seconds.")

    # ------------------------------------------------------------
    # Open parquet file in streaming mode
    # ------------------------------------------------------------
    pf = pq.ParquetFile(filename)

    image_buffer = []  # holds up to doc_size images
    chunk_idx = 0

    # ------------------------------------------------------------
    # Stream batches from parquet
    # ------------------------------------------------------------
    for batch in pf.iter_batches(batch_size=batch_size, columns=["image"]):
        col = batch.column("image")

        # Each cell is an Arrow struct scalar; as_py() returns a dict
        # in the Hugging Face datasets image encoding
        for i in range(len(col)):
            img_dict = col[i].as_py()  # {"bytes": ..., "path": ...}
            pil_image = Image.open(io.BytesIO(img_dict["bytes"]))
            image_buffer.append(pil_image)

        # Once doc_size images are gathered, process one document
        # (use >= so doc_size need not be a multiple of batch_size)
        while len(image_buffer) >= doc_size:
            process_document(image_buffer[:doc_size], chunk_idx, doc_converter)
            del image_buffer[:doc_size]
            chunk_idx += 1

    # ------------------------------------------------------------
    # Process trailing images (last partial chunk)
    # ------------------------------------------------------------
    if image_buffer:
        process_document(image_buffer, chunk_idx, doc_converter)


if __name__ == "__main__":
    typer.run(run)
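
The inner loop assumes each `image` cell follows the Hugging Face datasets image encoding, i.e. a struct with `bytes` and `path` fields. A minimal sketch, not part of the commit, for checking that a parquet file matches this layout before running the example:

```python
import pyarrow.parquet as pq

# Assumed local path; substitute your own parquet file.
pf = pq.ParquetFile("docs/examples/data/vidore_v3_hr-slice.parquet")
print(pf.schema_arrow)  # expect: image: struct<bytes: binary, path: string>

# Peek at the first cell of the image column.
first = next(pf.iter_batches(batch_size=1, columns=["image"]))
print(sorted(first.column("image")[0].as_py().keys()))  # ['bytes', 'path']
```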

mkdocs.yml

Lines changed: 1 addition & 0 deletions
@@ -118,6 +118,7 @@ nav:
     - ⚡️ GPU optimization:
       - "Standard pipeline": examples/gpu_standard_pipeline.py
       - "VLM pipeline": examples/gpu_vlm_pipeline.py
+      - "Parquet benchmark": examples/parquet_images.py
     - 🗂️ More examples:
       - examples/dpk-ingest-chunk-tokenize.ipynb
       - examples/rag_azuresearch.ipynb
