Python资源管理和生成器

创建于 2024年7月30日修改于 2024年7月30日
Python

我和一位同事一起在Python中实现了一些异步流1抽象。我意识到,在异步生成器中进行可预测的清理存在一些相当严重的问题。

事实上,同样的问题也存在于普通生成器中,但在异步生成器中引起的问题更多。关键是,当我们没有完全消费一个生成器时,我们可以观察到意外的清理行为。

让我们来看一下!

from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    # 一个包裹在一些清理代码中的无限迭代器的例子。
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    generator = gen()
    # 我们提前跳出这个循环,这意味着我们没有完全消费`generator`。
    for elem in generator:
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # 执行一些耗时操作。
    time.sleep(2)


if __name__ == "__main__":
    main()

运行这个程序时,输出如下:

0
1
2
3
4
finished iterating
cleaned up

这些打印语句的顺序表明,清理代码只有在函数退出时才会执行(例如,当生成器对象的引用计数降为0时)。

我们可以通过内联调用gen或者显式使用del语句来做得更好,以确保在我们开始一些昂贵的操作之前清理生成器。在这个例子中,虽然看起来没什么大问题,但在我们完成操作后立即释放资源(如打开的文件描述符或数据库连接)往往是可取的。

from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    for elem in gen():
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # 执行一些耗时操作。
    time.sleep(2)


if __name__ == "__main__":
    main()

这次的输出是:

0
1
2
3
4
cleaned up
finished iterating

因此,我们先清理了!但是,这在很大程度上依赖于CPython的一些实现细节。事实上,当我用pypy运行同样的代码时,清理从未被执行2!那么我们有什么选择呢?

让我们看看标准库中的一些代码;具体来说,让我们看看Lib/_collections_abc.py,以更好地了解生成器上可能可用的操作。

这段代码看起来很有希望:

class Generator(Iterator):

    __slots__ = ()

    def __next__(self):
        """Return the next item from the generator.
        When exhausted, raise StopIteration.
        """
        return self.send(None)

    @abstractmethod
    def send(self, value):
        """Send a value into the generator.
        Return next yielded value or raise StopIteration.
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the generator.
        Return next yielded value or raise StopIteration.
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    def close(self):
        """Raise GeneratorExit inside generator.
        """
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("generator ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Generator:
            return _check_methods(C, '__iter__', '__next__',
                                  'send', 'throw', 'close')
        return NotImplemented

特别是,这表明生成器有一个close方法!

那么,如果我们在完成迭代后显式调用生成器的close方法,会发生什么呢?

from __future__ import annotations

import itertools
import time
from collections.abc import Generator


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    generator = gen()
    for elem in generator:
        print(elem)
        if elem > 3:
            break
    generator.close()
    print("finished iterating")
    # 执行一些耗时操作。
    time.sleep(2)


if __name__ == "__main__":
    main()

在CPython和pypy上运行似乎都如期工作!但是我们甚至可以通过使用contextlib中的closing上下文管理器做得更好:

from __future__ import annotations

import itertools
import time
from collections.abc import Generator
from contextlib import closing


def gen() -> Generator[int]:
    try:
        yield from itertools.count(0)
    finally:
        print("cleaned up")


def main() -> None:
    with closing(gen()) as generator:
        for elem in generator:
            print(elem)
            if elem > 3:
                break
    print("finished iterating")
    # 执行一些耗时操作。
    time.sleep(2)


if __name__ == "__main__":
    main()

即使在for循环体中抛出异常,我们也会调用生成器清理代码。

很酷,我们终于找到了一个合理的惯用法,确保我们能以合理的信心(至少是所选Python运行时能提供的信心)调用清理代码。

实际上,很少有人会写这样的代码;CPython的清理行为相当可预测。但是当我们进入“异步领域”时会发生什么呢?这是一个类似的例子:

from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator


async def gen() -> AsyncGenerator[int]:
    try:
        # `yield from`不支持异步生成器
        for x in itertools.count(0):
            yield x
    finally:
        # 需要启用“取消”
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    generator = gen()
    async for elem in generator:
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # 执行一些耗时操作。
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

用CPython运行这个程序会显示一个类似于我们在用pypy运行普通生成器例子时观察到的问题:清理代码根本不会运行!但是,当我们用pypy运行这个程序时,清理代码却会运行?!

这与异步生成器清理的管理有关:事件循环需要注册异步生成器钩子,以便语言运行时可以在异步生成器对象“终结”时与事件循环通信(在垃圾回收期间处理未使用对象的清理工作)。

如果你想知道为什么await asyncio.sleep(0)存在:asyncio.run会在提供的协程执行完毕后调用取消未完成的任务。异步生成器清理任务就是这样的任务。如果省略了await asyncio.sleep(0),则会执行打印语句。

让我们做一个类似的调整,将gen调用内联到async for循环中,看看会发生什么:

from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator


async def gen() -> AsyncGenerator[int]:
    try:
        for x in itertools.count(0):
            yield x
    finally:
        # 由于“reasons”需要
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    async for elem in gen():
        print(elem)
        if elem > 3:
            break
    print("finished iterating")
    # 执行一些耗时操作。
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

在CPython中,清理现在会执行。这是因为生成器在退出async for循环时终结,因此在await asyncio.sleep(1)之前调度清理任务,从而允许事件循环在协程完成之前“处理”清理任务。在pypy中,清理仍然会执行,但由于垃圾回收行为的差异,只是在await asyncio.sleep(1)完成之后。

最后,让我们看看在异步代码中使用类似的显式清理惯用法。contextlib库提供了一个名为aclosing的异步版本的closing上下文管理器:

from __future__ import annotations

import asyncio
import itertools
from collections.abc import AsyncGenerator
from contextlib import aclosing


async def gen() -> AsyncGenerator[int]:
    try:
        for x in itertools.count(0):
            yield x
    finally:
        # 由于“reasons”需要
        await asyncio.sleep(0)
        print("Clean up")


async def main() -> None:
    async with aclosing(gen()) as generator:
        async for elem in generator:
            print(elem)
            if elem > 3:
                break
    print("finished iterating")
    # 执行一些耗时操作。
    await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())

这段代码在CPython和pypy上都有一致且可靠的行为。

显式清理代码在异步代码中特别重要还有一些额外的原因:

结论

在使用生成器(无论是否异步)时,遵循可预测的清理惯用法可能是一个相当好的做法。实际上,大多数人可能不会在普通生成器中使用这些惯用法,但在与异步生成器交互时,真的应该考虑应用这些惯用法。

在研究这篇文章时,我学到了很多。异步生成器是一个有趣的语言特性示例,它需要语言运行时与异步事件循环协同工作。异步生成器清理曾经是个魔法,但现在我认为我对正在发生的事情有了一个合理的心理模型,并且对一些可以帮助确保可预测行为的惯用法有了更好的理解。

额外阅读

原文:https://samgeo.codes/python-generator-cleanup/

Footnotes

  1. 在处理语言模型时,为了改善高延迟 completions 的用户体验,需要大量“流”。

  2. CPython利用引用计数和仅用于处理循环引用的生成垃圾收集器。PyPy使用自己的增量垃圾收集器(没有引用计数)。PyPy的文档提供了对一些差异的合理讨论。