[fix] Add back TQ_ZERO_COPY_SERIALIZATION switch #19
Signed-off-by: hzy_19 <o0shower0o@outlook.com>
TQ_ZERO_COPY_SERIALIZATION switch
Pull request overview
This PR reintroduces the TQ_ZERO_COPY_SERIALIZATION environment variable switch that was deprecated in previous PRs (#7 and #19). The switch is set to default to False, which falls back to using pickle for serialization instead of the msgpack-based zero-copy transfer mechanism, due to minor bugs discovered in the current zero-copy implementation.
Changes:
- Added back the `TQ_ZERO_COPY_SERIALIZATION` environment variable switch with default value `False`
- Modified serialization/deserialization in `ZMQMessage` to conditionally use pickle or msgpack based on the switch
- Added tensor cloning logic in `_filter_storage_data` for pickle mode to prevent pickling whole tensors
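As a rough illustration of the first change, a boolean environment switch with a `False` default could be read as sketched below. The helper name `read_bool_env` and the set of accepted truthy strings are assumptions made for this example; the PR does not show how the variable is actually parsed.

```python
import os


def read_bool_env(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean flag.

    Hypothetical helper for illustration; not taken from TransferQueue.
    """
    raw = os.environ.get(name)
    if raw is None:
        # Variable not set: use the default (False in this PR).
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


# Evaluated once when the module is imported, like a module-level constant.
TQ_ZERO_COPY_SERIALIZATION = read_bool_env("TQ_ZERO_COPY_SERIALIZATION", default=False)
```

Because the constant is computed at import time, changing the environment variable afterwards has no effect on an already-running process.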
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| transfer_queue/utils/zmq_utils.py | Added pickle import, TQ_ZERO_COPY_SERIALIZATION switch, and conditional serialization logic in serialize() and deserialize() methods |
| transfer_queue/storage/managers/simple_backend_manager.py | Added TQ_ZERO_COPY_SERIALIZATION import and switch, added tensor cloning for pickle mode in _filter_storage_data, added type ignore comments |
```diff
     def serialize(self) -> list:
         """
-        Serialize message using unified MsgpackEncoder.
-        Returns: list[bytestr] - [msgpack_header, *tensor_buffers]
+        Serialize message using unified MsgpackEncoder or pickle.
+        Returns: list[bytestr] - [msgpack_header, *tensor_buffers] or [bytes]
         """
-        msg_dict = {
-            "request_type": self.request_type.value,  # Enum -> str for msgpack
-            "sender_id": self.sender_id,
-            "receiver_id": self.receiver_id,
-            "request_id": self.request_id,
-            "timestamp": self.timestamp,
-            "body": self.body,
-        }
-        return list(_encoder.encode(msg_dict))
+        if TQ_ZERO_COPY_SERIALIZATION:
+            msg_dict = {
+                "request_type": self.request_type.value,  # Enum -> str for msgpack
+                "sender_id": self.sender_id,
+                "receiver_id": self.receiver_id,
+                "request_id": self.request_id,
+                "timestamp": self.timestamp,
+                "body": self.body,
+            }
+            return list(_encoder.encode(msg_dict))
+        else:
+            return [pickle.dumps(self)]

     @classmethod
     def deserialize(cls, frames: list) -> "ZMQMessage":
         """
-        Deserialize message using unified MsgpackDecoder.
+        Deserialize message using unified MsgpackDecoder or pickle.
         """
         if not frames:
             raise ValueError("Empty frames received")

-        msg_dict = _decoder.decode(frames)
-        return cls(
-            request_type=ZMQRequestType(msg_dict["request_type"]),
-            sender_id=msg_dict["sender_id"],
-            receiver_id=msg_dict["receiver_id"],
-            body=msg_dict["body"],
-            request_id=msg_dict["request_id"],
-            timestamp=msg_dict["timestamp"],
-        )
+        if TQ_ZERO_COPY_SERIALIZATION:
+            msg_dict = _decoder.decode(frames)
+            return cls(
+                request_type=ZMQRequestType(msg_dict["request_type"]),
+                sender_id=msg_dict["sender_id"],
+                receiver_id=msg_dict["receiver_id"],
+                body=msg_dict["body"],
+                request_id=msg_dict["request_id"],
+                timestamp=msg_dict["timestamp"],
+            )
+        else:
+            return pickle.loads(frames[0])
```
The serialization and deserialization paths now depend on a module-level constant TQ_ZERO_COPY_SERIALIZATION that is evaluated at import time. This creates a potential issue if the environment variable is changed after the modules are loaded, or if different processes have different values for this variable.
If one process serializes with TQ_ZERO_COPY_SERIALIZATION=True and another deserializes with TQ_ZERO_COPY_SERIALIZATION=False (or vice versa), the deserialization will fail. Consider one of the following approaches:
- Include a serialization format marker in the serialized data (e.g., a magic byte or header) so that the deserializer can automatically detect which format was used.
- Ensure processes are always started with consistent environment variable settings and document this requirement clearly.
- Make both code paths compatible by attempting to detect the format during deserialization (e.g., try msgpack first, fall back to pickle on error).
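The format-marker option from the list above could look roughly like the sketch below. It is a standalone illustration, not the TransferQueue implementation: the one-byte tags `FORMAT_PICKLE` and `FORMAT_ZERO_COPY` are hypothetical, messages are plain dicts rather than `ZMQMessage` objects, and `json` stands in for the msgpack encoder so the example needs only the standard library.

```python
import json
import pickle

# Hypothetical one-byte format tags; not part of TransferQueue.
FORMAT_PICKLE = b"\x00"
FORMAT_ZERO_COPY = b"\x01"


def serialize(msg: dict, zero_copy: bool) -> list[bytes]:
    """Prefix the payload with a tag frame naming the format that was used."""
    if zero_copy:
        # json is a stand-in for the msgpack header frame here.
        return [FORMAT_ZERO_COPY, json.dumps(msg).encode("utf-8")]
    return [FORMAT_PICKLE, pickle.dumps(msg)]


def deserialize(frames: list[bytes]) -> dict:
    """Pick the decoder from the tag frame, not from the local switch."""
    if not frames:
        raise ValueError("Empty frames received")
    tag, payload = frames[0], frames[1:]
    if not payload:
        raise ValueError("Missing payload frames")
    if tag == FORMAT_ZERO_COPY:
        return json.loads(payload[0].decode("utf-8"))
    if tag == FORMAT_PICKLE:
        return pickle.loads(payload[0])
    raise ValueError(f"Unknown serialization format tag: {tag!r}")
```

With a tag frame, a receiver started with a different switch value still decodes the message, and an unrecognized tag fails with an explicit error instead of an opaque decoder exception. The cost is one extra frame per message.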
```diff
+        if TQ_ZERO_COPY_SERIALIZATION:
+            msg_dict = {
+                "request_type": self.request_type.value,  # Enum -> str for msgpack
+                "sender_id": self.sender_id,
+                "receiver_id": self.receiver_id,
+                "request_id": self.request_id,
+                "timestamp": self.timestamp,
+                "body": self.body,
+            }
+            return list(_encoder.encode(msg_dict))
+        else:
+            return [pickle.dumps(self)]
```
The PR reintroduces the TQ_ZERO_COPY_SERIALIZATION environment variable switch with a default value of False, switching the serialization from msgpack to pickle. However, there are no tests in this PR to verify that the pickle serialization path works correctly.
Given that the PR description mentions "minor bugs in current implementation" and that this is a fallback mechanism, it's critical to have test coverage for:
- Serialization and deserialization using pickle (when `TQ_ZERO_COPY_SERIALIZATION=False`)
- The tensor cloning behavior in `_filter_storage_data` when using pickle mode
- End-to-end scenarios with pickle serialization
The existing tests in test_serial_utils_on_cpu.py test ZMQMessage serialization, but they don't appear to test with TQ_ZERO_COPY_SERIALIZATION=False. Consider adding test cases that explicitly set this environment variable to ensure both code paths are tested.
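One possible shape for such tests is sketched below. Since the switch is a module-level constant, setting the environment variable inside a test would not change it after import, so the sketch patches the constant directly. Everything here is a stand-in: `codec` is a hypothetical substitute for the real module, messages are plain dicts, and `json` replaces the msgpack encoder so the example runs with the standard library only.

```python
import json
import pickle
import types
from unittest import mock

# Hypothetical stand-in for the module that holds the switch.
codec = types.SimpleNamespace(TQ_ZERO_COPY_SERIALIZATION=False)


def serialize(msg: dict) -> list:
    if codec.TQ_ZERO_COPY_SERIALIZATION:
        return [json.dumps(msg).encode("utf-8")]  # stand-in for msgpack frames
    return [pickle.dumps(msg)]


def deserialize(frames: list) -> dict:
    if not frames:
        raise ValueError("Empty frames received")
    if codec.TQ_ZERO_COPY_SERIALIZATION:
        return json.loads(frames[0].decode("utf-8"))
    return pickle.loads(frames[0])


def check_roundtrip(flag: bool) -> bool:
    """Round-trip a message with the switch forced to `flag`."""
    msg = {"request_type": "PUT_DATA", "sender_id": "client-0", "body": {"v": [1, 2, 3]}}
    # patch.object restores the original value when the block exits.
    with mock.patch.object(codec, "TQ_ZERO_COPY_SERIALIZATION", flag):
        return deserialize(serialize(msg)) == msg


def test_both_serialization_paths():
    assert check_roundtrip(False)  # pickle path
    assert check_roundtrip(True)   # zero-copy path (json stand-in)
```

In the real code base the constant is also imported into `simple_backend_manager.py`, so a test would presumably need to patch it in each module that holds its own copy.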
Background
In previous PRs (https://gitcode.com/Ascend/TransferQueue/pull/7 and https://gitcode.com/Ascend/TransferQueue/pull/19), the zero-copy transfer was refactored and the `TQ_ZERO_COPY_SERIALIZATION` environment variable switch was deprecated.
However, during recent tests, we observed some minor bugs in the current implementation.
This PR adds back the `TQ_ZERO_COPY_SERIALIZATION` switch and sets its default value to `False`, which falls back to `pickle` for serialization.