from robot.libraries.Process import Process
from robot.libraries.OperatingSystem import OperatingSystem
from .helper import Helper
from .logger import Logger
log = Logger().get_logger(__name__)
[docs]class Proc(Helper):
"""
| 本地进程操作
| 借用`s`RF``的 `Process <https://github.com/robotframework/robotframework/blob/master/src/robot/libraries/Process.py>`_
| 借用``RF``的 `OperatingSystem <https://github.com/robotframework/robotframework/blob/master/src/robot/libraries/OperatingSystem.py>`_
| 参考:charisma/ideate/blob/master/common/resource/process.resource
| 核心参数说明:
| **configuration**:
| def __init__(self, cwd=None, shell=False, stdout=None, stderr=None, stdin='PIPE',output_encoding='CONSOLE', alias=None, env=None, \*\*rest):
| self.cwd = self._get_cwd(cwd)
| self.shell = is_truthy(shell)
| self.alias = alias
| self.output_encoding = output_encoding
| self.stdout_stream = self._new_stream(stdout)
| self.stderr_stream = self._get_stderr(stderr, stdout, self.stdout_stream)
| self.stdin_stream = self._get_stdin(stdin)
| self.env = self._construct_env(env, rest)
| **ExecuteResult**:
| def __init__(self, process, stdout, stderr, stdin=None, rc=None,output_encoding=None):
| self._process = process
| self.stdout_path = self._get_path(stdout)
| self.stderr_path = self._get_path(stderr)
| self.rc = rc
| self._output_encoding = output_encoding
| self._stdout = None
| self._stderr = None
| self._custom_streams = [stream for stream in (stdout, stderr, stdin) if self._is_custom_stream(stream)]
| **command and arguments**:
| conf = ProcessConfiguration(\*\*configuration)
| command = conf.get_command(command, list(arguments))
| self._log_start(command, conf)
| process = subprocess.Popen(command, \*\*conf.popen_config)
| self._results[process] = ExecutionResult(process, \*\*conf.result_config)
| self._processes.register(process, alias=conf.alias)
| return self._processes.current
|
"""
def __init__(self):
self.proc = Process()
self.os = OperatingSystem()
[docs] def run(self, command):
"""
| 执行command命令,返回(返回值,标准输出)
| :param command: command
| :return: (rc, output)
"""
log.info("执行命令:{}".format(command))
return self.os.run(command)
[docs] def run_process(self, command, *arguments, **configuration):
"""
执行一个进程,并等待其结束
| **Examples** :
| res = run_process("python", "-c", "print('Hello world')")
| assert res.stdout == "Hello world"
| assert res.rc == 0
"""
return self.proc.run_process(command, *arguments, **configuration)
[docs] def start_process(self, command, *arguments, **configuration):
"""
后台执行进程,立即返回
| **Examples** :
| p1 = start_process("sleep 10", shell=True, alias="mysleep1")
| p2 = start_process("sleep 2", shell=True, alias="mysleep2")
| process_should_be_running("mysleep1")
| terminate_process("mysleep1")
| process_should_be_stopped("mysleep1")
| wait_for_process(handle="mysleep2", timeout="3s", on_timeout="kill") # Caution: Do not use time.sleep
| res1 = get_process_result("mysleep1")
| res2 = get_process_result("mysleep2")
| assert res1.rc == -15
| assert res2.rc == 0
"""
return self.proc.start_process(command, *arguments, **configuration)
[docs] def get_process_id(self, handle=None):
"""
取得进程ID,默认当前活跃进程id
"""
return self.proc.get_process_id(handle)
[docs] def get_process_object(self, handle=None):
"""
取得进程示例,Process 示例
"""
return self.proc.get_process_object(handle)
[docs] def get_process_result(self, handle=None, rc=False, stdout=False,
stderr=False, stdout_path=False, stderr_path=False):
"""
取得进程结果
"""
return self.proc.get_process_result(handle, rc, stdout, stderr, stdout_path, stderr_path)
[docs] def is_process_running(self, handle=None):
"""
进程是否在执行
"""
return self.proc.is_process_running(handle)
[docs] def join_command_line(self, *args):
"""
连接命令行
"""
return self.proc.join_command_line(*args)
[docs] def send_signal_to_process(self, signal, handle=None, group=False):
"""
向进程发送信号量
"""
return self.proc.send_signal_to_process(signal, handle, group)
[docs] def split_command_line(self, args, escaping=False):
"""
拆分命令行
"""
return self.proc.split_command_line(args, escaping)
[docs] def switch_process(self, handle):
"""
切换当前活跃进程
"""
return self.proc.switch_process(handle)
[docs] def terminate_process(self, handle=None, kill=False):
"""
Stops the process gracefully or forcefully.
"""
return self.proc.terminate_process(handle, kill)
[docs] def terminate_all_processes(self, kill=False):
"""
停止所有启动的进程
"""
return self.proc.terminate_all_processes(kill)
[docs] def wait_for_process(self, handle=None, timeout=None, on_timeout='continue'):
"""
等待进程结束
"""
return self.proc.wait_for_process(handle, timeout, on_timeout)
PROC = Proc()