Skip to content

Commit a0e55f6

Browse files
committed
Multiple changes:
- Make shutdown actually close the blocking read on the audio_pipe when Ctrl+C is pressed. However, the usage of global variables is still present. Shutdown seems to occur cleanly now. - Track clients that connect and disconnect from the websocket. This permits us to stream transcripts to multiple connected clients - Fixed so that we have only a single reader from the audio_pipe, and whenever we read, it will be broadcasted to whatever clients are connected. - The last generated code is tracked. We now start executing code when the audio request asks us to, rather than sending data to the client to automate the execute request.
1 parent a60885e commit a0e55f6

File tree

1 file changed

+113
-48
lines changed

1 file changed

+113
-48
lines changed

codebotler.py

Lines changed: 113 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
ws_server = None
3939
prompt_prefix = ""
4040
prompt_suffix = ""
41-
41+
pipe_descriptor = None
4242
def serve_interface_html(args):
4343
global httpd
4444
class HTMLFileHandler(http.server.SimpleHTTPRequestHandler):
@@ -84,6 +84,7 @@ def execute(code):
8484
global ros_available
8585
global robot_available
8686
global robot_interface
87+
print(f"Recieved execution request for program:\n```python\n{code}\n```")
8788
if not ros_available:
8889
print("ROS not available. Ignoring execute request.")
8990
elif not robot_available:
@@ -128,22 +129,32 @@ async def post_code(websocket, args, data, time_str):
128129
response = {"code": f"{data}", "timing": time_str}
129130
await websocket.send(json.dumps(response))
130131

131-
async def read_from_pipe(pipe_path):
132-
print(f"Entered `read_from_pipe` using {pipe_path}")
132+
async def read_from_pipe(fd, executor):
133+
print(f"Entered `read_from_pipe`")
133134
loop = asyncio.get_running_loop()
134-
## Opens in read-write mode to prevent EOF.
135-
fd = os.open(pipe_path, os.O_RDWR)
136-
with os.fdopen(fd, "r") as f:
137-
while True:
138-
print(f"Waiting to read a line from the pipe")
139-
line = await loop.run_in_executor(
140-
concurrent.futures.ThreadPoolExecutor(),
141-
f.readline
142-
)
143-
print(f"Read this line: {line}")
144-
## Only send transcripts that are marked as final
145-
if line.startswith("<FINAL>:"):
146-
yield line[8:].rstrip("\n")
135+
try:
136+
with os.fdopen(fd, "r") as f:
137+
while True:
138+
print(f"Waiting to read a line from the pipe")
139+
line = await loop.run_in_executor(
140+
executor,
141+
f.readline
142+
)
143+
print(f"Read this line: {line}")
144+
## Only send transcripts that are marked as final
145+
if line.startswith("<FINAL>:"):
146+
yield line[8:].rstrip("\n")
147+
except asyncio.CancelledError:
148+
print("read_from_pipe canceled, closing file")
149+
raise
150+
except OSError:
151+
print("Pipe already closed")
152+
finally:
153+
print("Closing pipe descriptor")
154+
try:
155+
os.close(fd)
156+
except OSError:
157+
print("Pipe is already closed")
147158

148159
class Accumulator:
149160
def __init__(self):
@@ -234,47 +245,90 @@ async def accumulate(self, text):
234245

235246
return self.instruction
236247

237-
async def ws_main(websocket, path, args, accumulator):
248+
async def ws_main(websocket, path, args, accumulator, connected_clients):
249+
## Whenever a client connects to the websocket, ws_main is called,
250+
## and websocket refers to the connection with that specific client.
251+
connected_clients.add(websocket)
252+
print(f"Client has connected; {len(connected_clients)} clients...")
238253

239254
async def receive_messages():
240255
try:
241256
async for message in websocket:
242257
await handle_message(websocket, message, args)
243-
except websockets.exceptions.ConnectionClosed:
244-
pass
245-
246-
async def send_messages():
247-
print("Entered `send_messages`")
248-
try:
249-
async for line in read_from_pipe(args.transcription_pipe):
250-
instruction = await accumulator.accumulate(line)
251-
executebool = await accumulator.execute_detected(line)
252-
await post_transcript(websocket, args, line, instruction, executebool)
253-
if instruction is not None and len(instruction) > 0:
254-
code, time_str = await asyncio.to_thread(generate_code, instruction, args)
255-
await post_code(websocket, args, code, time_str)
256-
except websockets.exceptions.ConnectionClosed:
257-
pass
258-
259-
await asyncio.gather(receive_messages(), send_messages())
260-
261-
async def start_ws_server(args):
262-
258+
except websockets.exceptions.ConnectionClosedOK:
259+
print("Client has disconnected...")
260+
except websockets.exceptions.ConnectionClosedError as e:
261+
print(f"Client has disconnected with error: {e}")
262+
finally:
263+
connected_clients.discard(websocket)
264+
print(f"A client has been removed; {len(connected_clients)} clients...")
265+
266+
await receive_messages()
267+
#await asyncio.gather(receive_messages(), send_messages())
268+
269+
async def broadcast_transcripts_from_pipe(args, fd, accumulator, ws_server, clients, last_gen_code, lock, executor):
270+
try:
271+
async for line in read_from_pipe(fd, executor):
272+
instruction = await accumulator.accumulate(line)
273+
executebool = await accumulator.execute_detected(line)
274+
## Start to execute the most recent program that was generated, if any
275+
if executebool:
276+
codetoexec = None
277+
async with lock:
278+
if last_gen_code[0] is not None:
279+
codetoexec = last_gen_code[0]
280+
if codetoexec is not None:
281+
execute(codetoexec)
282+
## Stream the transcribed text from the pipe to all connected clients.
283+
client_copy = clients.copy()
284+
for websocket in client_copy:
285+
try:
286+
await post_transcript(websocket, args, line, instruction, executebool)
287+
except:
288+
pass
289+
## If the transcription contains an instruction, generate a code plan for it
290+
## and cache the code plan locally, and send it to the connected clients
291+
if instruction is not None and len(instruction) > 0:
292+
code, time_str = await asyncio.to_thread(generate_code, instruction, args)
293+
async with lock:
294+
last_gen_code[0] = code
295+
client_copy = clients.copy()
296+
for websocket in client_copy:
297+
try:
298+
await post_code(websocket, args, code, time_str)
299+
except:
300+
pass
301+
except asyncio.CancelledError:
302+
print("Closing pipe")
303+
304+
async def start_ws_server(args, fd):
305+
executor = concurrent.futures.ThreadPoolExecutor()
306+
lock = asyncio.Lock()
307+
last_gen_code = [None]
263308
acc = Accumulator()
309+
connected_clients = set()
310+
## Start the websocket server
311+
websocket_server = await websockets.serve(
312+
lambda ws, path="": ws_main(ws, path, args, acc, connected_clients),
313+
args.ip, args.ws_port)
314+
print(f"WebSocket server started at ws://{args.ip}:{args.ws_port}")
315+
316+
broadcast_task = asyncio.create_task(
317+
broadcast_transcripts_from_pipe(
318+
args, fd, acc, websocket_server, connected_clients, last_gen_code, lock, executor))
264319

265-
async with websockets.serve(lambda ws, path="": ws_main(ws, path, args, acc), args.ip, args.ws_port):
266-
print(f"WebSocket server started at ws://{args.ip}:{args.ws_port}")
267-
await asyncio.Future() # Keeps the server running indefinitely.
320+
await asyncio.gather(websocket_server.wait_closed(), broadcast_task)
321+
executor.shutdown(wait=False)
268322

269-
def start_completion_callback(args):
323+
def start_completion_callback(args, fd):
270324
try:
271-
asyncio.run(start_ws_server(args))
325+
asyncio.run(start_ws_server(args, fd))
272326
except Exception as e:
273327
print("Websocket error: " + str(e))
274328
shutdown(None, None)
275329

276330
def shutdown(sig, frame):
277-
global ros_available, robot_available, robot_interface, server_thread, asyncio_loop, httpd, ws_server
331+
global ros_available, robot_available, robot_interface, server_thread, asyncio_loop, httpd, ws_server, pipe_descriptor
278332
print(" Shutting down server.")
279333
if robot_available and ros_available and robot_interface is not None:
280334
robot_interface._cancel_goals()
@@ -288,10 +342,18 @@ def shutdown(sig, frame):
288342
httpd.shutdown()
289343
if server_thread is not None and threading.current_thread() != server_thread:
290344
server_thread.join()
291-
if asyncio_loop is not None:
292-
for task in asyncio.all_tasks(loop=asyncio_loop):
293-
task.cancel()
294-
asyncio_loop.stop()
345+
running_loop = asyncio.get_running_loop()
346+
print(" Cancelling asyncio tasks")
347+
for task in asyncio.all_tasks(loop=running_loop):
348+
task.cancel()
349+
print(" Requested stopping asyncio loop")
350+
running_loop.stop()
351+
print(" Attempting to close pipe_descriptor")
352+
if pipe_descriptor is not None:
353+
## Need to write something to unblock the read
354+
os.write(pipe_descriptor, b"@")
355+
os.close(pipe_descriptor)
356+
print(" Shutdown closed the pipe_descriptor")
295357
if ws_server is not None:
296358
ws_server.close()
297359
if sig == signal.SIGINT or sig == signal.SIGTERM:
@@ -309,6 +371,7 @@ def main():
309371
global robot_interface
310372
global code_timeout
311373
global model
374+
global pipe_descriptor
312375
import argparse
313376
from pathlib import Path
314377
parser = argparse.ArgumentParser()
@@ -350,7 +413,9 @@ def main():
350413
args=[args])
351414
server_thread.start()
352415

353-
start_completion_callback(args)
416+
## Opens in read-write mode to prevent EOF.
417+
pipe_descriptor = os.open(args.transcription_pipe, os.O_RDWR)
418+
start_completion_callback(args, pipe_descriptor)
354419

355420
if __name__ == "__main__":
356421
main()

0 commit comments

Comments
 (0)