Python资源管理和生成器
我和一位同事一起在Python中实现了一些异步流1抽象。我意识到,在异步生成器中进行可预测的清理存在一些相当严重的问题。
事实上,同样的问题也存在于普通生成器中,但在异步生成器中引起的问题更多。关键是,当我们没有完全消费一个生成器时,我们可以观察到意外的清理行为。
让我们来看一下!
from __future__ import annotations
import itertools
import time
from collections.abc import Generator
def gen() -> Generator[int]:
# 一个包裹在一些清理代码中的无限迭代器的例子。
try:
yield from itertools.count(0)
finally:
print("cleaned up")
def main() -> None:
generator = gen()
# 我们提前跳出这个循环,这意味着我们没有完全消费`generator`。
for elem in generator:
print(elem)
if elem > 3:
break
print("finished iterating")
# 执行一些耗时操作。
time.sleep(2)
if __name__ == "__main__":
main()
运行这个程序时,输出如下:
0
1
2
3
4
finished iterating
cleaned up
这些打印语句的顺序表明,清理代码只有在函数退出时才会执行(例如,当生成器对象的引用计数降为0
时)。
我们可以通过内联调用gen
或者显式使用del
语句来做得更好,以确保在我们开始一些昂贵的操作之前清理生成器。在这个例子中,虽然看起来没什么大问题,但在我们完成操作后立即释放资源(如打开的文件描述符或数据库连接)往往是可取的。
from __future__ import annotations
import itertools
import time
from collections.abc import Generator
def gen() -> Generator[int]:
try:
yield from itertools.count(0)
finally:
print("cleaned up")
def main() -> None:
for elem in gen():
print(elem)
if elem > 3:
break
print("finished iterating")
# 执行一些耗时操作。
time.sleep(2)
if __name__ == "__main__":
main()
这次的输出是:
0
1
2
3
4
cleaned up
finished iterating
因此,我们先清理了!但是,这在很大程度上依赖于CPython的一些实现细节。事实上,当我用pypy
运行同样的代码时,清理从未被执行2!那么我们有什么选择呢?
让我们看看标准库中的一些代码;具体来说,让我们看看Lib/_collections_abc.py
,以更好地了解生成器上可能可用的操作。
这段代码看起来很有希望:
class Generator(Iterator):
__slots__ = ()
def __next__(self):
"""Return the next item from the generator.
When exhausted, raise StopIteration.
"""
return self.send(None)
@abstractmethod
def send(self, value):
"""Send a value into the generator.
Return next yielded value or raise StopIteration.
"""
raise StopIteration
@abstractmethod
def throw(self, typ, val=None, tb=None):
"""Raise an exception in the generator.
Return next yielded value or raise StopIteration.
"""
if val is None:
if tb is None:
raise typ
val = typ()
if tb is not None:
val = val.with_traceback(tb)
raise val
def close(self):
"""Raise GeneratorExit inside generator.
"""
try:
self.throw(GeneratorExit)
except (GeneratorExit, StopIteration):
pass
else:
raise RuntimeError("generator ignored GeneratorExit")
@classmethod
def __subclasshook__(cls, C):
if cls is Generator:
return _check_methods(C, '__iter__', '__next__',
'send', 'throw', 'close')
return NotImplemented
特别是,这表明生成器有一个close
方法!
那么,如果我们在完成迭代后显式调用生成器的close
方法,会发生什么呢?
from __future__ import annotations
import itertools
import time
from collections.abc import Generator
def gen() -> Generator[int]:
try:
yield from itertools.count(0)
finally:
print("cleaned up")
def main() -> None:
generator = gen()
for elem in generator:
print(elem)
if elem > 3:
break
generator.close()
print("finished iterating")
# 执行一些耗时操作。
time.sleep(2)
if __name__ == "__main__":
main()
在CPython和pypy
上运行似乎都如期工作!但是我们甚至可以通过使用contextlib
中的closing
上下文管理器做得更好:
from __future__ import annotations
import itertools
import time
from collections.abc import Generator
from contextlib import closing
def gen() -> Generator[int]:
try:
yield from itertools.count(0)
finally:
print("cleaned up")
def main() -> None:
with closing(gen()) as generator:
for elem in generator:
print(elem)
if elem > 3:
break
print("finished iterating")
# 执行一些耗时操作。
time.sleep(2)
if __name__ == "__main__":
main()
即使在for
循环体中抛出异常,我们也会调用生成器清理代码。
很酷,我们终于找到了一个合理的惯用法,确保我们能以合理的信心(至少是所选Python运行时能提供的信心)调用清理代码。
实际上,很少有人会写这样的代码;CPython的清理行为相当可预测。但是当我们进入“异步领域”时会发生什么呢?这是一个类似的例子:
from __future__ import annotations
import asyncio
import itertools
from collections.abc import AsyncGenerator
async def gen() -> AsyncGenerator[int]:
try:
# `yield from`不支持异步生成器
for x in itertools.count(0):
yield x
finally:
# 需要启用“取消”
await asyncio.sleep(0)
print("Clean up")
async def main() -> None:
generator = gen()
async for elem in generator:
print(elem)
if elem > 3:
break
print("finished iterating")
# 执行一些耗时操作。
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
用CPython运行这个程序会显示一个类似于我们在用pypy
运行普通生成器例子时观察到的问题:清理代码根本不会运行!但是,当我们用pypy
运行这个程序时,清理代码却会运行?!
这与异步生成器清理的管理有关:事件循环需要注册异步生成器钩子,以便语言运行时可以在异步生成器对象“终结”时与事件循环通信(在垃圾回收期间处理未使用对象的清理工作)。
如果你想知道为什么await asyncio.sleep(0)
存在:asyncio.run
会在提供的协程执行完毕后调用取消未完成的任务。异步生成器清理任务就是这样的任务。如果省略了await asyncio.sleep(0)
,则会执行打印语句。
让我们做一个类似的调整,将gen
调用内联到async for
循环中,看看会发生什么:
from __future__ import annotations
import asyncio
import itertools
from collections.abc import AsyncGenerator
async def gen() -> AsyncGenerator[int]:
try:
for x in itertools.count(0):
yield x
finally:
# 由于“reasons”需要
await asyncio.sleep(0)
print("Clean up")
async def main() -> None:
async for elem in gen():
print(elem)
if elem > 3:
break
print("finished iterating")
# 执行一些耗时操作。
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
在CPython中,清理现在会执行。这是因为生成器在退出async for
循环时终结,因此在await asyncio.sleep(1)
之前调度清理任务,从而允许事件循环在协程完成之前“处理”清理任务。在pypy
中,清理仍然会执行,但由于垃圾回收行为的差异,只是在await asyncio.sleep(1)
完成之后。
最后,让我们看看在异步代码中使用类似的显式清理惯用法。contextlib
库提供了一个名为aclosing
的异步版本的closing
上下文管理器:
from __future__ import annotations
import asyncio
import itertools
from collections.abc import AsyncGenerator
from contextlib import aclosing
async def gen() -> AsyncGenerator[int]:
try:
for x in itertools.count(0):
yield x
finally:
# 由于“reasons”需要
await asyncio.sleep(0)
print("Clean up")
async def main() -> None:
async with aclosing(gen()) as generator:
async for elem in generator:
print(elem)
if elem > 3:
break
print("finished iterating")
# 执行一些耗时操作。
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
这段代码在CPython和pypy
上都有一致且可靠的行为。
显式清理代码在异步代码中特别重要还有一些额外的原因:
- 不同的事件循环实现可能会不同地实现异步生成器钩子,因此有更多的奇怪行为表面(请参见一些使用Pyodide的webloop的PyScript示例)。
- 使用运行时上下文的清理代码(例如检查当前运行的异步任务)将观察到由事件循环的异步生成器钩子启动的任务的运行时上下文。
结论
在使用生成器(无论是否异步)时,遵循可预测的清理惯用法可能是一个相当好的做法。实际上,大多数人可能不会在普通生成器中使用这些惯用法,但在与异步生成器交互时,真的应该考虑应用这些惯用法。
在研究这篇文章时,我学到了很多。异步生成器是一个有趣的语言特性示例,它需要语言运行时与异步事件循环协同工作。异步生成器清理曾经是个魔法,但现在我认为我对正在发生的事情有了一个合理的心理模型,并且对一些可以帮助确保可预测行为的惯用法有了更好的理解。
额外阅读
- PEP 525: 描述异步生成器及其实现的PEP。包括关于异步生成器钩子的详细信息。
- PEP 533: 一个“延期”的PEP,提议扩展同步和异步迭代器协议,以支持在for循环中进行确定性清理。
- contextlib文档: 我第一次注意到
async with aclosing(gen()) as generator
这个惯用法的地方。
原文:https://samgeo.codes/python-generator-cleanup/