diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index aaa6b1913..63501c6a8 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -90,7 +90,6 @@ jobs: qtconsole: runs-on: ubuntu-latest - if: false timeout-minutes: 20 steps: - name: Checkout diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index aceaf3eb9..b815b934b 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -205,6 +205,9 @@ def _default_ident(self): # see https://github.com/jupyterlab/jupyterlab/issues/17785 _parent_ident: Mapping[str, bytes] + # Asyncio lock for main shell thread. + _main_asyncio_lock: asyncio.Lock + @property def _parent_header(self): warnings.warn( @@ -327,6 +330,8 @@ def __init__(self, **kwargs): } ) + self._main_asyncio_lock = asyncio.Lock() + async def dispatch_control(self, msg): """Dispatch a control request, ensuring only one message is processed at a time.""" # Ensure only one control message is processed at a time @@ -539,7 +544,7 @@ async def do_one_iteration(self): This is now a coroutine """ # flush messages off of shell stream into the message queue - if self.shell_stream: + if self.shell_stream and not self._supports_kernel_subshells: self.shell_stream.flush() # process at most one shell message per iteration await self.process_one(wait=False) @@ -649,56 +654,51 @@ async def shell_channel_thread_main(self, msg): """Handler for shell messages received on shell_channel_thread""" assert threading.current_thread() == self.shell_channel_thread - if self.session is None: - return - - # deserialize only the header to get subshell_id - # Keep original message to send to subshell_id unmodified. - _, msg2 = self.session.feed_identities(msg, copy=False) - try: - msg3 = self.session.deserialize(msg2, content=False, copy=False) - subshell_id = msg3["header"].get("subshell_id") - - # Find inproc pair socket to use to send message to correct subshell. - subshell_manager = self.shell_channel_thread.manager - socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id) - assert socket is not None - socket.send_multipart(msg, copy=False) - except Exception: - self.log.error("Invalid message", exc_info=True) # noqa: G201 + async with self.shell_channel_thread.asyncio_lock: + if self.session is None: + return - if self.shell_stream: - self.shell_stream.flush() + # deserialize only the header to get subshell_id + # Keep original message to send to subshell_id unmodified. + _, msg2 = self.session.feed_identities(msg, copy=False) + try: + msg3 = self.session.deserialize(msg2, content=False, copy=False) + subshell_id = msg3["header"].get("subshell_id") + + # Find inproc pair socket to use to send message to correct subshell. + subshell_manager = self.shell_channel_thread.manager + socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id) + assert socket is not None + socket.send_multipart(msg, copy=False) + except Exception: + self.log.error("Invalid message", exc_info=True) # noqa: G201 async def shell_main(self, subshell_id: str | None, msg): """Handler of shell messages for a single subshell""" if self._supports_kernel_subshells: if subshell_id is None: assert threading.current_thread() == threading.main_thread() + asyncio_lock = self._main_asyncio_lock else: assert threading.current_thread() not in ( self.shell_channel_thread, threading.main_thread(), ) - socket_pair = self.shell_channel_thread.manager.get_shell_channel_to_subshell_pair( - subshell_id - ) + asyncio_lock = self.shell_channel_thread.manager.get_subshell_asyncio_lock( + subshell_id + ) else: assert subshell_id is None assert threading.current_thread() == threading.main_thread() - socket_pair = None - - try: - # Whilst executing a shell message, do not accept any other shell messages on the - # same subshell, so that cells are run sequentially. Without this we can run multiple - # async cells at the same time which would be a nice feature to have but is an API - # change. - if socket_pair: - socket_pair.pause_on_recv() + asyncio_lock = self._main_asyncio_lock + + # Whilst executing a shell message, do not accept any other shell messages on the + # same subshell, so that cells are run sequentially. Without this we can run multiple + # async cells at the same time which would be a nice feature to have but is an API + # change. + assert asyncio_lock is not None + async with asyncio_lock: await self.dispatch_shell(msg, subshell_id=subshell_id) - finally: - if socket_pair: - socket_pair.resume_on_recv() def record_ports(self, ports): """Record the ports that this kernel is using. @@ -739,7 +739,7 @@ def _publish_status(self, status, channel, parent=None): def _publish_status_and_flush(self, status, channel, stream, parent=None): """send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply""" self._publish_status(status, channel, parent) - if stream and hasattr(stream, "flush"): + if stream and hasattr(stream, "flush") and not self._supports_kernel_subshells: stream.flush(zmq.POLLOUT) def _publish_debug_event(self, event): @@ -1382,7 +1382,7 @@ def _abort_queues(self, subshell_id: str | None = None): # flush streams, so all currently waiting messages # are added to the queue - if self.shell_stream: + if self.shell_stream and not self._supports_kernel_subshells: self.shell_stream.flush() # Callback to signal that we are done aborting diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index 12102e870..32f5f7399 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from typing import Any import zmq @@ -28,6 +29,8 @@ def __init__( self._context = context self._shell_socket = shell_socket + self.asyncio_lock = asyncio.Lock() + @property def manager(self) -> SubshellManager: # Lazy initialisation. diff --git a/ipykernel/socket_pair.py b/ipykernel/socket_pair.py index e2669b8c0..f31dd3b92 100644 --- a/ipykernel/socket_pair.py +++ b/ipykernel/socket_pair.py @@ -21,8 +21,6 @@ class SocketPair: from_socket: zmq.Socket[Any] to_socket: zmq.Socket[Any] to_stream: ZMQStream | None = None - on_recv_callback: Any - on_recv_copy: bool def __init__(self, context: zmq.Context[Any], name: str): """Initialize the inproc socker pair.""" @@ -43,21 +41,9 @@ def close(self): def on_recv(self, io_loop: IOLoop, on_recv_callback, copy: bool = False): """Set the callback used when a message is received on the to stream.""" # io_loop is that of the 'to' thread. - self.on_recv_callback = on_recv_callback - self.on_recv_copy = copy if self.to_stream is None: self.to_stream = ZMQStream(self.to_socket, io_loop) - self.resume_on_recv() - - def pause_on_recv(self): - """Pause receiving on the to stream.""" - if self.to_stream is not None: - self.to_stream.stop_on_recv() - - def resume_on_recv(self): - """Resume receiving on the to stream.""" - if self.to_stream is not None and not self.to_stream.closed(): - self.to_stream.on_recv(self.on_recv_callback, copy=self.on_recv_copy) + self.to_stream.on_recv(on_recv_callback, copy=copy) def _address(self, name) -> str: """Return the address used for this inproc socket pair.""" diff --git a/ipykernel/subshell.py b/ipykernel/subshell.py index 911a9521c..ec1c7cc88 100644 --- a/ipykernel/subshell.py +++ b/ipykernel/subshell.py @@ -1,5 +1,6 @@ """A thread for a subshell.""" +import asyncio from typing import Any import zmq @@ -29,6 +30,8 @@ def __init__( # When aborting flag is set, execute_request messages to this subshell will be aborted. self.aborting = False + self.asyncio_lock = asyncio.Lock() + def run(self) -> None: """Run the thread.""" try: diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 24d683523..7f65183dd 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json import typing as t import uuid @@ -46,8 +47,7 @@ def __init__( self._shell_channel_io_loop = shell_channel_io_loop self._shell_socket = shell_socket self._cache: dict[str, SubshellThread] = {} - self._lock_cache = Lock() - self._lock_shell_socket = Lock() + self._lock_cache = Lock() # Sync lock across threads when accessing cache. # Inproc socket pair for communication from control thread to shell channel thread, # such as for create_subshell_request messages. Reply messages are returned straight away. @@ -107,7 +107,13 @@ def get_shell_channel_to_subshell_socket(self, subshell_id: str | None) -> zmq.S def get_subshell_aborting(self, subshell_id: str) -> bool: """Get the boolean aborting flag of the specified subshell.""" - return self._cache[subshell_id].aborting + with self._lock_cache: + return self._cache[subshell_id].aborting + + def get_subshell_asyncio_lock(self, subshell_id: str) -> asyncio.Lock: + """Return the asyncio lock belonging to the specified subshell.""" + with self._lock_cache: + return self._cache[subshell_id].asyncio_lock def list_subshell(self) -> list[str]: """Return list of current subshell ids. @@ -216,8 +222,7 @@ def _process_control_request( def _send_on_shell_channel(self, msg) -> None: assert current_thread().name == SHELL_CHANNEL_THREAD_NAME - with self._lock_shell_socket: - self._shell_socket.send_multipart(msg) + self._shell_socket.send_multipart(msg) def _stop_subshell(self, subshell_thread: SubshellThread) -> None: """Stop a subshell thread and close all of its resources."""