You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
5.5 KiB

import os
import re
import signal
import subprocess
import sys
from functools import partial
from threading import Thread
import psutil # type: ignore
from gooey.gui import events
from gooey.gui.pubsub import pub
from gooey.gui.util.casting import safe_float
from gooey.util.functional import unit, bind
from gooey.python_bindings.types import GooeyParams
try:
import _winapi
creationflag = subprocess.CREATE_NEW_PROCESS_GROUP
except ModuleNotFoundError:
# default Popen creation flag
creationflag = 0
class ProcessController(object):
@classmethod
def of(cls, params: GooeyParams):
return cls(
params.get('progress_regex'),
params.get('progress_expr'),
params.get('hide_progress_msg'),
params.get('encoding'),
params.get('requires_shell'),
params.get('shutdown_signal', signal.SIGTERM)
)
def __init__(self, progress_regex, progress_expr, hide_progress_msg,
encoding, shell=True, shutdown_signal=signal.SIGTERM, testmode=False):
self._process = None
self.progress_regex = progress_regex
self.progress_expr = progress_expr
self.hide_progress_msg = hide_progress_msg
self.encoding = encoding
self.wasForcefullyStopped = False
self.shell_execution = shell
self.shutdown_signal = shutdown_signal
self.testMode = testmode
def was_success(self):
self._process.communicate()
return self._process.returncode == 0
def poll(self):
if not self._process:
raise Exception('Not started!')
return self._process.poll()
def stop(self):
"""
Sends a signal of the user's choosing (default SIGTERM) to
the child process.
"""
if self.running():
self.wasForcefullyStopped = True
self.send_shutdown_signal()
def send_shutdown_signal(self):
self._send_signal(self.shutdown_signal)
def _send_signal(self, sig):
parent = psutil.Process(self._process.pid)
for child in parent.children(recursive=True):
child.send_signal(sig)
parent.send_signal(sig)
def running(self):
return self._process and self.poll() is None
def run(self, command):
"""
Kicks off the user's code in a subprocess.
Implementation Note: CREATE_NEW_SUBPROCESS is required to have signals behave sanely
on windows. See the signal_support module for full background.
"""
self.wasForcefullyStopped = False
env = os.environ.copy()
env["GOOEY"] = "1"
env["PYTHONIOENCODING"] = self.encoding
# TODO: why is this try/catch here..?
try:
self._process = subprocess.Popen(
command.encode(sys.getfilesystemencoding()),
stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=self.shell_execution, env=env,
creationflags=creationflag)
except:
self._process = subprocess.Popen(
command,
stdout=subprocess.PIPE, stdin=subprocess.PIPE,
stderr = subprocess.STDOUT, shell = self.shell_execution, env=env,
creationflags=creationflag
)
# the message pump depends on the wx instance being initiated and its
# mainloop running (to dispatch pubsub messages). This makes testing difficult
# so we only spin up the thread when we're not testing.
if not self.testMode:
t = Thread(target=self._forward_stdout, args=(self._process,))
t.start()
def _forward_stdout(self, process):
'''
Reads the stdout of `process` and forwards lines and progress
to any interested subscribers
'''
while True:
line = process.stdout.readline()
if not line:
break
_progress = self._extract_progress(line)
pub.send_message(events.PROGRESS_UPDATE, progress=_progress)
if _progress is None or self.hide_progress_msg is False:
pub.send_message(events.CONSOLE_UPDATE,
msg=line.decode(self.encoding))
pub.send_message(events.EXECUTION_COMPLETE)
def _extract_progress(self, text):
'''
Finds progress information in the text using the
user-supplied regex and calculation instructions
'''
# monad-ish dispatch to avoid the if/else soup
find = partial(re.search, string=text.strip().decode(self.encoding))
regex = unit(self.progress_regex)
match = bind(regex, find)
result = bind(match, self._calculate_progress)
return result
def _calculate_progress(self, match):
'''
Calculates the final progress value found by the regex
'''
if not self.progress_expr:
return safe_float(match.group(1))
else:
return self._eval_progress(match)
def _eval_progress(self, match):
'''
Runs the user-supplied progress calculation rule
'''
_locals = {k: safe_float(v) for k, v in match.groupdict().items()}
if "x" not in _locals:
_locals["x"] = [safe_float(x) for x in match.groups()]
try:
return int(eval(self.progress_expr, {}, _locals))
except:
return None