feat(transport): add Lambda Lang content-type negotiation #71
Add a self-contained Lambda Lang codec that provides semantic compression for ASAP JSON-RPC payloads by substituting common keys/values with short Lambda Lang atoms. The codec is fully reversible with 100% fidelity.

Content-Type negotiation:
- Server: checks the Accept header for `application/vnd.asap+lambda` and encodes the response accordingly, with graceful fallback to JSON
- Client: opt-in via the `lambda_codec_enabled` parameter (default: `False`); sends the Accept header and decodes Lambda responses automatically

New files:
- src/asap/transport/codecs/lambda_codec.py — encoder/decoder
- src/asap/transport/codecs/__init__.py — public API
- tests/transport/unit/test_lambda_codec.py — 27 unit tests
- tests/transport/integration/test_lambda_negotiation.py — 7 integration tests

No changes to existing behavior when Lambda is not negotiated. No external dependencies required.

Closes adriannoes#52
@voidborne-d is attempting to deploy a commit to adrianno's projects Team on Vercel. A member of the Team first needs to authorize it.
…onse union FastAPI cannot use union return types (JSONResponse | Response) as route handler annotations — it raises FastAPIError during app import. Use Response (the parent class) which accepts both.
… compatibility FastAPI cannot create a response model from starlette.responses.Response. Adding response_model=None disables automatic response model generation, fixing the CI failure.
adriannoes left a comment:
This PR perfectly executes the architectural vision outlined in Issue #52, keeping the Lambda Lang encoding cleanly segregated at the transport layer via Content-Type negotiation.
However, before it can be merged into production, the serialization logic needs to be heavily optimized. The current implementation relies on the standard json module and sequential string replacements, which act as a CPU-bound bottleneck that will block the asyncio event loop.
```python
for token, atom in _ENCODE_MAP.items():
    json_str = json_str.replace(token, atom)
```
This loop iterating over 35 .replace() operations on a potentially massive JSON string is highly CPU-bound. Because FastAPI operates on an asyncio event loop, executing this directly will block the event loop, stalling all concurrent agent requests (DoS risk).
Suggestion: Optimize this to a single pass using Python's built-in re module. Because re is implemented in C under the hood, running one regex substitution is drastically faster than executing a Python for loop and avoids creating 35 intermediate strings in memory:
```python
import re

# Pre-compile globally; sorting tokens longest-first guards against a
# shorter token shadowing a longer one it prefixes in the alternation
_ENCODE_PATTERN = re.compile(
    "|".join(map(re.escape, sorted(_ENCODE_MAP, key=len, reverse=True)))
)


def _encode_match(m: re.Match) -> str:
    return _ENCODE_MAP[m.group(0)]


def encode(json_str: str) -> str:
    # Assuming you accepted the suggestion to take json_str directly
    encoded_str = _ENCODE_PATTERN.sub(_encode_match, json_str)
    return _VERSION_PREFIX + encoded_str
```
Even with this optimization, for very large payloads, this should ideally be called via await asyncio.to_thread(lambda_codec.encode, ...) from the routing layers to ensure the event loop is never blocked.
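As a sketch of that offloading pattern (the `encode` stand-in below is hypothetical; the real codec lives in `lambda_codec.py`):

```python
import asyncio

# Hypothetical stand-in for lambda_codec.encode: any CPU-bound str -> str transform
def encode(json_str: str) -> str:
    return "λ1:" + json_str

async def encode_offloaded(json_str: str) -> str:
    # asyncio.to_thread runs the blocking call in the default thread pool,
    # so other coroutines keep making progress while it executes
    return await asyncio.to_thread(encode, json_str)

result = asyncio.run(encode_offloaded('{"jsonrpc": "2.0"}'))
```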
```python
json_str = encoded[len(_VERSION_PREFIX):]

# Reverse substitutions
for atom, token in _DECODE_MAP.items():
```
The same event loop blocking risk applies here during decoding. Please apply the parallel regex optimization here as well:
```python
_DECODE_PATTERN = re.compile(
    "|".join(map(re.escape, sorted(_DECODE_MAP, key=len, reverse=True)))
)


def _decode_match(m: re.Match) -> str:
    return _DECODE_MAP[m.group(0)]


def decode(encoded_str: str) -> str:
    # Check version prefix...
    json_str = encoded_str[len(_VERSION_PREFIX):]
    return _DECODE_PATTERN.sub(_decode_match, json_str)
```
Problem: Taking the raw string and executing an O(N*K) string replacement loop in an async context.
Rationale: FastAPI operates on an asyncio event loop. Performing extremely heavy CPU-bound string manipulation on potentially massive JSON payloads (e.g., large LLM context windows) will block the event loop entirely. This stalls all concurrent agent requests and introduces a severe DoS vector.
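To make the equivalence concrete, here is a minimal sketch (with an illustrative two-entry map, not the real 35-atom table) showing that the single-pass regex produces exactly the same output as the sequential `.replace()` loop:

```python
import re

_MAP = {'"jsonrpc"': "§j§", '"method"': "§m§"}  # illustrative atoms only

def encode_loop(s: str) -> str:
    # O(N*K): one full scan plus one new intermediate string per token
    for token, atom in _MAP.items():
        s = s.replace(token, atom)
    return s

_PATTERN = re.compile("|".join(map(re.escape, _MAP)))

def encode_regex(s: str) -> str:
    # O(N): a single C-level pass over the input
    return _PATTERN.sub(lambda m: _MAP[m.group(0)], s)

payload = '{"jsonrpc": "2.0", "method": "ping"}'
assert encode_loop(payload) == encode_regex(payload) == '{§j§: "2.0", §m§: "ping"}'
```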
```python
    return True


def encode(data: dict[str, Any]) -> str:
```
Accepting a dict here forces us to call json.dumps() in Python, which is much slower than utilizing Pydantic's native Rust core for serialization.
Suggestion: Change the signature to accept a pre-serialized JSON string. We can rely on the upstream routing layer to dump the model efficiently and pass it here:
```python
def encode(json_str: str) -> str:
# Drop the local json.dumps() import and execution.
# (Apply the regex optimization on `json_str` directly)
```
src/asap/transport/server.py (Outdated)

```python
if accept_lambda and lambda_codec.is_available():
    try:
        encoded_body = lambda_codec.encode(rpc_response.model_dump())
```
Running model_dump() creates a massive intermediate Python dictionary just to pass it to the codec for serializing again. This completely bypasses Pydantic v2's Rust-based performance architecture.
My suggestion is to serialize directly to JSON using Pydantic's core and pass the string to the updated codec:
```python
# Leverage Pydantic's native Rust execution for near-native speeds
encoded_body = lambda_codec.encode(rpc_response.model_dump_json(by_alias=True))
```
Problem: The codec accepts a dict and uses standard json.dumps(). In server.py, rpc_response.model_dump() creates an intermediate dict just to be re-serialized.
Rationale:
- Converting Pydantic models to `dict` and then to JSON in Python adds noticeable layer overhead for large payloads.
- We can keep the codebase lean (without adding external `orjson` dependencies) by ensuring the custom codec plugs into the pre-serialized output of Pydantic's highly-optimized Rust core (`model_dump_json()`).
```python
            content=encoded_body,
            media_type=LAMBDA_CONTENT_TYPE,
        )
    except Exception as e:
```
Problem: Catching a generic Exception and logging a warning using str(e), completely discarding the stack trace.
Rationale: We heavily depend on observability. Without a stack trace, debugging production encoding failures becomes nearly impossible.
Fix Suggestion:
```python
except Exception as e:
logger.warning(
"asap.server.lambda_encode_failed",
error=str(e),
error_type=type(e).__name__,
exc_info=True, # MUST log the stack trace
)
```
```python
if LAMBDA_CONTENT_TYPE in response_content_type:
    json_response = lambda_codec.decode(response.text)
else:
    json_response = response.json()
```
This decode call runs string replacement loops directly on response.text. For large payloads, this will block the client's asyncio event loop while waiting for CPU-bound parsing.
Suggestion: Offload the decoding to a thread to keep the client's async event loop free:
```python
import asyncio
if LAMBDA_CONTENT_TYPE in response_content_type:
# Offload CPU-bound decoding to unblock the main event loop
json_response = await asyncio.to_thread(lambda_codec.decode, response.text)
else:
json_response = response.json()
```
Thanks for the PR, @voidborne-d! The implementation is excellent and perfectly aligns with the Content-Type negotiation approach we discussed in #52. I've left a few inline comments regarding serialization performance and asyncio event loop blocking, with ready-to-use code snippets to make addressing them easy. Once those are addressed, this is good to merge!
Address all review comments from @adriannoes:

1. **Single-pass regex substitution** (codec encode/decode): Replace the O(N*K) loop of `.replace()` calls with a pre-compiled `re.compile` + `.sub()` for C-level single-pass performance; eliminates 35 intermediate string allocations
2. **String-based API** (codec signature change): `encode()` now accepts a pre-serialized JSON string instead of a dict, and `decode()` now returns a JSON string instead of a dict; avoids redundant `json.dumps()` inside the codec; callers use `model_dump_json(by_alias=True)` for Pydantic Rust-core speed
3. **asyncio.to_thread for client decode** (event loop safety): Client-side decode offloaded via `asyncio.to_thread()` to prevent blocking the event loop on large payloads
4. **model_dump_json(by_alias=True) in server** (no intermediate dict): Server encodes directly from Pydantic's Rust serializer output; eliminates `model_dump()` → `json.dumps()` double-serialization
5. **exc_info=True on encode failure** (observability): Server-side Lambda encode failures now log the full stack trace
6. **Updated tests** for the new string-based API: Unit tests use a `_roundtrip()` helper (`json.dumps` → `encode` → `decode` → `json.loads`); integration tests parse decoded JSON strings explicitly; added `test_encode_returns_string` and `test_decode_returns_string`
Thanks for the thorough review @adriannoes! All 6 suggestions addressed in the latest push:
Used your code snippets directly where provided. Ready for re-review!
Summary
Adds Lambda Lang content-type negotiation for ASAP envelopes, enabling semantic compression of JSON-RPC payloads.
Closes #52
What

**Lambda Codec** (`src/asap/transport/codecs/lambda_codec.py`)
- Fully reversible: `decode(encode(data)) == data`
- Content type: `application/vnd.asap+lambda`
- Version prefix (`λ1:`) for forward compatibility

**Server-side negotiation** (`server.py`)
- Checks the `Accept: application/vnd.asap+lambda` header

**Client-side negotiation** (`client.py`)
- Opt-in via the `lambda_codec_enabled` parameter (default: `False`)
- Sends the `Accept: application/vnd.asap+lambda` header

Tests
- Unit (`tests/transport/unit/test_lambda_codec.py`): round-trips, atom mappings, edge cases, error handling
- Integration (`tests/transport/integration/test_lambda_negotiation.py`): content-type negotiation, fallback, mixed scenarios
- `pytest`, `ruff check`, `ruff format`, `mypy` — all green

Design Decisions
- Codec kept separate from `compression.py`
- Uses `asap.observability.get_logger()` for structured logging
- Atom delimiters (`§…§`) chosen because they cannot appear in valid JSON keys
- Wildcard Accept headers (`*/*`) do NOT trigger Lambda encoding
- No changes to `envelope.py` or any existing behavior
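The wildcard rule above can be sketched as an explicit media-type check (a simplified illustration, not the actual `server.py` implementation; production code would also weigh q-values):

```python
LAMBDA_CONTENT_TYPE = "application/vnd.asap+lambda"

def accepts_lambda(accept_header: str) -> bool:
    # Only an explicitly listed media type opts in; wildcards such as */*
    # fall through to the default JSON response
    return any(
        part.split(";")[0].strip() == LAMBDA_CONTENT_TYPE
        for part in accept_header.split(",")
    )

assert accepts_lambda("application/vnd.asap+lambda")
assert accepts_lambda("application/json, application/vnd.asap+lambda;q=0.9")
assert not accepts_lambda("*/*")
```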