squirrel.iterstream

At a low level, squirrel.iterstream is built on a class called Composable. All chaining methods are defined in this class and are available on its high-level counterpart, IterableSource.

Package Contents

Classes

Composable

A mix-in class that provides stream manipulation functionalities.

FilePathGenerator

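A specialized version of IterableSource that accepts a url without instantiating it eagerly. It simply generates directories under the given url by instantiating a fsspec filesystem and yielding the result of fs.ls(url).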

IterableSamplerSource

A class that samples from iterables into an iterstream.

IterableSource

A class that turns an iterable into the source of a stream and provides stream manipulation functionalities on top, such as map, async_map, filter, batched and shuffle.

class squirrel.iterstream.Composable(source: Optional[Iterable] = None)

A mix-in class that provides stream manipulation functionalities.

Init

abstract __iter__(self) → Iterator

Abstract iter

async_map(self, callback: Callable, buffer: int = 100, max_workers: Optional[int] = None, executor: Optional[concurrent.futures.Executor] = None) → _AsyncMap

Applies the callback to each item in the stream and returns the results.

Parameters
  • callback (Callable) – a callable to be applied to items in the stream

  • buffer (int) – the size of the buffer

  • max_workers (int) – number of workers in the ThreadPoolExecutor

  • executor (concurrent.futures.Executor, dask.distributed.Client) – an optional executor to be used. If no executor is provided, a ThreadPoolExecutor is created by default. If you need a ProcessPoolExecutor, you can provide it explicitly here. Passing an executor is also useful when chaining multiple async_map calls: you can pass the same executor to each async_map to share resources. If a dask.distributed.Client is passed, tasks are executed with the provided client (local or remote). Note that if an executor is provided, it is not closed by this method, even after the iterator is exhausted. Note also that if an executor is provided, the max_workers argument is ignored; specify the number of workers on the executor you pass in instead.

Returns (_AsyncMap)
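
The interplay of buffer, max_workers and executor described above can be sketched with the standard library alone. This is a minimal illustration and not squirrel's implementation: buffered_thread_map is a hypothetical helper, and the sketch assumes that results are yielded in input order.

```python
from collections import deque
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, Optional


def buffered_thread_map(
    items: Iterable,
    callback: Callable,
    buffer: int = 100,
    max_workers: Optional[int] = None,
    executor: Optional[Executor] = None,
) -> Iterator:
    """Hypothetical helper: run callback concurrently with at most `buffer` pending futures."""
    owns_executor = executor is None
    if owns_executor:
        # max_workers only matters when we create the executor ourselves.
        executor = ThreadPoolExecutor(max_workers=max_workers)
    pending = deque()
    try:
        for item in items:
            pending.append(executor.submit(callback, item))
            if len(pending) >= buffer:
                # Yield the oldest result first, so input order is preserved.
                yield pending.popleft().result()
        while pending:
            yield pending.popleft().result()
    finally:
        # An executor supplied by the caller is left open for reuse.
        if owns_executor:
            executor.shutdown()


squares = list(buffered_thread_map(range(5), lambda x: x * x, buffer=2))
```

Passing one executor to several such calls shares a single pool of workers between them, which is the resource-sharing scenario the executor parameter describes.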

batched(self, batchsize: int, collation_fn: Optional[Callable] = None, drop_last_if_not_full: bool = True) → _Iterable

Batch items in the stream.

Parameters
  • batchsize – number of items to be batched together

  • collation_fn – Collation function to use.

  • drop_last_if_not_full (bool) – if the length of the last batch is less than the batchsize, drop it
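
The effect of these three parameters can be illustrated with a plain-Python sketch. batch_items is a hypothetical helper written for this example and is not the library's code; it assumes collation_fn receives the list of items in one batch.

```python
from typing import Callable, Iterable, Iterator, Optional


def batch_items(
    items: Iterable,
    batchsize: int,
    collation_fn: Optional[Callable] = None,
    drop_last_if_not_full: bool = True,
) -> Iterator:
    """Hypothetical helper: group items into lists of length batchsize."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batchsize:
            yield collation_fn(batch) if collation_fn else batch
            batch = []
    # A trailing partial batch is emitted only when explicitly requested.
    if batch and not drop_last_if_not_full:
        yield collation_fn(batch) if collation_fn else batch


full_only = list(batch_items(range(7), batchsize=3))
with_tail = list(batch_items(range(7), batchsize=3, drop_last_if_not_full=False))
summed = list(batch_items(range(7), batchsize=3, collation_fn=sum))
```

With seven items and a batch size of three, the default setting drops the final single-item batch, while drop_last_if_not_full=False keeps it.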

collect(self) → List[Any]

Collects and returns the result of the stream

compose(self, constructor: Type[Composable], *args, **kw) → Composable

Compose the items in the stream with items generated by constructor

dask_map(self, callback: Callable) → _Iterable

Converts each item in the stream into a dask.delayed object by applying the callback to the item

filter(self, predicate: Callable) → _Iterable

Filters items by predicate callable

flatten(self) → _Iterable

When items in the stream are themselves iterables, flatten turns them back into individual items

join(self) → None

A method to consume the stream

map(self, callback: Callable) → _Iterable

Applies the callback to each item in the stream

materialize_dask(self, buffer: int = 10, max_workers: Optional[int] = None) → _Iterable

Materializes the dask.delayed objects by calling the compute() method on each item in a thread pool

Parameters
  • buffer (int) – size of the buffer that retrieves the data in parallel from dask

  • max_workers (int) – parameter passed to the ThreadPoolExecutor

Returns (_Iterable)

monitor(self, callback: Callable[[squirrel.constants.MetricsType], Any], prefix: Optional[str] = None, metrics_conf: squirrel.iterstream.metrics.MetricsConf = MetricsConf, window_size: int = 5, **kw) → _Iterable

Iterates through an iterable and calculates the metrics based on a rolling window. Metrics can be configured to output only IOPS, only throughput, or neither. All metrics are turned on and calculated by default. If only one metric is turned on, the calculation of the other metric is skipped and a dummy value of 0 is reported instead. When all metrics are turned off, this method has no effect.

Parameters
  • callback (Callable) – wandb.log, mlflow.log_metrics or other metrics logger.

  • prefix (str) – If not None, this is added as a prefix to the metric names. Can be used to monitor the same metric at different points of an iterstream in one run. Spaces are allowed.

  • metrics_conf (MetricsConf) – A config dataclass that controls which metrics are calculated. For details, see squirrel.iterstream.metrics.MetricsConf

  • window_size (int) – Number of items over which the metrics are averaged. Since each item passes by in a very small time window, a rolling-window calculation is more accurate than a per-item one. Its value must be bigger than 0.

  • **kw – arguments to pass to your callback function.

Returns

An _Iterable instance which can be chained by other funcs in this class.
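
The rolling-window idea can be sketched as follows. This is an illustration under stated assumptions, not the library's metric code: rolling_iops, the metric key "IOPS", the way the prefix is joined to it, and the injectable clock argument are all hypothetical choices made for this example.

```python
import time
from collections import deque
from typing import Any, Callable, Dict, Iterable, Iterator, Optional


def rolling_iops(
    items: Iterable,
    callback: Callable[[Dict[str, float]], Any],
    prefix: Optional[str] = None,
    window_size: int = 5,
    clock: Callable[[], float] = time.perf_counter,
) -> Iterator:
    """Hypothetical helper: report items per second averaged over the last window_size items."""
    if window_size <= 0:
        raise ValueError("window_size must be bigger than 0")
    # Assumption: the prefix is simply prepended to the metric name.
    name = f"{prefix}IOPS" if prefix is not None else "IOPS"
    durations = deque(maxlen=window_size)  # per-item durations inside the window
    last = clock()
    for item in items:
        now = clock()
        durations.append(now - last)
        last = now
        elapsed = sum(durations)
        callback({name: len(durations) / elapsed if elapsed > 0 else 0.0})
        yield item


# A fake clock that advances one second per call keeps the example deterministic.
ticks = iter(range(100))
logged = []
passed = list(
    rolling_iops("abc", logged.append, prefix="train/", clock=lambda: float(next(ticks)))
)
```

Averaging over a window smooths out the noise of individual item timings, which is the motivation given for window_size above.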

numba_map(self, callback: Callable) → _NumbaMap

The iterator is wrapped inside a numba.jit decorator to speed up the iteration. This is quite different from the standard asynchronous speed-up and does not guarantee better performance than the normal ThreadPoolExecutor, so please use it with caution.

Parameters

callback (Callable) – a callback to be applied to each item in the stream

Returns (_NumbaMap)

shuffle(self, size: int, **kw) → Composable

Shuffles items in the buffer, defined by size, to simulate IID sample retrieval.

Parameters

size (int, optional) – Buffer size for shuffling. Defaults to 1000. Skip the shuffle step if size < 2.

Acceptable keyword arguments:

  • initial (int, optional): Minimum number of elements in the buffer before yielding the first element. Must be less than or equal to the buffer size (size); otherwise it is set to the buffer size. Defaults to 100.

  • rng (random.Random, optional): Either random module or a random.Random instance. If None, a random.Random() is used.

  • seed (Union[int, float, str, bytes, bytearray, None]): A data input that can be used for random.seed().
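
Buffer-based shuffling as described above can be sketched in a few lines. This is a minimal illustration, not squirrel's implementation: buffer_shuffle is a hypothetical helper, and the strategy of reading two items per yielded item until the buffer is full is an assumption made for this example.

```python
import random
from typing import Iterable, Iterator

_MISSING = object()  # sentinel marking an exhausted source


def _pop_random(buf: list, rng):
    """Remove and return a random element in O(1) by swapping it to the end."""
    idx = rng.randrange(len(buf))
    buf[idx], buf[-1] = buf[-1], buf[idx]
    return buf.pop()


def buffer_shuffle(items: Iterable, size: int = 1000, initial: int = 100, rng=None, seed=None) -> Iterator:
    """Hypothetical helper: approximate shuffling using a bounded buffer."""
    if size < 2:
        # Nothing to shuffle with a buffer this small: pass items through.
        yield from items
        return
    if rng is None:
        rng = random.Random()
    if seed is not None:
        rng.seed(seed)
    initial = min(initial, size)
    source = iter(items)
    buf = []
    for item in source:
        buf.append(item)
        if len(buf) < size:
            # Read one extra item so the buffer grows while it is not yet full.
            extra = next(source, _MISSING)
            if extra is not _MISSING:
                buf.append(extra)
        if len(buf) >= initial:
            yield _pop_random(buf, rng)
    # Drain whatever is left once the source is exhausted.
    while buf:
        yield _pop_random(buf, rng)


shuffled = list(buffer_shuffle(range(50), size=10, initial=5, seed=0))
repeated = list(buffer_shuffle(range(50), size=10, initial=5, seed=0))
untouched = list(buffer_shuffle(range(5), size=1))
```

A larger size brings the output closer to a uniform shuffle at the cost of holding more items in memory, since an item can only move as far as the buffer allows.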

source_(self, source: Composable) → Composable

Set the source attribute to the provided source argument

take(self, n: Optional[int]) → Composable

Take n samples from iterable

to(self, f: Callable, *args, **kw) → _Iterable

Pipes the iterable into another iterable, which applies the callable f to it

tqdm(self, **kw) → _Iterable

Add tqdm to iterator.

class squirrel.iterstream.FilePathGenerator(url: str, nested: bool = False, max_workers: Optional[int] = None, max_keys: int = 1000000, max_dirs: int = 10)

Bases: IterableSource

A specialized version of IterableSource that accepts a url without instantiating it eagerly. It simply generates directories under the given url by instantiating a fsspec filesystem and yielding the result of fs.ls(url).

Parameters
  • url – the url on which ls is performed

  • nested – if True, it attempts to perform ls on each directory that it encounters. Otherwise, it only yields the top-level paths and does not expand a path that is a directory

  • max_workers (int) – passed to the ThreadPoolExecutor. Only applicable if nested==True

  • max_keys (int) – maximum number of keys to keep in memory at the same time. If this number is reached, no new expansion on the currently discovered directories is done, until enough keys are yielded to make room for the new ones.

  • max_dirs (int) – maximum number of parallel ls operations.

__iter__(self) → Iterator[str]

Iterator that performs ls and yields file paths under the given url
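
The difference between nested=False and nested=True can be sketched on a local directory with the standard library. This is only an analogy: iter_paths is a hypothetical helper that uses os.scandir instead of fsspec, runs sequentially, and assumes that in nested mode directories are expanded rather than yielded.

```python
import os
import tempfile
from typing import Iterator


def iter_paths(url: str, nested: bool = False) -> Iterator[str]:
    """Hypothetical local-filesystem stand-in for lazily listing paths under url."""
    with os.scandir(url) as scan:
        # Sort the entries so the output order is deterministic.
        entries = sorted(scan, key=lambda entry: entry.name)
    for entry in entries:
        if nested and entry.is_dir():
            # Expand the directory only when the consumer reaches it.
            yield from iter_paths(entry.path, nested=True)
        else:
            yield entry.path


with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "a"))
    for parts in (("a", "x.txt"), ("b.txt",)):
        open(os.path.join(root, *parts), "w").close()
    top_level = [os.path.relpath(p, root) for p in iter_paths(root)]
    everything = [os.path.relpath(p, root) for p in iter_paths(root, nested=True)]
```

Because the function is a generator, no listing happens until the first path is requested, which mirrors the lazy behaviour described for FilePathGenerator.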

class squirrel.iterstream.IterableSamplerSource(iterables: List[Iterable], probs: Optional[List[float]] = None, rng: Optional[random.Random] = None, seed: Optional[int] = None)

Bases: squirrel.iterstream.base.Composable

A class that samples from iterables into an iterstream.

Initialize IterableSamplerSource.

Parameters
  • iterables (List[Iterable]) – List of iterables to sample from.

  • probs (Optional[List[float]], optional) – [description]. Defaults to None.

  • rng (random.Random, optional) – Random number generator to use.

  • seed (Optional[int]) – An int or another type accepted by random.seed(). It is used to seed rng. If None, a unique identifier is used as the seed.

__iter__(self) → Iterator

Samples items from the iterables and yields them until all iterables are exhausted.
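
Sampling from several iterables until all of them are exhausted can be sketched as below. This is a hedged illustration rather than the class's actual code: sample_from_iterables is a hypothetical helper, and it assumes that probs acts as per-iterable sampling weights and that an exhausted iterable is simply removed from the pool.

```python
import random
from typing import Iterable, Iterator, List, Optional


def sample_from_iterables(
    iterables: List[Iterable],
    probs: Optional[List[float]] = None,
    rng: Optional[random.Random] = None,
    seed: Optional[int] = None,
) -> Iterator:
    """Hypothetical helper: interleave iterables by weighted random choice."""
    if rng is None:
        rng = random.Random()
    if seed is not None:
        rng.seed(seed)
    iterators = [iter(it) for it in iterables]
    # Assumption: equal weights when probs is not given.
    weights = list(probs) if probs is not None else [1.0] * len(iterators)
    while iterators:
        idx = rng.choices(range(len(iterators)), weights=weights, k=1)[0]
        try:
            yield next(iterators[idx])
        except StopIteration:
            # Drop the exhausted iterable together with its weight.
            del iterators[idx]
            del weights[idx]


mixed = list(sample_from_iterables([range(0, 5), range(100, 103)], probs=[0.7, 0.3], seed=42))
again = list(sample_from_iterables([range(0, 5), range(100, 103)], probs=[0.7, 0.3], seed=42))
```

Every item of every input appears exactly once in the output; only the interleaving order depends on the weights and the seed.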

class squirrel.iterstream.IterableSource(source: Iterable = ())

Bases: squirrel.iterstream.base.Composable

A class that turns an iterable into the source of a stream and provides stream manipulation functionalities on top, for instance:

  • map

  • async_map

  • filter

  • batched

  • shuffle

  • and more

For the detailed description of each, please refer to the corresponding docstring in Composable.

Initialize IterableSource.

Parameters

source (Iterable) – The iterable on which the IterableSource is built.

__iter__(self) → Iterator

Iterates over the items in the iterable
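
How an iterable source and chainable methods fit together can be sketched with a small stand-in class. MiniStream is hypothetical and deliberately simplified; it only mirrors the names map, filter, flatten, take, collect and join from the reference above and makes no claim about how squirrel implements them.

```python
import itertools
from typing import Any, Callable, Iterable, Iterator, List, Optional


class MiniStream:
    """Hypothetical, stripped-down stand-in for a chainable stream over an iterable."""

    def __init__(self, source: Iterable = ()):
        self.source = source

    def __iter__(self) -> Iterator:
        yield from self.source

    def map(self, callback: Callable) -> "MiniStream":
        # Each step wraps a lazy generator, so nothing runs until consumption.
        return MiniStream(callback(item) for item in self)

    def filter(self, predicate: Callable) -> "MiniStream":
        return MiniStream(item for item in self if predicate(item))

    def flatten(self) -> "MiniStream":
        return MiniStream(itertools.chain.from_iterable(self))

    def take(self, n: Optional[int]) -> "MiniStream":
        # islice with n=None takes every item.
        return MiniStream(itertools.islice(self, n))

    def collect(self) -> List[Any]:
        return list(self)

    def join(self) -> None:
        # Consume the stream for its side effects only.
        for _ in self:
            pass


even_squares = (
    MiniStream(range(10)).map(lambda x: x * x).filter(lambda x: x % 2 == 0).take(3).collect()
)
flat = MiniStream([[1, 2], [3]]).flatten().collect()
```

Because every method returns a new stream, calls can be chained in any order, and work happens only when collect or join consumes the result.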