Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ jobs:

qtconsole:
runs-on: ubuntu-latest
if: false
timeout-minutes: 20
steps:
- name: Checkout
Expand Down
74 changes: 37 additions & 37 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def _default_ident(self):
# see https://github.com/jupyterlab/jupyterlab/issues/17785
_parent_ident: Mapping[str, bytes]

# Asyncio lock for main shell thread.
_main_asyncio_lock: asyncio.Lock

@property
def _parent_header(self):
warnings.warn(
Expand Down Expand Up @@ -327,6 +330,8 @@ def __init__(self, **kwargs):
}
)

self._main_asyncio_lock = asyncio.Lock()

async def dispatch_control(self, msg):
"""Dispatch a control request, ensuring only one message is processed at a time."""
# Ensure only one control message is processed at a time
Expand Down Expand Up @@ -539,7 +544,7 @@ async def do_one_iteration(self):
This is now a coroutine
"""
# flush messages off of shell stream into the message queue
if self.shell_stream:
if self.shell_stream and not self._supports_kernel_subshells:
self.shell_stream.flush()
# process at most one shell message per iteration
await self.process_one(wait=False)
Expand Down Expand Up @@ -649,56 +654,51 @@ async def shell_channel_thread_main(self, msg):
"""Handler for shell messages received on shell_channel_thread"""
assert threading.current_thread() == self.shell_channel_thread

if self.session is None:
return

# deserialize only the header to get subshell_id
# Keep original message to send to subshell_id unmodified.
_, msg2 = self.session.feed_identities(msg, copy=False)
try:
msg3 = self.session.deserialize(msg2, content=False, copy=False)
subshell_id = msg3["header"].get("subshell_id")

# Find inproc pair socket to use to send message to correct subshell.
subshell_manager = self.shell_channel_thread.manager
socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id)
assert socket is not None
socket.send_multipart(msg, copy=False)
except Exception:
self.log.error("Invalid message", exc_info=True) # noqa: G201
async with self.shell_channel_thread.asyncio_lock:
if self.session is None:
return

if self.shell_stream:
self.shell_stream.flush()
# deserialize only the header to get subshell_id
# Keep original message to send to subshell_id unmodified.
_, msg2 = self.session.feed_identities(msg, copy=False)
try:
msg3 = self.session.deserialize(msg2, content=False, copy=False)
subshell_id = msg3["header"].get("subshell_id")

# Find inproc pair socket to use to send message to correct subshell.
subshell_manager = self.shell_channel_thread.manager
socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id)
assert socket is not None
socket.send_multipart(msg, copy=False)
except Exception:
self.log.error("Invalid message", exc_info=True) # noqa: G201

async def shell_main(self, subshell_id: str | None, msg):
"""Handler of shell messages for a single subshell"""
if self._supports_kernel_subshells:
if subshell_id is None:
assert threading.current_thread() == threading.main_thread()
asyncio_lock = self._main_asyncio_lock
else:
assert threading.current_thread() not in (
self.shell_channel_thread,
threading.main_thread(),
)
socket_pair = self.shell_channel_thread.manager.get_shell_channel_to_subshell_pair(
subshell_id
)
asyncio_lock = self.shell_channel_thread.manager.get_subshell_asyncio_lock(
subshell_id
)
else:
assert subshell_id is None
assert threading.current_thread() == threading.main_thread()
socket_pair = None

try:
# Whilst executing a shell message, do not accept any other shell messages on the
# same subshell, so that cells are run sequentially. Without this we can run multiple
# async cells at the same time which would be a nice feature to have but is an API
# change.
if socket_pair:
socket_pair.pause_on_recv()
asyncio_lock = self._main_asyncio_lock

# Whilst executing a shell message, do not accept any other shell messages on the
# same subshell, so that cells are run sequentially. Without this we can run multiple
# async cells at the same time which would be a nice feature to have but is an API
# change.
assert asyncio_lock is not None
async with asyncio_lock:
await self.dispatch_shell(msg, subshell_id=subshell_id)
finally:
if socket_pair:
socket_pair.resume_on_recv()

def record_ports(self, ports):
"""Record the ports that this kernel is using.
Expand Down Expand Up @@ -739,7 +739,7 @@ def _publish_status(self, status, channel, parent=None):
def _publish_status_and_flush(self, status, channel, stream, parent=None):
"""send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply"""
self._publish_status(status, channel, parent)
if stream and hasattr(stream, "flush"):
if stream and hasattr(stream, "flush") and not self._supports_kernel_subshells:
stream.flush(zmq.POLLOUT)

def _publish_debug_event(self, event):
Expand Down Expand Up @@ -1382,7 +1382,7 @@ def _abort_queues(self, subshell_id: str | None = None):

# flush streams, so all currently waiting messages
# are added to the queue
if self.shell_stream:
if self.shell_stream and not self._supports_kernel_subshells:
self.shell_stream.flush()

# Callback to signal that we are done aborting
Expand Down
3 changes: 3 additions & 0 deletions ipykernel/shellchannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
from typing import Any

import zmq
Expand All @@ -28,6 +29,8 @@ def __init__(
self._context = context
self._shell_socket = shell_socket

self.asyncio_lock = asyncio.Lock()

@property
def manager(self) -> SubshellManager:
# Lazy initialisation.
Expand Down
16 changes: 1 addition & 15 deletions ipykernel/socket_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ class SocketPair:
from_socket: zmq.Socket[Any]
to_socket: zmq.Socket[Any]
to_stream: ZMQStream | None = None
on_recv_callback: Any
on_recv_copy: bool

def __init__(self, context: zmq.Context[Any], name: str):
"""Initialize the inproc socker pair."""
Expand All @@ -43,21 +41,9 @@ def close(self):
def on_recv(self, io_loop: IOLoop, on_recv_callback, copy: bool = False):
"""Set the callback used when a message is received on the to stream."""
# io_loop is that of the 'to' thread.
self.on_recv_callback = on_recv_callback
self.on_recv_copy = copy
if self.to_stream is None:
self.to_stream = ZMQStream(self.to_socket, io_loop)
self.resume_on_recv()

def pause_on_recv(self):
"""Pause receiving on the to stream."""
if self.to_stream is not None:
self.to_stream.stop_on_recv()

def resume_on_recv(self):
"""Resume receiving on the to stream."""
if self.to_stream is not None and not self.to_stream.closed():
self.to_stream.on_recv(self.on_recv_callback, copy=self.on_recv_copy)
self.to_stream.on_recv(on_recv_callback, copy=copy)

def _address(self, name) -> str:
"""Return the address used for this inproc socket pair."""
Expand Down
3 changes: 3 additions & 0 deletions ipykernel/subshell.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""A thread for a subshell."""

import asyncio
from typing import Any

import zmq
Expand Down Expand Up @@ -29,6 +30,8 @@ def __init__(
# When aborting flag is set, execute_request messages to this subshell will be aborted.
self.aborting = False

self.asyncio_lock = asyncio.Lock()

def run(self) -> None:
"""Run the thread."""
try:
Expand Down
15 changes: 10 additions & 5 deletions ipykernel/subshell_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
import json
import typing as t
import uuid
Expand Down Expand Up @@ -46,8 +47,7 @@ def __init__(
self._shell_channel_io_loop = shell_channel_io_loop
self._shell_socket = shell_socket
self._cache: dict[str, SubshellThread] = {}
self._lock_cache = Lock()
self._lock_shell_socket = Lock()
self._lock_cache = Lock() # Sync lock across threads when accessing cache.

# Inproc socket pair for communication from control thread to shell channel thread,
# such as for create_subshell_request messages. Reply messages are returned straight away.
Expand Down Expand Up @@ -107,7 +107,13 @@ def get_shell_channel_to_subshell_socket(self, subshell_id: str | None) -> zmq.S

def get_subshell_aborting(self, subshell_id: str) -> bool:
"""Get the boolean aborting flag of the specified subshell."""
return self._cache[subshell_id].aborting
with self._lock_cache:
return self._cache[subshell_id].aborting

def get_subshell_asyncio_lock(self, subshell_id: str) -> asyncio.Lock:
"""Return the asyncio lock belonging to the specified subshell."""
with self._lock_cache:
return self._cache[subshell_id].asyncio_lock

def list_subshell(self) -> list[str]:
"""Return list of current subshell ids.
Expand Down Expand Up @@ -216,8 +222,7 @@ def _process_control_request(

def _send_on_shell_channel(self, msg) -> None:
assert current_thread().name == SHELL_CHANNEL_THREAD_NAME
with self._lock_shell_socket:
self._shell_socket.send_multipart(msg)
self._shell_socket.send_multipart(msg)

def _stop_subshell(self, subshell_thread: SubshellThread) -> None:
"""Stop a subshell thread and close all of its resources."""
Expand Down
Loading