asyncio.as_completed() supposedly accepting `Iterable`, but crashes if input is `Generator`?

2 min read 05-10-2024


Unveiling the Mystery: Why asyncio.as_completed() Chokes on Async Generators

The asyncio.as_completed() function is a powerful tool for orchestrating asynchronous operations in Python. Its promise: to yield completed tasks in the order they finish, regardless of when they were started. However, a curious behavior emerges with generator input. While the documentation states that asyncio.as_completed() accepts an iterable of awaitables, it throws a TypeError when given an async generator, which is exactly what the code below produces. Let's delve into the root cause of this discrepancy and find a workaround.

The Scenario:

Imagine a scenario where we want to fetch data from multiple APIs asynchronously. We might be tempted to write an async generator that yields these tasks:

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_data(session, urls):
    # "async def" plus "yield" makes this an *async* generator
    for url in urls:
        yield asyncio.create_task(fetch_json(session, url))

async def main():
    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]
    async with aiohttp.ClientSession() as session:
        tasks = fetch_data(session, urls)  # async generator object!

        for fut in asyncio.as_completed(tasks):  # TypeError raised here
            result = await fut
            print(result)

asyncio.run(main())

Running this code raises TypeError: 'async_generator' object is not iterable. This seems to contradict the documentation, which says asyncio.as_completed() accepts an iterable of awaitables. The catch: an async generator is not, in fact, an Iterable.

Unveiling the Root Cause:

The key lies in how asyncio.as_completed() handles its input. It eagerly materializes the whole input into a set of futures, roughly {ensure_future(aw) for aw in set(aws)}. Building that set requires a synchronous iterable: something that implements __iter__. A plain generator qualifies, and in fact works fine with asyncio.as_completed().

An async generator does not. It implements __aiter__ instead of __iter__, making it an AsyncIterable rather than an Iterable, and its items can only be pulled out by awaiting inside a coroutine (via async for). Since set() tries to iterate it synchronously, the call fails immediately with the TypeError above, before any task has run.

The Solution:

To overcome this limitation, we need to drain the async generator into a concrete list before handing it to asyncio.as_completed(). An async list comprehension does the job:

import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url) as response:
        return await response.json()

async def fetch_data(session, urls):
    for url in urls:
        yield asyncio.create_task(fetch_json(session, url))

async def main():
    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]
    async with aiohttp.ClientSession() as session:
        # Drain the async generator into a plain list
        tasks = [task async for task in fetch_data(session, urls)]

        for fut in asyncio.as_completed(tasks):
            result = await fut
            print(result)

asyncio.run(main())

By draining the async generator into a list, we hand asyncio.as_completed() a concrete, synchronous iterable that it can consume without issue.
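Once asyncio.as_completed() has a proper iterable, it delivers on its promise of yielding results in completion order rather than submission order. A quick sketch with artificial delays illustrates this:

```python
import asyncio

async def delayed(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    # Submitted slow-first, but yielded fastest-first.
    coros = [delayed("slow", 0.1), delayed("fast", 0.01)]
    return [await fut for fut in asyncio.as_completed(coros)]

print(asyncio.run(main()))  # ['fast', 'slow']
```

The "slow" coroutine was submitted first, yet "fast" comes out first because it finished first.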

Key Takeaways:

  • asyncio.as_completed() accepts any synchronous iterable of awaitables, including plain generators. What it cannot accept is an async generator, which is an AsyncIterable rather than an Iterable.
  • To work around this, collect the async generator's items into a list (for example with [t async for t in gen]) before passing them on.
  • Remember that asyncio.as_completed() materializes its input eagerly into a set of futures, so every task is scheduled up front; nothing about the input is consumed lazily.

Additional Considerations:

  • Converting to a collections.deque instead of a list buys nothing here, since asyncio.as_completed() copies its input into a set anyway; for very large batches, consider bounding concurrency with an asyncio.Semaphore or processing the URLs in chunks.
  • If you need to dynamically generate tasks based on previous results, investigate using an asyncio.Queue to manage tasks efficiently.
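For the dynamic case, here is a minimal sketch of that queue-based pattern (the producer/consumer names and the None sentinel are illustrative choices, not part of any asyncio API):

```python
import asyncio

async def work(i):
    await asyncio.sleep(0.01)
    return i * 10

async def producer(queue):
    # Tasks become available over time rather than all at once.
    for i in range(3):
        await asyncio.sleep(0.01)
        await queue.put(asyncio.create_task(work(i)))
    await queue.put(None)  # sentinel: no more tasks coming

async def consumer(queue):
    results = []
    while (task := await queue.get()) is not None:
        results.append(await task)
    return results

async def main():
    queue = asyncio.Queue()
    prod = asyncio.create_task(producer(queue))
    results = await consumer(queue)
    await prod
    return results

print(asyncio.run(main()))  # [0, 10, 20]
```

Unlike asyncio.as_completed(), which needs the full set of awaitables up front, this pattern lets the producer keep adding tasks while the consumer is already awaiting earlier ones.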

By understanding this behavior and employing the right workarounds, you can harness the power of asyncio.as_completed() to manage your asynchronous operations with grace.