在使用ThreadPoolExecutor时,常常需要动态地添加或删除线程。虽然线程本身可以简单地结束,但ThreadPoolExecutor需要一个机制来维护所有线程。一种方法是通过在三个步骤中提供完整的回调API来查询执行的线程,另一种是通过查看内部数据来查询执行的线程。
以下是一种通过内部数据抛出异常来从ThreadPoolExecutor终止线程的示例。
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def main_task():
loop = asyncio.get_event_loop()
async with ThreadPoolExecutor() as pool:
for i in range(2):
loop.run_in_executor(pool, task, i)
async def task(index):
try:
while True:
print("Task-{} is running...".format(index))
await asyncio.sleep(1)
except Exception as e:
print("Task-{} is interrupted due to {}".format(index, e))
async def stop_task(loop):
await asyncio.sleep(2)
tasks = [task1, task2] # get task objects from a global list
for task in tasks:
task.cancel()
loop.stop()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
task1 = loop.create_task(task(1))
task2 = loop.create_task(task(2))
loop.create_task(main_task())
loop.create_task(stop_task(loop))
loop.run_forever()
在上述示例中,main_task()
中使用了ThreadPoolExecutor提供了两个线程来运行task()
,stop_task()
在main_task()
后面,会在2秒后调用task1
和task2
的cancel()
方法将两个线程给中止。task()
中捕获了asyncio.CancelledError
异常,然后可以做一些清理等收尾工作。
运行该脚本,可以看到:
Task-1 is running...
Task-2 is running...
Task-1 is running...
Task-2 is running...
Task-1 is interrupted due to Task was cancelled
Task-2 is interrupted due to Task was cancelled
在取消一个正在执行的线程时,除了抛出asyncio.CancelledError
之外,这个线程不能接收任何其他的异常,这意味着所有的例外都需要被捕获并妥善处理。
上一篇:asyncio:如何处理异常