Implement direct server-to-server communication#331
Conversation
4b94939 to
9aebf0a
Compare
9aebf0a to
234bafb
Compare
src/petals/server/handler.py
Outdated
| module_backends: Dict[str, TransformerBackend], | ||
| *, | ||
| push_manager: SyncManager, | ||
| session_pipes: Dict[str, Tuple[PipeConnection, threading.Lock]], |
There was a problem hiding this comment.
rpc_push() may be received by a connection handler different from the one holding the inference session, so we use some multiprocess communication here.
src/petals/server/handler.py
Outdated
| assert isinstance(block_uid, str) and isinstance(metadata, dict) | ||
| return block_uid, inputs, metadata | ||
|
|
||
| async def rpc_push(self, request: runtime_pb2.ExpertRequest, context: P2PContext) -> runtime_pb2.ExpertResponse: |
There was a problem hiding this comment.
TODO: This can be stream-to-unary handler, so that (a) the previous server doesn't have to make a new connection each time and (b) we don't have to parse metadata at this stage each time (now it's done to find session_id). I'm not sure if it affects performance much though, so I'd postpone that to a later PR.
| span = None | ||
| try: | ||
| if not self._chosen_spans or not self._server_sessions or attempt_no >= 1: | ||
| # If there is a failed server session, this code closes it |
There was a problem hiding this comment.
This code was moved to a separate method InferenceSession._update_sequence() to simplify this method.
73a0563 to
49bbce4
Compare
0e5ce37 to
2445aa5
Compare
2445aa5 to
cef5662
Compare
7d73698 to
429146e
Compare
429146e to
9a23c1e
Compare
bccdc21 to
845c172
Compare
845c172 to
0635968
Compare
c5dfba2 to
a637fd2
Compare
a637fd2 to
7e204d0
Compare
9427cae to
5b3f180
Compare
| inputs = self.history # Pass full inputs including prefix | ||
| else: | ||
| inputs = inputs[:, -n_input_tokens:] # No need to pass prefix further | ||
|
|
There was a problem hiding this comment.
Refactored: This code was moved from InferenceSession.step() to _ServerInferenceSession.step(), since it's actually about one server only. The overall structure is more clear this way.
Implement #226.
TODO:
manager.Queue())next_servers