IterStream

Squirrel provides an API for chaining iterables. The functionality is provided through the Composable class, which acts as a base class for most classes in IterStream.

Stream Processing Methods

The Composable class offers three kinds of methods for processing streams.

  • Source: The first node in the stream; it generates items or wraps an iterable, for instance IterableSource.

  • Transformation: Applies a function to items in the stream, such as map() and filter(), or manipulates the stream itself, such as shuffle() and batched().

  • Terminal: Consumes the stream, such as join() and collect(); see the sketch below.
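
A minimal sketch combining all three kinds:

from squirrel.iterstream import IterableSource

it = IterableSource([1, 2, 3])  # Source: wraps an iterable
it = it.map(lambda x: x * 2)    # Transformation: applied to each item
result = it.collect()           # Terminal: consumes the stream into a list
print(result)  # [2, 4, 6]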

Example Workflow

from squirrel.iterstream import IterableSource
import time

it = IterableSource([1, 2, 3, 4])
for item in it:
    print(item)

IterableSource is a Composable and has several methods to conveniently load data, given an iterable as the input:

it = (
    IterableSource([1, 2, 3, 4])
    .map(lambda x: x + 1)
    .async_map(lambda x: x ** 2)
    .filter(lambda x: x % 2 == 0)
)
for item in it:
    print(item)

async_map() applies the provided function asynchronously; more on this in the following sections. To pass additional arguments to the mapped function, simply pass them as additional keyword arguments to map(). This works analogously for async_map() and dask_map(), as shown below.

it = IterableSource([1, 2, 3, 4]).map(lambda x, offset: x + offset, offset=1)
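
For instance, the offset can be passed to async_map() in the same way:

it = IterableSource([1, 2, 3, 4]).async_map(lambda x, offset: x + offset, offset=1)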

In addition to explicitly iterating over the items, it’s also possible to call collect() to collect all items in a list, or join() to iterate over items without returning anything.
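
For example:

items = IterableSource([1, 2, 3, 4]).map(lambda x: x + 1).collect()
print(items)  # [2, 3, 4, 5]

IterableSource([1, 2, 3, 4]).map(print).join()  # iterates purely for the side effects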

Items in the stream can be shuffled in a buffer and batched:

it = (
    IterableSource(range(10))
    .shuffle(size=5)
    .map(lambda x: x + 1)
    .batched(batchsize=3, drop_last_if_not_full=True)
)
for item in it:
    print(item)

Note that the argument drop_last_if_not_full (default True) drops the last batch if its size is less than the batchsize argument; so only 3 batches will be printed above.
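
Setting the flag to False keeps the partial batch instead; a quick illustration without shuffling, so the output is deterministic:

it = IterableSource(range(7)).batched(batchsize=3, drop_last_if_not_full=False)
print(it.collect())  # three batches; the final partial batch [6] is kept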

An IterableSource can be extended with custom logic by providing a Composable to the compose() method:

from squirrel.iterstream import Composable

class MyIter(Composable):
    def __init__(self):
        # no source argument: compose() sets self.source automatically
        super().__init__()

    def __iter__(self):
        # self.source is the upstream Composable
        for i in iter(self.source):
            yield f"_{i}", i

it = IterableSource([1, 2, 3]).compose(MyIter)
for item in it:
    print(item)

To see how you can chain custom Composables with compose(), see the advanced section for IterStream.

Note

Note that when defining a custom Composable that is meant to be used as a Source, i.e. the first step in the pipeline, the source argument must be present in the constructor. If it is a Transformation or Terminal, i.e. any step except the first, omit the source argument from the constructor signature: Squirrel automatically sets the source of your custom Composable to the Composable it operates on.
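
For illustration, a minimal Source-style counterpart to MyIter might look like this (a hypothetical MySource, assuming Composable's constructor accepts the source argument as the note above implies):

class MySource(Composable):
    def __init__(self, source):
        # Source: first step in the pipeline, so the source argument is required
        super().__init__(source=source)

    def __iter__(self):
        yield from self.source

for item in MySource([1, 2, 3]).map(lambda x: x * 10):
    print(item)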

Combining multiple iterables can be achieved using IterableSamplerSource:

from squirrel.iterstream import IterableSamplerSource

it1 = IterableSource([1, 2, 3]).map(lambda x: x + 1)
it2 = [1, 2, 3]

res = IterableSamplerSource(iterables=[it1, it2], probs=[.7, .3]).collect()
print(res)
assert sum(res) == 15

Note that you can pass the probability of sampling from each iterable. When an iterable is exhausted, the probabilities of the remaining iterables are normalized.

Asynchronous execution

Much of the speed of iterstream comes from squirrel.iterstream.base.Composable.async_map(). This method applies the provided callback to each item in the stream asynchronously and can therefore offer a large speed-up, especially for I/O-bound work.

def io_bound(item):
    print(f"{item} io_bound")
    time.sleep(1)
    return item

it = IterableSource([1, 2, 3]).async_map(io_bound, max_workers=4).async_map(io_bound, max_workers=None)
t1 = time.time()
for i in it:
    print(i)
print(time.time() - t1)

By default, async_map instantiates a ThreadPoolExecutor (executor=None). It also accepts a ProcessPoolExecutor, which is a good choice when performing CPU-bound operations on a single machine.
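
A minimal sketch of the process-based variant (the executor argument is described below). Note that a ProcessPoolExecutor requires picklable callables, so a module-level function is used instead of a lambda:

from concurrent.futures import ProcessPoolExecutor

def cpu_bound(item):
    return item ** 2  # stands in for a heavy computation

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as pool:
        it = IterableSource([1, 2, 3]).async_map(cpu_bound, executor=pool)
        print(it.collect())
    # the with-block shuts the pool down once the stream is consumed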

The argument max_workers defines the maximum number of workers/threads the ThreadPoolExecutor uses when executor=None. By default, max_workers=None relies on an internal heuristic of the ThreadPoolExecutor to select a reasonable upper bound. This may differ between Python versions. See the documentation of ThreadPoolExecutor for details.

In the above example, two ThreadPoolExecutors are created, one capped at 4 threads and the other using the default heuristic. After the iterator is exhausted, both pools are closed.

If executor is provided, no internal ThreadPoolExecutor is created or managed. As a result, max_workers is ignored, since the provided executor already carries that information, and the executor has to be closed manually.

from concurrent.futures import ThreadPoolExecutor
tpool = ThreadPoolExecutor(max_workers=4)

def io_bound(item):
    print(f"{item} io_bound")
    time.sleep(1)
    return item

it = IterableSource([1, 2, 3]).async_map(io_bound, executor=tpool).async_map(io_bound, executor=tpool)
t1 = time.time()
for i in it:
    print(i)
print(time.time() - t1)

# now the external pool needs to be manually closed
tpool.shutdown()

In the above example, a ThreadPoolExecutor is created with a maximum of 4 workers. This pool of workers is shared between both async_map calls. After the iterator is exhausted, tpool is shut down.