DaskΒΆ

Scaling out to a dask cluster only requires changing a single line of code:

from dask.distributed import Client

def io_bound(item):
    print(f"{item} io_bound")
    time.sleep(1)
    return item

client = Client()

it = IterableSource([1, 2, 3]).async_map(io_bound, executor=client)
t1 = time.time()
for item in it:
    print(item)
print(time.time() - t1)

In this example, a task is submitted and the result is gathered. An alternative would be to call dask_map instead of async_map, which transforms items in the stream into dask.delayed.Delayed objects. This pattern makes it possible to load and transform the data in a dask cluster and only load the fully ready data into the local machine.

it = IterableSource([1, 2, 3]).dask_map(io_bound).dask_map(lambda item: item + 1).materialize_dask()
t1 = time.time()
for item in it:
    print(item)
print(time.time() - t1)

Note that after calling dask_map for the first time, you can chain more dask_maps, which are then operating on the dask.delayed.Delayed objects, so that the data and the operations live on the dask cluster until materialize_dask is called.