Skip to content

[fix] Add back TQ_ZERO_COPY_SERIALIZATION switch#19

Merged
0oshowero0 merged 2 commits intoAscend:mainfrom
0oshowero0:fix_serial
Jan 28, 2026
Merged

[fix] Add back TQ_ZERO_COPY_SERIALIZATION switch#19
0oshowero0 merged 2 commits intoAscend:mainfrom
0oshowero0:fix_serial

Conversation

@0oshowero0
Copy link
Copy Markdown
Collaborator

Background

In previous PRs (https://gitcode.com/Ascend/TransferQueue/pull/7、https://gitcode.com/Ascend/TransferQueue/pull/19) the zero-copy transfer is refactored, and the environmental variable switch of TQ_ZERO_COPY_SERIALIZATION is deprecated.

However, during recent tests, we observe some minor bugs in current implementation.

This PR add back the TQ_ZERO_COPY_SERIALIZATION switch and set default value to False, which falls back to pickle to perform serialization.

Signed-off-by: hzy_19 <o0shower0o@outlook.com>
Copilot AI review requested due to automatic review settings January 28, 2026 12:36
@0oshowero0 0oshowero0 changed the title [fix] Add back TQ_ZERO_COPY_SERIALIZATION switch [fix] Add back TQ_ZERO_COPY_SERIALIZATION switch Jan 28, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR reintroduces the TQ_ZERO_COPY_SERIALIZATION environment variable switch that was deprecated in previous PRs (#7 and #19). The switch is set to default to False, which falls back to using pickle for serialization instead of the msgpack-based zero-copy transfer mechanism, due to minor bugs discovered in the current zero-copy implementation.

Changes:

  • Added back TQ_ZERO_COPY_SERIALIZATION environment variable switch with default value False
  • Modified serialization/deserialization in ZMQMessage to conditionally use pickle or msgpack based on the switch
  • Added tensor cloning logic in _filter_storage_data for pickle mode to prevent pickling whole tensors

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
transfer_queue/utils/zmq_utils.py Added pickle import, TQ_ZERO_COPY_SERIALIZATION switch, and conditional serialization logic in serialize() and deserialize() methods
transfer_queue/storage/managers/simple_backend_manager.py Added TQ_ZERO_COPY_SERIALIZATION import and switch, added tensor cloning for pickle mode in _filter_storage_data, added type ignore comments

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 156 to +197
def serialize(self) -> list:
"""
Serialize message using unified MsgpackEncoder.
Returns: list[bytestr] - [msgpack_header, *tensor_buffers]
Serialize message using unified MsgpackEncoder or pickle.
Returns: list[bytestr] - [msgpack_header, *tensor_buffers] or [bytes]
"""
msg_dict = {
"request_type": self.request_type.value, # Enum -> str for msgpack
"sender_id": self.sender_id,
"receiver_id": self.receiver_id,
"request_id": self.request_id,
"timestamp": self.timestamp,
"body": self.body,
}
return list(_encoder.encode(msg_dict))
if TQ_ZERO_COPY_SERIALIZATION:
msg_dict = {
"request_type": self.request_type.value, # Enum -> str for msgpack
"sender_id": self.sender_id,
"receiver_id": self.receiver_id,
"request_id": self.request_id,
"timestamp": self.timestamp,
"body": self.body,
}
return list(_encoder.encode(msg_dict))
else:
return [pickle.dumps(self)]

@classmethod
def deserialize(cls, frames: list) -> "ZMQMessage":
"""
Deserialize message using unified MsgpackDecoder.
Deserialize message using unified MsgpackDecoder or pickle.
"""
if not frames:
raise ValueError("Empty frames received")

msg_dict = _decoder.decode(frames)
return cls(
request_type=ZMQRequestType(msg_dict["request_type"]),
sender_id=msg_dict["sender_id"],
receiver_id=msg_dict["receiver_id"],
body=msg_dict["body"],
request_id=msg_dict["request_id"],
timestamp=msg_dict["timestamp"],
)
if TQ_ZERO_COPY_SERIALIZATION:
msg_dict = _decoder.decode(frames)
return cls(
request_type=ZMQRequestType(msg_dict["request_type"]),
sender_id=msg_dict["sender_id"],
receiver_id=msg_dict["receiver_id"],
body=msg_dict["body"],
request_id=msg_dict["request_id"],
timestamp=msg_dict["timestamp"],
)
else:
return pickle.loads(frames[0])
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialization and deserialization paths now depend on a module-level constant TQ_ZERO_COPY_SERIALIZATION that is evaluated at import time. This creates a potential issue if the environment variable is changed after the modules are loaded, or if different processes have different values for this variable.

If one process serializes with TQ_ZERO_COPY_SERIALIZATION=True and another deserializes with TQ_ZERO_COPY_SERIALIZATION=False (or vice versa), the deserialization will fail. Consider one of the following approaches:

  1. Include a serialization format marker in the serialized data (e.g., a magic byte or header) so that the deserializer can automatically detect which format was used.

  2. Ensure processes are always started with consistent environment variable settings and document this requirement clearly.

  3. Make both code paths compatible by attempting to detect the format during deserialization (e.g., try msgpack first, fall back to pickle on error).

Copilot uses AI. Check for mistakes.
Comment on lines +165 to +176
if TQ_ZERO_COPY_SERIALIZATION:
msg_dict = {
"request_type": self.request_type.value, # Enum -> str for msgpack
"sender_id": self.sender_id,
"receiver_id": self.receiver_id,
"request_id": self.request_id,
"timestamp": self.timestamp,
"body": self.body,
}
return list(_encoder.encode(msg_dict))
else:
return [pickle.dumps(self)]
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR reintroduces the TQ_ZERO_COPY_SERIALIZATION environment variable switch with a default value of False, switching the serialization from msgpack to pickle. However, there are no tests in this PR to verify that the pickle serialization path works correctly.

Given that the PR description mentions "minor bugs in current implementation" and that this is a fallback mechanism, it's critical to have test coverage for:

  1. Serialization and deserialization using pickle (when TQ_ZERO_COPY_SERIALIZATION=False)
  2. The tensor cloning behavior in _filter_storage_data when using pickle mode
  3. End-to-end scenarios with pickle serialization

The existing tests in test_serial_utils_on_cpu.py test ZMQMessage serialization, but they don't appear to test with TQ_ZERO_COPY_SERIALIZATION=False. Consider adding test cases that explicitly set this environment variable to ensure both code paths are tested.

Copilot uses AI. Check for mistakes.
Signed-off-by: hzy_19 <o0shower0o@outlook.com>
@0oshowero0 0oshowero0 merged commit 90a3cfd into Ascend:main Jan 28, 2026
4 checks passed
@0oshowero0 0oshowero0 deleted the fix_serial branch April 2, 2026 07:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants