Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions litellm/integrations/s3_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import asyncio
import time
from datetime import datetime
from typing import List, Optional, cast

Expand Down Expand Up @@ -371,11 +372,23 @@ async def async_upload_data_to_s3(
# Prepare the signed headers
signed_headers = dict(aws_request.headers.items())

# Make the request
response = await self.async_httpx_client.put(
url, data=json_string, headers=signed_headers
)
response.raise_for_status()
# Make the request with retry for transient S3 errors (500/503)
max_retries = 3
for attempt in range(max_retries):
response = await self.async_httpx_client.put(
url, data=json_string, headers=signed_headers
)
if response.status_code in (500, 503) and attempt < max_retries - 1:
wait_time = 2**attempt # 1s, 2s
verbose_logger.warning(
f"S3 upload returned {response.status_code}, retrying in {wait_time}s "
f"(attempt {attempt + 1}/{max_retries}) "
f"key={batch_logging_element.s3_object_key}"
)
await asyncio.sleep(wait_time)
continue
response.raise_for_status()
break
except Exception as e:
verbose_logger.exception(f"Error uploading to s3: {str(e)}")
self.handle_callback_failure(callback_name="S3Logger")
Expand Down Expand Up @@ -550,9 +563,23 @@ def upload_data_to_s3(self, batch_logging_element: s3BatchLoggingElement):
if self.s3_verify is not None
else None
)
# Make the request
response = httpx_client.put(url, data=json_string, headers=signed_headers)
response.raise_for_status()
# Make the request with retry for transient S3 errors (500/503)
max_retries = 3
for attempt in range(max_retries):
response = httpx_client.put(
url, data=json_string, headers=signed_headers
)
if response.status_code in (500, 503) and attempt < max_retries - 1:
wait_time = 2**attempt # 1s, 2s
verbose_logger.warning(
f"S3 upload returned {response.status_code}, retrying in {wait_time}s "
f"(attempt {attempt + 1}/{max_retries}) "
f"key={batch_logging_element.s3_object_key}"
)
time.sleep(wait_time)
continue
response.raise_for_status()
break
except Exception as e:
verbose_logger.exception(f"Error uploading to s3: {str(e)}")
self.handle_callback_failure(callback_name="S3Logger")
Expand Down
203 changes: 203 additions & 0 deletions tests/test_litellm/integrations/test_s3_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,209 @@ def test_s3_v2_virtual_hosted_style(self, mock_periodic_flush, mock_create_task)

assert result == {"downloaded": "data"}

@pytest.mark.asyncio
async def test_async_upload_retries_on_s3_503():
    """
    Verify async_upload_data_to_s3 retries once after a transient S3 503
    ("Slow Down") response and completes successfully on the second attempt.
    """
    from unittest.mock import AsyncMock, MagicMock

    from litellm.types.integrations.s3_v2 import s3BatchLoggingElement

    s3_logger = S3Logger(
        s3_bucket_name="test-bucket",
        s3_aws_access_key_id="test-key",
        s3_aws_secret_access_key="test-secret",
        s3_region_name="us-east-1",
    )

    element = s3BatchLoggingElement(
        s3_object_key="2025-09-14/test-retry.json",
        payload={"test": "retry"},
        s3_object_download_filename="test-retry.json",
    )

    # Response sequence: transient 503 first, then a clean 200.
    failing = MagicMock()
    failing.status_code = 503
    succeeding = MagicMock()
    succeeding.status_code = 200
    succeeding.raise_for_status = MagicMock()

    s3_logger.async_httpx_client = AsyncMock()
    s3_logger.async_httpx_client.put = AsyncMock(side_effect=[failing, succeeding])

    with patch("asyncio.sleep", new_callable=AsyncMock) as sleep_mock:
        await s3_logger.async_upload_data_to_s3(element)

    # Two PUTs: the failed attempt plus the successful retry.
    assert s3_logger.async_httpx_client.put.call_count == 2
    # Backoff before the single retry is 2**0 == 1 second.
    sleep_mock.assert_called_once_with(1)


@pytest.mark.asyncio
async def test_async_upload_retries_on_s3_500():
    """
    Verify async_upload_data_to_s3 retries after a transient S3 500 error
    and succeeds on the second attempt.
    """
    from unittest.mock import AsyncMock, MagicMock

    from litellm.types.integrations.s3_v2 import s3BatchLoggingElement

    s3_logger = S3Logger(
        s3_bucket_name="test-bucket",
        s3_aws_access_key_id="test-key",
        s3_aws_secret_access_key="test-secret",
        s3_region_name="us-east-1",
    )

    element = s3BatchLoggingElement(
        s3_object_key="2025-09-14/test-retry-500.json",
        payload={"test": "retry-500"},
        s3_object_download_filename="test-retry-500.json",
    )

    # First attempt hits a 500; the retry gets a clean 200.
    failing = MagicMock()
    failing.status_code = 500
    succeeding = MagicMock()
    succeeding.status_code = 200
    succeeding.raise_for_status = MagicMock()

    s3_logger.async_httpx_client = AsyncMock()
    s3_logger.async_httpx_client.put = AsyncMock(
        side_effect=[failing, succeeding]
    )

    with patch("asyncio.sleep", new_callable=AsyncMock) as sleep_mock:
        await s3_logger.async_upload_data_to_s3(element)

    # One retry → two PUT calls; one backoff sleep of 2**0 == 1 second.
    assert s3_logger.async_httpx_client.put.call_count == 2
    sleep_mock.assert_called_once_with(1)


@pytest.mark.asyncio
async def test_async_upload_exhausts_retries_on_persistent_503():
    """
    Test that async_upload_data_to_s3 stops after exhausting all retries on
    persistent S3 503 responses: three PUT attempts, two backoff sleeps, and
    the callback-failure handler invoked once. Note the method does not
    propagate the final raise_for_status error to the caller — it is caught
    internally and routed to handle_callback_failure.
    """
    from unittest.mock import AsyncMock, MagicMock

    from litellm.types.integrations.s3_v2 import s3BatchLoggingElement

    logger = S3Logger(
        s3_bucket_name="test-bucket",
        s3_aws_access_key_id="test-key",
        s3_aws_secret_access_key="test-secret",
        s3_region_name="us-east-1",
    )

    test_element = s3BatchLoggingElement(
        s3_object_key="2025-09-14/test-exhaust.json",
        payload={"test": "exhaust"},
        s3_object_download_filename="test-exhaust.json",
    )

    # All 3 attempts return 503; raise_for_status raises on the final attempt
    # (the one not short-circuited by the retry branch).
    response_503 = MagicMock()
    response_503.status_code = 503
    response_503.raise_for_status = MagicMock(
        side_effect=Exception("503 Service Unavailable")
    )

    logger.async_httpx_client = AsyncMock()
    logger.async_httpx_client.put = AsyncMock(return_value=response_503)

    with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
        with patch.object(logger, "handle_callback_failure") as mock_failure:
            await logger.async_upload_data_to_s3(test_element)

    # 3 PUT attempts total
    assert logger.async_httpx_client.put.call_count == 3
    # 2 sleeps (between attempts 1-2 and 2-3)
    assert mock_sleep.call_count == 2
    # Callback failure handler called after exhausting retries
    mock_failure.assert_called_once_with(callback_name="S3Logger")


@pytest.mark.asyncio
async def test_async_upload_no_retry_on_4xx():
    """
    Verify async_upload_data_to_s3 performs no retry on 4xx client errors:
    a single PUT attempt, then the callback-failure handler fires.
    """
    from unittest.mock import AsyncMock, MagicMock

    from litellm.types.integrations.s3_v2 import s3BatchLoggingElement

    s3_logger = S3Logger(
        s3_bucket_name="test-bucket",
        s3_aws_access_key_id="test-key",
        s3_aws_secret_access_key="test-secret",
        s3_region_name="us-east-1",
    )

    element = s3BatchLoggingElement(
        s3_object_key="2025-09-14/test-no-retry.json",
        payload={"test": "no-retry"},
        s3_object_download_filename="test-no-retry.json",
    )

    # A 403 is a client error: raise_for_status fails on the first attempt.
    forbidden = MagicMock()
    forbidden.status_code = 403
    forbidden.raise_for_status = MagicMock(side_effect=Exception("403 Forbidden"))

    s3_logger.async_httpx_client = AsyncMock()
    s3_logger.async_httpx_client.put = AsyncMock(return_value=forbidden)

    with patch.object(s3_logger, "handle_callback_failure") as failure_mock:
        await s3_logger.async_upload_data_to_s3(element)

    # Exactly one attempt — 4xx responses are not retried.
    assert s3_logger.async_httpx_client.put.call_count == 1
    failure_mock.assert_called_once_with(callback_name="S3Logger")


def test_sync_upload_retries_on_s3_503():
    """
    Verify the synchronous upload_data_to_s3 retries after a transient S3
    503 response and succeeds on the second attempt.
    """
    from unittest.mock import MagicMock

    from litellm.types.integrations.s3_v2 import s3BatchLoggingElement

    s3_logger = S3Logger(
        s3_bucket_name="test-bucket",
        s3_aws_access_key_id="test-key",
        s3_aws_secret_access_key="test-secret",
        s3_region_name="us-east-1",
    )

    element = s3BatchLoggingElement(
        s3_object_key="2025-09-14/test-sync-retry.json",
        payload={"test": "sync-retry"},
        s3_object_download_filename="test-sync-retry.json",
    )

    # 503 on the first PUT, clean 200 on the retry.
    failing = MagicMock()
    failing.status_code = 503
    succeeding = MagicMock()
    succeeding.status_code = 200
    succeeding.raise_for_status = MagicMock()

    sync_client = MagicMock()
    sync_client.put = MagicMock(side_effect=[failing, succeeding])

    with patch(
        "litellm.integrations.s3_v2._get_httpx_client",
        return_value=sync_client,
    ), patch("time.sleep") as sleep_mock:
        s3_logger.upload_data_to_s3(element)

    # Two PUTs (original + retry) and one 1-second (2**0) backoff sleep.
    assert sync_client.put.call_count == 2
    sleep_mock.assert_called_once_with(1)


@pytest.mark.asyncio
async def test_async_log_event_skips_when_standard_logging_object_missing():
"""
Expand Down
Loading