diff --git a/ipykernel/tests/test_embed_kernel.py b/ipykernel/tests/test_embed_kernel.py index 5e60245b0..061dc5c67 100644 --- a/ipykernel/tests/test_embed_kernel.py +++ b/ipykernel/tests/test_embed_kernel.py @@ -6,6 +6,7 @@ import os import sys import time +import json from contextlib import contextmanager from subprocess import Popen, PIPE @@ -28,39 +29,56 @@ def setup_kernel(cmd): ------- kernel_manager: connected KernelManager instance """ - kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE) - connection_file = os.path.join( - paths.jupyter_runtime_dir(), - 'kernel-%i.json' % kernel.pid, - ) - # wait for connection file to exist, timeout after 5s - tic = time.time() - while not os.path.exists(connection_file) \ - and kernel.poll() is None \ - and time.time() < tic + SETUP_TIMEOUT: - time.sleep(0.1) - - if kernel.poll() is not None: - o,e = kernel.communicate() - e = py3compat.cast_unicode(e) - raise IOError("Kernel failed to start:\n%s" % e) - - if not os.path.exists(connection_file): - if kernel.poll() is None: - kernel.terminate() - raise IOError("Connection file %r never arrived" % connection_file) - client = BlockingKernelClient(connection_file=connection_file) - client.load_connection_file() - client.start_channels() - client.wait_for_ready() + def connection_file_ready(connection_file): + """Check if connection_file is a readable json file.""" + if not os.path.exists(connection_file): + return False + try: + with open(connection_file) as f: + json.load(f) + return True + except ValueError: + return False + kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE) try: - yield client + connection_file = os.path.join( + paths.jupyter_runtime_dir(), + 'kernel-%i.json' % kernel.pid, + ) + # wait for connection file to exist, timeout after 5s + tic = time.time() + while not connection_file_ready(connection_file) \ + and kernel.poll() is None \ + and time.time() < tic + SETUP_TIMEOUT: + time.sleep(0.1) + + # Wait 100ms for the writing to finish + time.sleep(0.1) + + if kernel.poll() is not None: + o,e = kernel.communicate() + e = py3compat.cast_unicode(e) + raise IOError("Kernel failed to start:\n%s" % e) + + if not os.path.exists(connection_file): + if kernel.poll() is None: + kernel.terminate() + raise IOError("Connection file %r never arrived" % connection_file) + + client = BlockingKernelClient(connection_file=connection_file) + client.load_connection_file() + client.start_channels() + client.wait_for_ready() + try: + yield client + finally: + client.stop_channels() finally: - client.stop_channels() kernel.terminate() + def test_embed_kernel_basic(): """IPython.embed_kernel() is basically functional""" cmd = '\n'.join([