-
Notifications
You must be signed in to change notification settings - Fork 16
[fix] Add back TQ_ZERO_COPY_SERIALIZATION switch
#19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
|
|
||
| import logging | ||
| import os | ||
| import pickle | ||
| import socket | ||
| import time | ||
| from dataclasses import dataclass | ||
|
|
@@ -28,6 +29,7 @@ | |
| from transfer_queue.utils.utils import ( | ||
| ExplicitEnum, | ||
| TransferQueueRole, | ||
| get_env_bool, | ||
| ) | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -42,6 +44,8 @@ | |
|
|
||
| bytestr: TypeAlias = bytes | bytearray | memoryview | ||
|
|
||
| TQ_ZERO_COPY_SERIALIZATION = get_env_bool("TQ_ZERO_COPY_SERIALIZATION", default=False) | ||
|
|
||
|
|
||
| class ZMQRequestType(ExplicitEnum): | ||
| """ | ||
|
|
@@ -155,36 +159,42 @@ def create( | |
|
|
||
| def serialize(self) -> list: | ||
| """ | ||
| Serialize message using unified MsgpackEncoder. | ||
| Returns: list[bytestr] - [msgpack_header, *tensor_buffers] | ||
| Serialize message using unified MsgpackEncoder or pickle. | ||
| Returns: list[bytestr] - [msgpack_header, *tensor_buffers] or [bytes] | ||
| """ | ||
| msg_dict = { | ||
| "request_type": self.request_type.value, # Enum -> str for msgpack | ||
| "sender_id": self.sender_id, | ||
| "receiver_id": self.receiver_id, | ||
| "request_id": self.request_id, | ||
| "timestamp": self.timestamp, | ||
| "body": self.body, | ||
| } | ||
| return list(_encoder.encode(msg_dict)) | ||
| if TQ_ZERO_COPY_SERIALIZATION: | ||
| msg_dict = { | ||
| "request_type": self.request_type.value, # Enum -> str for msgpack | ||
| "sender_id": self.sender_id, | ||
| "receiver_id": self.receiver_id, | ||
| "request_id": self.request_id, | ||
| "timestamp": self.timestamp, | ||
| "body": self.body, | ||
| } | ||
| return list(_encoder.encode(msg_dict)) | ||
| else: | ||
| return [pickle.dumps(self)] | ||
|
|
||
| @classmethod | ||
| def deserialize(cls, frames: list) -> "ZMQMessage": | ||
| """ | ||
| Deserialize message using unified MsgpackDecoder. | ||
| Deserialize message using unified MsgpackDecoder or pickle. | ||
| """ | ||
| if not frames: | ||
| raise ValueError("Empty frames received") | ||
|
|
||
| msg_dict = _decoder.decode(frames) | ||
| return cls( | ||
| request_type=ZMQRequestType(msg_dict["request_type"]), | ||
| sender_id=msg_dict["sender_id"], | ||
| receiver_id=msg_dict["receiver_id"], | ||
| body=msg_dict["body"], | ||
| request_id=msg_dict["request_id"], | ||
| timestamp=msg_dict["timestamp"], | ||
| ) | ||
| if TQ_ZERO_COPY_SERIALIZATION: | ||
| msg_dict = _decoder.decode(frames) | ||
| return cls( | ||
| request_type=ZMQRequestType(msg_dict["request_type"]), | ||
| sender_id=msg_dict["sender_id"], | ||
| receiver_id=msg_dict["receiver_id"], | ||
| body=msg_dict["body"], | ||
| request_id=msg_dict["request_id"], | ||
| timestamp=msg_dict["timestamp"], | ||
| ) | ||
| else: | ||
| return pickle.loads(frames[0]) | ||
|
Comment on lines
156
to
+197
|
||
|
|
||
|
|
||
| def get_free_port() -> str: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR reintroduces the `TQ_ZERO_COPY_SERIALIZATION` environment variable switch with a default value of `False`, switching the serialization from msgpack to pickle. However, there are no tests in this PR to verify that the pickle serialization path works correctly.

Given that the PR description mentions "minor bugs in current implementation" and that this is a fallback mechanism, it's critical to have test coverage for:
- The pickle serialization path (`TQ_ZERO_COPY_SERIALIZATION=False`)
- `_filter_storage_data` when using pickle mode

The existing tests in
`test_serial_utils_on_cpu.py` test `ZMQMessage` serialization, but they don't appear to test with `TQ_ZERO_COPY_SERIALIZATION=False`. Consider adding test cases that explicitly set this environment variable to ensure both code paths are tested.