ChatGPT解决这个技术问题 Extra ChatGPT

如何从 Python 异步运行外部命令?

我需要从 Python 脚本异步运行 shell 命令。我的意思是我希望我的 Python 脚本在外部命令关闭并执行它需要做的任何事情时继续运行。

我读了这篇文章:

在 Python 中调用外部命令

然后我开始进行一些测试,看起来 os.system() 将完成这项工作,前提是我在命令末尾使用 &,这样我就不必等待它返回。我想知道这是否是完成此类事情的正确方法?我试过 commands.call() 但它对我不起作用,因为它会阻止外部命令。

请让我知道是否建议为此使用 os.system(),或者我是否应该尝试其他途径。


N
Neuron

subprocess.Popen 完全符合您的要求。

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(编辑以完成评论中的答案)

Popen 实例可以执行各种其他操作,例如您可以 poll() 它以查看它是否仍在运行,并且您可以使用它 communicate() 在标准输入上向其发送数据,并等待它终止。


您还可以使用 poll() 检查子进程是否已终止,或使用 wait() 等待它终止。
亚当,非常正确,尽管使用communicate() 等待可能会更好,因为它可以更好地处理输入/输出缓冲区,并且在某些情况下可能会阻塞这些缓冲区。
不过,communicate() 和 wait() 是阻塞操作。您不会像 OP 似乎询问您是否使用它们那样并行化命令。
Cdleary 是绝对正确的,应该提到,communication 和 wait 确实阻塞,所以只有在等待事情关闭时才这样做。 (为了表现得好,你真的应该这样做)
当然,在一个子进程上调用通信不会阻塞其他正在运行的子进程...
y
yizzlez

如果您想并行运行多个进程,然后在它们产生结果时处理它们,您可以使用轮询,如下所示:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
    for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None: # Process finished.
            running_procs.remove(proc)
            break
        else: # No process is done, wait a bit and check again.
            time.sleep(.1)
            continue

    # Here, `proc` has finished with return code `retcode`
    if retcode != 0:
        """Error handling."""
    handle_results(proc.stdout)

那里的控制流有点复杂,因为我试图让它变小——你可以根据自己的喜好进行重构。 :-)

这样做的好处是首先为早期完成的请求提供服务。如果您在第一个运行的进程上调用 communicate,结果证明它运行的时间最长,那么其他正在运行的进程将一直处于空闲状态当您本可以处理他们的结果时。


@Tino这取决于您如何定义忙等待。请参阅What is the difference between busy-wait and polling?
有没有办法轮询一组进程而不仅仅是一个?
注意:如果进程生成足够的输出,它可能会挂起。如果您使用 PIPE,您应该同时使用标准输出(子进程的文档中有(太多但不够)警告)。
使用 ['/usr/bin/my_cmd', '-i', path] 而不是 ['/usr/bin/my_cmd', '-i %s' % path]
只是想知道,您为什么在声明 procfor proc in running_procs: 循环之外使用 proc.stdout
g
gerrit

这在“等待命令异步终止”下的 Python 3 Subprocess Examples 中涵盖。使用 IPythonpython -m asyncio 运行此代码:

import asyncio proc = await asyncio.create_subprocess_exec('ls','-lha', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) # 在 ls 工作时做其他事情 # 如果 proc 需要很长时间才能完成, CPU 可以自由使用周期来执行 # 其他进程 stdout, stderr = await proc.communicate()

该进程将在 await asyncio.create_subprocess_exec(...) 完成后立即开始运行。如果在您调用 await proc.communicate() 时它还没有完成,它将在那里等待以便为您提供输出状态。如果完成,proc.communicate() 将立即返回。

这里的要点类似于Terrels answer,但我认为 Terrels 的答案似乎使事情过于复杂。

有关详细信息,请参阅 asyncio.create_subprocess_exec


限制是 Python 3.6+ 是必需的。
@DanielF 是的,但是不再支持任何比 3.6 更旧的 Python,因此每个人都应该至少已经使用 Python 3.6。
@maf88 嗯。好问题!显然it needs to be inside an await function...
@maf88 我可以发誓我在写这个答案的时候试过了:-S
@maf88 谢谢 :) 当我写这个答案时,我一定在 ipython 中尝试过,但从未意识到这个限制。
S
S.Lott

我想知道的是,这 [os.system()] 是否是完成此类事情的正确方法?

不,os.system() 不是正确的方法。这就是为什么每个人都说使用 subprocess

有关详细信息,请阅读 http://docs.python.org/library/os.html#os.system

subprocess 模块提供了更强大的工具来生成新进程并检索它们的结果;使用该模块优于使用此功能。使用子流程模块。尤其要检查用子流程模块替换旧功能部分。


否,因为 os.system() 是同步而不是异步
@user889030 据我了解,os.system("my_command &") 将在后台异步运行该命令。
T
Terrel Shumway

接受的答案很老了。

我在这里找到了一个更好的现代答案:

https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

并做了一些改动:

让它在 Windows 上工作 让它与多个命令一起工作

import sys
import asyncio

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.wait(
            [
                _read_stream(process.stdout, stdout_cb),
                _read_stream(process.stderr, stderr_cb),
            ]
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(asyncio.gather(*aws))
    loop.close()
    return rc


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to excecute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        [
            "sh",
            "-c",
            """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
        ],
        [
            "bash",
            "-c",
            "echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
        ],
        [sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
    )

    print(execute(*runners(cmds)))

示例命令不太可能在您的系统上完美运行,并且它不会处理奇怪的错误,但此代码确实演示了一种使用 asyncio 运行多个子进程并流式传输输出的方法。


我在 Windows 上运行的 cpython 3.7.4 和在 Ubuntu WSL 和本机 Alpine Linux 上运行的 cpython 3.7.3 上对此进行了测试
N
Noah

我在 asyncproc 模块上取得了很好的成功,它很好地处理了进程的输出。例如:

import os
from asynproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

这是github上的任何地方吗?
它是 gpl 许可证,所以我敢肯定它在那里很多次。这是一个:github.com/albertz/helpers/blob/master/asyncproc.py
我添加了一个要点并进行了一些修改,以使其与 python3 一起使用。 (主要用字节替换 str )。请参阅gist.github.com/grandemk/cbc528719e46b5a0ffbd07e3054aab83
此外,您需要在退出循环后再读取一次输出,否则您将丢失一些输出。
J
Jean-François Fabre

pexpect 与非阻塞 readlines 一起使用是另一种方法。 Pexpect 解决了死锁问题,允许您轻松地在后台运行进程,并提供简单的方法来在您的进程吐出预定义的字符串时进行回调,并且通常使与进程的交互更加容易。


P
Pugsley

考虑到“我不必等待它返回”,最简单的解决方案之一是:

subprocess.Popen( \
    [path_to_executable, arg1, arg2, ... argN],
    creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

但是...从我读到的内容来看,这不是“完成此类事情的正确方法”,因为 subprocess.CREATE_NEW_CONSOLE 标志会产生安全风险。

这里发生的关键事情是使用 subprocess.CREATE_NEW_CONSOLE 创建新的控制台和 .pid(返回进程 ID,以便您以后可以根据需要检查程序),以免等待程序完成其工作。


P
Patrizio Rullo

我在尝试使用 Python 中的 s3270 脚本软件连接到 3270 终端时遇到了同样的问题。现在我正在用我在这里找到的一个 Process 子类来解决这个问题:

http://code.activestate.com/recipes/440554/

这是从文件中获取的样本:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return ''.join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()

S
Shital Shah

这里有几个答案,但没有一个能满足我的以下要求:

我不想等待命令完成或用子进程输出污染我的终端。我想使用重定向运行 bash 脚本。我想在我的 bash 脚本中支持管道(例如 find ... | tar ...)。

满足上述要求的唯一组合是:

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
                 stdout=subprocess.PIPE, 
                 stderr=subprocess.PIPE,
                 shell=True)