Commit 56de06f7 authored by Jens Kroeger's avatar Jens Kroeger
Browse files

jobsub: implement solution compatible with python3 for local usage

parent a947d191
......@@ -219,101 +219,62 @@ def runCorryvreckan(filenamebase, jobtask, silent):
cmd = stdbuf + " -oL " + cmd
# need some additional libraries for process interaction
from subprocess import Popen, PIPE
from threading import Thread # threading used for non-blocking process output parsing
from time import sleep
try:
from Queue import Queue, Empty # python 2.x
except ImportError:
from queue import Queue, Empty # python 3.x
import asyncio
import os
import sys
from asyncio.subprocess import PIPE
import datetime
import shlex
# parsing process output using threads
# (approach from http://stackoverflow.com/a/4896288)
def enqueue_output(out, queue):
""" feed queue with readline output """
for line in iter(out.readline, ''):
queue.put(line)
out.close()
ON_POSIX = 'posix' in sys.builtin_module_names
# Based on the solution proposed here:
# https://gitlab.cern.ch/corryvreckan/corryvreckan/-/issues/146#note_4465941
async def read_stream_and_display(stream, display):
"""Read from stream line by line until EOF, display, and capture the lines."""
output = []
while True:
line = await stream.readline()
if not line:
break
output.append(line)
display(line) # assume it doesn't block
return b''.join(output)
async def read_and_display(cmd):
"""Capture cmd's stdout, stderr while displaying them as they arrive (line by line)."""
# start process
log.info("Starting process %s", cmd)
process = await asyncio.create_subprocess_exec(*shlex.split(cmd), stdout=PIPE, stderr=PIPE)
# read child's stdout/stderr concurrently (capture and display)
try:
stdout, stderr = await asyncio.gather(
read_stream_and_display(process.stdout, sys.stdout.buffer.write),
read_stream_and_display(process.stderr, sys.stderr.buffer.write))
except Exception:
process.kill()
raise
finally:
# wait for the process to exit
rc = await process.wait()
return rc, stdout, stderr
cmd = cmd+" -c "+filenamebase+".conf"
rcode = None # the return code that will be set by a later subprocess method
try:
# run process
log.info ("Now running Corryvreckan on "+filenamebase+".conf")
log.debug ("Executing: "+cmd)
p = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, bufsize=1, close_fds=ON_POSIX)
# setup output queues and threads
qout = Queue()
tout = Thread(target=enqueue_output, args=(p.stdout, qout))
qerr = Queue()
terr = Thread(target=enqueue_output, args=(p.stderr, qerr))
# threads die with the program
tout.daemon = True
terr.daemon = True
tout.start()
terr.start()
# open log file
log_file = open(filenamebase+".log", "w")
# print timestamp to log file
log_file.write("---=== Analysis started on " + datetime.datetime.now().strftime("%A, %d. %B %Y %I:%M%p") + " ===---\n\n")
try:
while p.poll() is None:
# read line without blocking
try:
line = qout.get_nowait() # or q.get(timeout=.1)
if not silent:
if b'WARNING' in line.strip():
log.warning(line.strip())
elif b'ERROR' in line.strip():
log.error(line.strip())
elif b'FATAL' in line.strip():
log.critical(line.strip())
else:
log.info(line.strip())
log_file.write(str(line,'utf-8'))
except Empty:
pass
try:
line = qerr.get_nowait() # or q.get(timeout=.1)
log.error(line.strip())
log_file.write(str(line,'utf-8'))
except Empty:
sleep(0.005) # sleep for 5 ms to avoid excessive CPU load
# process done
tout.join() # finish stdout thread; wait for remaining buffer to be read
terr.join() # finish stderr thread
# process the remainder of the buffers now stored in our queues
while not qout.empty() or not qerr.empty():
# read line without blocking
try:
line = qout.get_nowait() # or q.get(timeout=.1)
if not silent:
if "WARNING" in line.strip():
log.warning(line.strip())
elif "ERROR" in line.strip():
log.error(line.strip())
elif "FATAL" in line.strip():
log.critical(line.strip())
else:
log.info(line.strip())
log_file.write(str(line,'utf-8'))
except Empty:
pass
# run process
loop = asyncio.get_event_loop()
rcode, *output = loop.run_until_complete(read_and_display(cmd))
# close log file
loop.close()
log_file.close()
try:
line = qerr.get_nowait() # or q.get(timeout=.1)
log.error(line.strip())
log_file.write(str(line,'utf-8'))
except Empty:
pass
finally:
log_file.close()
rcode = p.returncode # get the return code
except OSError as e:
log.critical("Problem with Corryvreckan execution: Command '%s' resulted in error %s", cmd, e)
exit(1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment