- Introduced `afile_content_streaming` and `file_content_streaming` functions in `litellm/files/main.py` to handle asynchronous and synchronous file content streaming.
- Added `FileContentStreamingResponse` class in `litellm/files/streaming.py` to manage streaming responses with logging capabilities.
- Updated the OpenAI API integration in `litellm/llms/openai/openai.py` to support the new streaming methods.
- Enhanced file content retrieval in `litellm/proxy/openai_files_endpoints/files_endpoints.py` to route requests for streaming.
- Added unit tests for the new streaming functionality in `tests/test_litellm/llms/openai/test_openai_file_content_streaming.py` and `tests/test_litellm/proxy/openai_files_endpoint/test_files_endpoint.py`.
- Refactored type hints and imports for better clarity and organization across the modified files.
…e. This provides a 1:1 behaviour mapping with the non-streaming behaviour.
- Removed unused imports and streamlined type hints in `litellm/utils.py` and `litellm/files/main.py`.
- Moved `FileContentStreamingResult` to a new `litellm/files/types.py` for better organization.
- Updated `FileContentStreamingResponse` in `litellm/files/streaming.py` to include asynchronous close methods and improved logging capabilities.
- Enhanced tests to ensure proper closure of streaming iterators in `tests/test_litellm/llms/openai/test_openai_file_content_streaming.py` and `tests/test_litellm/proxy/openai_files_endpoint/test_files_endpoint.py`.
- Made the streaming handler functions static methods.
- Removed the `afile_content_streaming` wrapper function; streaming is now enabled via a `stream` boolean on `afile_content`.
- Cleaned up test cases after the refactor.
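The `stream` boolean described above can be sketched as a single entry point whose return shape depends on the flag. This is a minimal, self-contained illustration with placeholder data and simplified signatures, not litellm's actual implementation:

```python
import asyncio
from typing import AsyncIterator, Union

# Placeholder payload; the real function fetches file bytes from a provider.
FAKE_CONTENT = b"line-1\nline-2\n"

async def _fetch_full(file_id: str) -> bytes:
    return FAKE_CONTENT

async def _fetch_stream(file_id: str, chunk_size: int) -> AsyncIterator[bytes]:
    # Yield the payload in fixed-size chunks instead of buffering it whole.
    for i in range(0, len(FAKE_CONTENT), chunk_size):
        yield FAKE_CONTENT[i : i + chunk_size]

async def afile_content(
    file_id: str, stream: bool = False, chunk_size: int = 4
) -> Union[bytes, AsyncIterator[bytes]]:
    # One entry point: the stream flag selects the return shape,
    # mirroring the "stream boolean in afile_content" refactor.
    if stream:
        return _fetch_stream(file_id, chunk_size)
    return await _fetch_full(file_id)

async def main() -> bytes:
    chunks = []
    stream = await afile_content("file-abc", stream=True)
    async for chunk in stream:
        chunks.append(chunk)
    return b"".join(chunks)

result = asyncio.run(main())
```

Collapsing the wrapper into a flag keeps one public name per operation while still letting callers opt into chunked delivery.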
… routing
- Updated `FileContentStreamingHandler` to use the `custom_llm_provider` from credentials for routing.
- Added error handling for a missing `custom_llm_provider` in credentials.
- Introduced new tests to validate streaming behavior with routed providers and non-OpenAI providers.
- Cleaned up imports and ensured proper type casting for improved clarity.
…provider routing
- Added validation to ensure credentials include a custom LLM provider before routing.
- Cleaned up type casting for better readability.
- Introduced a new test to verify behavior when a non-OpenAI provider is used, ensuring proper handling of streaming responses.
- Updated imports to include the modules needed for testing.
- Changed the import path for `upload_file_to_storage_backend` in test files to reflect the new module structure.
- Ensured consistency in mocking for storage backend service tests.
- Introduced a new method in `FileContentStreamingHandler` to resolve streaming request parameters, enhancing the routing logic based on credentials.
- Updated the `should_stream_file_content` method to check against supported providers.
- Cleaned up type hints and imports across multiple files for better organization and clarity.
- Added comprehensive tests to validate the new routing behavior and ensure original data integrity during streaming requests.
Add file content streaming support for OpenAI and related utilities
```python
):
    verbose_proxy_logger.debug(
        "Using streaming file content helper for custom_llm_provider=%s, original_file_id=%s, file_id=%s, model_used=%s",
        resolved_custom_llm_provider,
        original_file_id,
        resolved_file_id,
        model_used,
    )
```
```python
from litellm.litellm_core_utils.litellm_logging import (
    Logging as LiteLLMLoggingObj,
)
from litellm.types.utils import StandardLoggingHiddenParams, StandardLoggingPayload
```
```python
if TYPE_CHECKING:
    from litellm.proxy._types import UserAPIKeyAuth
    from litellm.proxy.utils import ProxyLogging

from litellm.proxy.openai_files_endpoints.common_utils import (
    prepare_data_with_credentials,
)
from litellm.proxy.common_request_processing import (
    ProxyBaseLLMRequestProcessing,
)
from litellm.proxy.openai_files_endpoints.storage_backend_service import (
    StorageBackendFileService,
)
from litellm.proxy.openai_files_endpoints.file_content_streaming_handler import (
    FileContentStreamingHandler,
)
```
Greptile Summary

This PR adds streaming support for the OpenAI-compatible file content endpoint (…).

Confidence Score: 5/5

Safe to merge: the streaming implementation is logically correct and all key edge cases are covered by tests. All open findings are P2 style concerns. The coroutine handoff through `asyncio.iscoroutine` in `afile_content`, the `finally`-based `aclose` chain, and the `response_cm.__aexit__` exception propagation are all correct. Test coverage for the new paths is thorough and fully mocked.

`litellm/proxy/openai_files_endpoints/file_content_streaming_handler.py`: dict key ordering in `get_streaming_file_content_response`
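The coroutine handoff the summary refers to can be shown in isolation: a sync-signature entry point that returns either a value or a coroutine, and an async caller that awaits only when a coroutine comes back. The names below are simplified stand-ins for the litellm functions, not their real signatures:

```python
import asyncio
from typing import Any, Coroutine, Union

async def _async_backend() -> str:
    # Stand-in for the provider's async streaming call.
    return "streamed"

def file_content_streaming(_is_async: bool) -> Union[str, Coroutine[Any, Any, str]]:
    # Sync path computes the result directly; the async path hands back
    # a coroutine for the caller to await.
    if _is_async:
        return _async_backend()
    return "streamed"

async def afile_content() -> str:
    response = file_content_streaming(_is_async=True)
    # The handoff: await only when the provider path returned a coroutine.
    if asyncio.iscoroutine(response):
        response = await response
    return response

result = asyncio.run(afile_content())
```

This pattern lets one dispatch function serve both sync and async callers without duplicating the routing logic.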
| Filename | Overview |
|---|---|
| litellm/proxy/openai_files_endpoints/file_content_streaming_handler.py | New handler that resolves routing params, gates on supported providers, and builds the StreamingResponse — dict key ordering allows data to silently override explicit stream/file_id params |
| litellm/files/streaming.py | New FileContentStreamingResponse wrapper: handles logging callbacks, aclose/close lifecycle, and hidden-params propagation correctly |
| litellm/files/main.py | Adds chunk_size/stream params to afile_content/file_content, new file_content_streaming function that correctly handles async/sync via asyncio.iscoroutine check in afile_content |
| litellm/llms/openai/openai.py | Adds afile_content_streaming (async) and file_content_streaming (sync) methods; correctly propagates exceptions through context manager exit in both paths |
| litellm/proxy/openai_files_endpoints/files_endpoints.py | Wires streaming handler into get_file_content; inline imports for FileContentStreamingHandler and StorageBackendFileService remain despite style-guide rule |
| litellm/files/types.py | New module: moves FileContentProvider Literal and adds FileContentStreamingResult NamedTuple to break circular imports |
| tests/test_litellm/llms/openai/test_openai_file_content_streaming.py | New unit tests for streaming, all properly mocked; covers success, failure, aclose, exception propagation through context manager exit |
| tests/test_litellm/proxy/openai_files_endpoint/test_files_endpoint.py | New proxy integration tests for streaming paths; mock patch paths updated to match new import locations correctly |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant Proxy as files_endpoints.py<br/>get_file_content
    participant Handler as FileContentStreamingHandler
    participant LiteLLM as litellm.afile_content
    participant OAI as OpenAIFilesAPI<br/>afile_content_streaming
    participant Upstream as OpenAI API
    Client->>Proxy: GET /v1/files/{file_id}/content
    Proxy->>Handler: resolve_streaming_request_params(...)
    Handler-->>Proxy: resolved_provider, resolved_file_id, resolved_data
    Proxy->>Handler: should_stream_file_content(resolved_provider)
    Handler-->>Proxy: True (OpenAI-compatible provider)
    Proxy->>Handler: get_streaming_file_content_response(...)
    Handler->>LiteLLM: afile_content(stream=True, ...)
    LiteLLM->>OAI: file_content_streaming(_is_async=True)
    OAI->>Upstream: files.with_streaming_response.content(file_id)
    Upstream-->>OAI: HTTP streaming response + headers
    OAI-->>LiteLLM: FileContentStreamingResult(AsyncIterator, headers)
    LiteLLM-->>Handler: FileContentStreamingResult wrapped in FileContentStreamingResponse
    Handler-->>Proxy: StreamingResponse(stream_file_content_with_logging)
    Proxy-->>Client: HTTP 200 StreamingResponse with Content-Length + custom headers
    loop Each chunk
        Client->>Proxy: read chunk
        Proxy->>Upstream: iter_bytes()
        Upstream-->>Proxy: bytes chunk
        Proxy-->>Client: yield chunk
    end
    Note over Proxy: StopAsyncIteration triggers _log_success_async(), update_request_status(success), aclose() releases HTTP connection
```
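The lifecycle in the diagram's final note (log success on exhaustion, always release the connection) maps naturally onto an async generator with a `try`/`finally`. The following is an illustrative sketch with stand-in names, not the proxy's actual wrapper:

```python
import asyncio
from typing import AsyncIterator, List

events: List[str] = []  # records the lifecycle for illustration

async def upstream_chunks() -> AsyncIterator[bytes]:
    # Stand-in for the upstream HTTP byte iterator.
    for chunk in (b"abc", b"def"):
        yield chunk

async def stream_with_logging(source: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
    try:
        async for chunk in source:
            yield chunk
        # Reached only when the source is exhausted cleanly.
        events.append("log_success")
        events.append("request_status=success")
    finally:
        # Runs on clean exhaustion, cancellation, or error, so the
        # upstream HTTP connection is always released.
        events.append("aclose")

async def main() -> bytes:
    out = b""
    async for chunk in stream_with_logging(upstream_chunks()):
        out += chunk
    return out

body = asyncio.run(main())
```

Putting the close in `finally` rather than after the loop is what makes the "aclose releases HTTP connection" guarantee hold even when a client disconnects mid-stream.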
```python
)
from litellm.proxy.openai_files_endpoints.storage_backend_service import (
    StorageBackendFileService,
)
```
Inline import moved from module-level
StorageBackendFileService was imported at the top of the file before this PR. Moving it inside route_create_file violates the codebase style guide ("Avoid imports within methods — place all imports at the top of the file"). The FileContentStreamingHandler import added later in get_file_content has the same issue. If a circular-import cycle genuinely forces these to be deferred, a brief comment explaining which cycle is being broken would help; otherwise, restoring them to module-level is preferred.
Context Used: CLAUDE.md (source)
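If the deferred import is kept, the comment suggested above might look like the sketch below. The module names are stand-ins (here `json` plays the role of the cyclic module), not litellm's real import graph:

```python
# Hypothetical sketch of documenting an intentionally deferred import.
def route_create_file() -> str:
    # Deferred import: importing at module level would create a cycle
    # (files_endpoints -> storage_backend_service -> files_endpoints).
    # Naming the specific cycle makes the deviation from the
    # "imports at top of file" style rule intentional and auditable.
    import json  # stand-in for the StorageBackendFileService module
    return json.dumps({"routed": True})

result = route_create_file()
```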
```python
        logging_obj=logging_obj,
    ),
    headers=response.headers,
)
```

```python
response: Union[
    FileContentStreamingResult, Coroutine[Any, Any, FileContentStreamingResult]
] = FileContentStreamingResult(stream_iterator=iter(()), headers={})
if custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS:
    openai_creds = get_openai_credentials(
        api_base=optional_params.api_base,
```
Dead `else` branch is unreachable
file_content_streaming is only ever called from file_content after _should_sdk_support_streaming(custom_llm_provider) returns True, which is defined as custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS. The else: raise BadRequestError block at the end of file_content_streaming therefore can never be reached, and the initial sentinel response = FileContentStreamingResult(stream_iterator=iter(()), headers={}) is also always overwritten. Consider either removing the dead branch or asserting the invariant for clarity.
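Asserting the invariant, as the comment suggests, could look like the simplified sketch below. The provider set and return value are stand-ins that only mirror the shape of the real code:

```python
# Stand-in for litellm's provider constant.
OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS = {"openai", "azure"}

def _should_sdk_support_streaming(provider: str) -> bool:
    return provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS

def file_content_streaming(custom_llm_provider: str) -> str:
    # Callers gate on _should_sdk_support_streaming() first, so assert
    # the invariant instead of keeping a dead BadRequestError branch
    # and an always-overwritten sentinel value.
    assert custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS, (
        f"unsupported provider: {custom_llm_provider}"
    )
    return f"streaming via {custom_llm_provider}"

result = file_content_streaming("openai")
```

The assertion documents the caller contract and fails loudly if a future code path ever violates it, instead of silently keeping unreachable error handling.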
```python
    api_base=api_base,
    timeout=timeout,
    max_retries=max_retries,
    organization=organization,
    client=client,
    _is_async=_is_async,
)
```
Return type annotation misrepresents the async branch
file_content_streaming is declared -> FileContentStreamingResult, but when _is_async is True it returns self.afile_content_streaming(...) which is a coroutine (Coroutine[Any, Any, FileContentStreamingResult]), not a FileContentStreamingResult. The # type: ignore on the return suppresses the type error. The caller in main.py handles this correctly at runtime via asyncio.iscoroutine(), but the annotation is misleading. Consider updating the signature to Union[FileContentStreamingResult, Coroutine[Any, Any, FileContentStreamingResult]] to match main.py's file_content_streaming.
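Beyond widening the return annotation to the `Union`, `typing.overload` can give callers the precise type per branch. This is a simplified sketch of the pattern, not the actual litellm signature:

```python
import asyncio
from typing import Any, Coroutine, Literal, Union, overload

async def _afile_content_streaming() -> bytes:
    # Stand-in for the async streaming implementation.
    return b"chunk"

@overload
def file_content_streaming(_is_async: Literal[True]) -> Coroutine[Any, Any, bytes]: ...
@overload
def file_content_streaming(_is_async: Literal[False]) -> bytes: ...
def file_content_streaming(
    _is_async: bool,
) -> Union[bytes, Coroutine[Any, Any, bytes]]:
    if _is_async:
        # Returned unawaited; the async caller awaits it.
        return _afile_content_streaming()
    return b"chunk"

result = asyncio.run(file_content_streaming(_is_async=True))
```

With `Literal[True]`/`Literal[False]` overloads, type checkers narrow the return type at each call site, removing the need for the `# type: ignore`.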
```python
def _build_standard_logging_object(
    self,
    end_time: datetime.datetime,
) -> Optional["StandardLoggingPayload"]:
    if self.standard_logging_object is not None:
        return self.standard_logging_object

    if self.logging_obj is None:
        return None

    from litellm.litellm_core_utils.litellm_logging import (
        get_standard_logging_object_payload,
    )

    self._sync_hidden_params()
    payload = get_standard_logging_object_payload(
        kwargs=self.logging_obj.model_call_details,
        init_response_obj=self._build_logging_response(),
        start_time=self._start_time,
        end_time=end_time,
        logging_obj=self.logging_obj,
        status="success",
    )
    if payload is None:
        return None

    merged_hidden_params = cast(
        "StandardLoggingHiddenParams",
        {
            **cast(Dict[str, Any], payload.get("hidden_params") or {}),
            **self._hidden_params,
        },
    )
    payload["hidden_params"] = merged_hidden_params
    payload["response"] = self._build_logging_response()
    if self.custom_llm_provider is not None:
        payload["custom_llm_provider"] = self.custom_llm_provider
    if self.model is not None:
        payload["model"] = self.model
    if self._hidden_params.get("api_base"):
        payload["api_base"] = cast(str, self._hidden_params["api_base"])

    self.standard_logging_object = payload
    return payload
```
_build_standard_logging_object hardcodes status="success" and caches the result
The method passes status="success" to get_standard_logging_object_payload and stores the result in self.standard_logging_object. If the method were ever called before a failure (e.g., a future refactor moves it into _log_failure_async), the cached "success" payload would be silently returned for the failure event. The failure handlers currently bypass this method entirely, so there's no active bug — but a comment or an explicit guard would make the invariant visible and prevent accidental misuse.
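An explicit guard of the kind suggested could record which status the cached payload was built for and fail loudly on a mismatch. This is a hypothetical simplified class, not litellm's `FileContentStreamingResponse`:

```python
import datetime
from typing import Optional

class StreamLogger:
    """Illustrative stand-in for a streaming logger with a cached payload."""

    def __init__(self) -> None:
        self._cached_payload: Optional[dict] = None
        self._cached_status: Optional[str] = None

    def build_logging_payload(self, status: str) -> dict:
        if self._cached_payload is not None:
            # Guard: never hand a payload built for one status to a
            # caller asking for another (e.g. a "success" cache on a
            # failure path).
            if self._cached_status != status:
                raise RuntimeError(
                    f"payload cached for {self._cached_status!r}, "
                    f"requested {status!r}"
                )
            return self._cached_payload
        payload = {
            "status": status,
            "built_at": datetime.datetime.now().isoformat(),
        }
        self._cached_payload = payload
        self._cached_status = status
        return payload

logger = StreamLogger()
first = logger.build_logging_payload("success")
second = logger.build_logging_payload("success")
```

Raising on a status mismatch turns the invariant the comment describes into an enforced one, so a future refactor cannot silently reuse a stale "success" payload for a failure event.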
Relevant issues
Pre-Submission checklist
Please complete all items before asking a LiteLLM maintainer to review your PR
- I have added testing in the `tests/test_litellm/` directory (adding at least 1 test is a hard requirement - see details)
- My PR passes all unit tests on `make test-unit`
- I have tagged `@greptileai` and received a Confidence Score of at least 4/5 before requesting a maintainer review

Delays in PR merge?
If you're seeing a delay in your PR being merged, ping the LiteLLM Team on Slack (#pr-review).
CI (LiteLLM team)
Branch creation CI run
Link:
CI run for the last commit
Link:
Merge / cherry-pick CI run
Links:
Screenshots / Proof of Fix
Type
🆕 New Feature
🐛 Bug Fix
🧹 Refactoring
📖 Documentation
🚄 Infrastructure
✅ Test
Changes