squirrel.iterstream.base

Module Contents

Classes

AsyncContent

Represents content that can be fetched asynchronously.

Composable

A mix-in class that provides stream manipulation functionalities.

class squirrel.iterstream.base.AsyncContent(item: str, func: Callable, executor: concurrent.futures.Executor)

Represents content that can be fetched asynchronously.

Initialize AsyncContent.

Parameters
  • item (str) – Key corresponding to a single item; it is passed to func.

  • func (Callable) – Function that fetches a given key.

  • executor (concurrent.futures.Executor) – Executor to submit func with item.

value(timeout: int = None) → Any

Return the value, waiting for the asynchronous fetch to finish if necessary.

Parameters

timeout (int, optional) – Number of seconds to wait for the result. If None, wait indefinitely. Defaults to None.

Returns

The content returned by func.

Return type

Any
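To illustrate the pattern described above, here is a minimal stdlib-only sketch, not squirrel's implementation. It assumes that the work is submitted to the executor as soon as the object is constructed and that value() simply waits on the resulting concurrent.futures.Future. The class AsyncContentSketch and the function fetch are hypothetical.

```python
import concurrent.futures
from typing import Any, Callable, Optional


class AsyncContentSketch:
    """Hypothetical stand-in for AsyncContent: submits func(item) on creation."""

    def __init__(self, item: str, func: Callable, executor: concurrent.futures.Executor):
        # The fetch starts immediately; value() only waits for it.
        self._future = executor.submit(func, item)

    def value(self, timeout: Optional[float] = None) -> Any:
        # Blocks until the result is ready; raises concurrent.futures.TimeoutError
        # if `timeout` seconds pass first. timeout=None waits indefinitely.
        return self._future.result(timeout=timeout)


def fetch(key: str) -> str:
    # Hypothetical fetch function standing in for real I/O.
    return key.upper()


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    contents = [AsyncContentSketch(k, fetch, pool) for k in ["a", "b", "c"]]
    results = [c.value(timeout=5) for c in contents]

print(results)  # ['A', 'B', 'C']
```

Creating all objects before calling value() lets the fetches overlap; calling value() right after each construction would serialize them.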

class squirrel.iterstream.base.Composable(source: Optional[Union[Iterable, Callable]] = None)

A mix-in class that provides stream manipulation functionalities.

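Initialize Composable.

abstract __iter__() → Iterator

Abstract method; subclasses must implement it to return an iterator over the items in the stream.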

async_map(callback: Callable, buffer: int = 100, max_workers: Optional[int] = None, executor: Optional[concurrent.futures.Executor] = None, **kw) → _AsyncMap

Asynchronously apply the callback to each item in the stream and return the results.

Parameters
  • callback (Callable) – a callable to be applied to items in the stream

  • buffer (int) – the size of the buffer

  • max_workers (int, optional) – number of workers in the ThreadPoolExecutor. Only used when executor is not provided, because a provided executor already defines its own number of workers.

  • executor (concurrent.futures.Executor, dask.distributed.Client, optional) –

    an optional executor. If none is provided, a ThreadPoolExecutor is created. If you need a ProcessPoolExecutor, pass it here explicitly. Passing an executor is also useful when chaining multiple async_map calls: pass the same executor to each one to share resources. If a dask.distributed.Client is passed, tasks are executed with that client (local or remote).

    Note: a provided executor is not shut down by this method, even after the iterator is exhausted.

    Note: if an executor is provided, max_workers is ignored; configure the number of workers on the executor you pass instead.

  • **kw (dict) – keyword arguments for callback

Returns (_AsyncMap)
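A minimal sketch of a buffered asynchronous map using only concurrent.futures, written to mirror the parameters documented above. It is not squirrel's _AsyncMap: the name async_map_sketch is hypothetical, and yielding results in input order with at most buffer tasks in flight is an assumption of this sketch.

```python
import concurrent.futures
from collections import deque
from typing import Any, Callable, Deque, Iterable, Iterator, Optional


def async_map_sketch(
    items: Iterable[Any],
    callback: Callable[..., Any],
    buffer: int = 100,
    max_workers: Optional[int] = None,
    executor: Optional[concurrent.futures.Executor] = None,
    **kw: Any,
) -> Iterator[Any]:
    """Hypothetical helper: yield callback(item, **kw) per item, in input order."""
    owns_executor = executor is None
    if executor is None:
        # max_workers only matters for an executor we create ourselves.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    pending: Deque[concurrent.futures.Future] = deque()
    try:
        for item in items:
            pending.append(executor.submit(callback, item, **kw))
            if len(pending) >= buffer:
                # Buffer full: wait for the oldest task before submitting more.
                yield pending.popleft().result()
        # Input exhausted: drain the remaining tasks in order.
        while pending:
            yield pending.popleft().result()
    finally:
        # A caller-provided executor is left open for reuse.
        if owns_executor:
            executor.shutdown(wait=True)


def scale(x: int, factor: int = 1) -> int:
    return x * factor


out = list(async_map_sketch(range(10), scale, buffer=3, factor=10))
print(out)  # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```

Because the oldest future is awaited first, one slow item can hold back faster ones queued behind it; the benefit is a deterministic output order.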

batched(batchsize: int, collation_fn: Optional[Callable] = None, drop_last_if_not_full: bool = True) → _Iterable

Batch items in the stream.

Parameters
  • batchsize (int) – number of items to be batched together

  • collation_fn (Callable, optional) – function applied to each batch before it is yielded

  • drop_last_if_not_full (bool) – if True, drop the last batch when it contains fewer than batchsize items
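The batching behavior can be sketched in plain Python as follows. batched_sketch is a hypothetical helper, and it assumes that, without a collation_fn, each batch is yielded as a plain list.

```python
from typing import Any, Callable, Iterable, Iterator, List, Optional


def batched_sketch(
    items: Iterable[Any],
    batchsize: int,
    collation_fn: Optional[Callable[[List[Any]], Any]] = None,
    drop_last_if_not_full: bool = True,
) -> Iterator[Any]:
    """Hypothetical helper: group consecutive items into batches of batchsize."""
    batch: List[Any] = []
    for item in items:
        batch.append(item)
        if len(batch) == batchsize:
            yield collation_fn(batch) if collation_fn else batch
            batch = []
    # Leftover items form a final, smaller batch unless it should be dropped.
    if batch and not drop_last_if_not_full:
        yield collation_fn(batch) if collation_fn else batch


print(list(batched_sketch(range(7), 3)))  # [[0, 1, 2], [3, 4, 5]]
print(list(batched_sketch(range(7), 3, drop_last_if_not_full=False)))  # [[0, 1, 2], [3, 4, 5], [6]]
print(list(batched_sketch(range(6), 3, collation_fn=sum)))  # [3, 12]
```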

collect() → List[Any]

Collect the items of the stream and return them as a list.

compose(constructor: Type[Composable], *args, **kw) → Composable

Apply the transformation expressed in the __iter__ method of the constructor to items in the stream. If the provided constructor has an __init__ method, then the source argument should not be provided.

dask_map(callback: Callable, **kw) → _Iterable

Convert each item in the stream into a dask.delayed object by applying the callback to it.

Parameters
  • callback (Callable) – callback to be mapped over the stream

  • **kw (dict) – keyword arguments for callback

Returns

mapped Composable (_Iterable)

filter(predicate: Callable) → _Iterable

Keep only the items for which predicate returns a truthy value.

flatten() → _Iterable

When items in the stream are themselves iterables, flatten them back into individual items.
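For intuition, flattening one level of nesting behaves like itertools.chain.from_iterable in plain Python. This is an analogy, not squirrel's code, and it assumes flatten removes exactly one level of nesting.

```python
from itertools import chain

# Each element of the stream is itself an iterable (lists, tuples, even empty ones).
stream = [[1, 2], (3,), [], [4, 5]]
flat = list(chain.from_iterable(stream))
print(flat)  # [1, 2, 3, 4, 5]
```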

join() → None

Consume the stream without returning its items.

loop(n: Optional[int] = None) → Composable

Repeat the iterable n times.

Parameters

n (int, optional) – number of times the iterable is looped over. If None (the default), it loops forever.

Note: this method creates a deepcopy of the source attribute, i.e. all steps in the chain of Composables before the loop itself, which must be picklable.
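The following sketch shows why copying the source matters when looping. loop_sketch is hypothetical, and copying the source once per pass is an assumption about how the documented deepcopy is used.

```python
import copy
import itertools
from typing import Any, Iterable, Iterator, Optional


def loop_sketch(source: Iterable[Any], n: Optional[int] = None) -> Iterator[Any]:
    """Hypothetical helper: iterate a fresh deepcopy of source, n times (forever if None)."""
    passes = itertools.count() if n is None else range(n)
    for _ in passes:
        # Iterating a copy leaves `source` untouched, so a one-shot iterator
        # still has all its items on the next pass. deepcopy fails for
        # objects it cannot copy, such as generators.
        yield from copy.deepcopy(source)


one_shot = iter([1, 2, 3])  # plain re-iteration would be empty after one pass
print(list(loop_sketch(one_shot, 2)))  # [1, 2, 3, 1, 2, 3]
print(list(itertools.islice(loop_sketch([0, 1]), 5)))  # [0, 1, 0, 1, 0]
```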

map(callback: Callable, **kw) → _Iterable

Apply the callback to each item in the stream. Keyword arguments for callback can be passed via **kw.

materialize_dask(buffer: int = 10, max_workers: Optional[int] = None) → _Iterable

Materialize the dask.delayed objects in the stream by calling compute() on each item in a thread pool.

Parameters
  • buffer (int) – size of the buffer that retrieves the data in parallel from dask

  • max_workers (int) – parameter passed to the ThreadPoolExecutor

Returns (_Iterable)

monitor(callback: Callable[[squirrel.constants.MetricsType], Any], prefix: Optional[str] = None, metrics_conf: squirrel.iterstream.metrics.MetricsConf = MetricsConf, window_size: int = 5, **kw) → _Iterable

Iterate through an iterable and calculate metrics over a rolling window. Metrics can be configured to report only IOPS, only throughput, or neither. By default, all metrics are enabled and calculated. If only one metric is enabled, the calculation of the other is skipped and a dummy value of 0 is reported instead. When all metrics are disabled, this method has no effect.

Parameters
  • callback (Callable) – wandb.log, mlflow.log_metrics or other metrics logger.

  • prefix (str) – If not None, it is prepended to the metric names. This can be used to monitor the same metric at different points of an iterstream in one run. Spaces are allowed.

  • metrics_conf (MetricsConf) – A config dataclass that controls which metrics are calculated. For details, see squirrel.iterstream.metrics.MetricsConf.

  • window_size (int) – Number of items over which the metrics are averaged. Because each item passes through in a very short time, averaging over a rolling window gives more accurate measurements. Must be greater than 0.

  • **kw (dict) – arguments to pass to your callback function.

Returns

An _Iterable instance which can be chained by other funcs in this class.
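A simplified sketch of a rolling-window rate measurement, in the spirit of monitor. It computes only an items-per-second value (roughly IOPS) and ignores throughput and MetricsConf. The name monitor_sketch, the metric name "iops", and the injectable clock parameter are hypothetical; the clock exists here only so the example is deterministic.

```python
import time
from collections import deque
from typing import Any, Callable, Deque, Dict, Iterable, Iterator, Optional


def monitor_sketch(
    items: Iterable[Any],
    callback: Callable[[Dict[str, float]], Any],
    prefix: Optional[str] = None,
    window_size: int = 5,
    clock: Callable[[], float] = time.perf_counter,
) -> Iterator[Any]:
    """Hypothetical helper: pass items through, reporting items/second over a rolling window."""
    if window_size <= 0:
        raise ValueError("window_size must be bigger than 0")
    name = f"{prefix}iops" if prefix is not None else "iops"
    # window_size + 1 timestamps bound window_size item arrivals.
    stamps: Deque[float] = deque(maxlen=window_size + 1)
    stamps.append(clock())
    for item in items:
        stamps.append(clock())
        if len(stamps) == stamps.maxlen:
            elapsed = stamps[-1] - stamps[0]
            callback({name: window_size / elapsed if elapsed > 0 else 0.0})
        yield item


# A fake clock that advances 0.5 s per call makes the output deterministic.
ticks = iter([0.0, 0.5, 1.0, 1.5, 2.0, 2.5])
logged = []
out = list(monitor_sketch(range(5), logged.append, prefix="train/", window_size=2, clock=lambda: next(ticks)))
print(logged)  # [{'train/iops': 2.0}, {'train/iops': 2.0}, {'train/iops': 2.0}, {'train/iops': 2.0}]
```

Items pass through unchanged, so the monitor can be inserted at any point of a pipeline; a real logger such as wandb.log would take the place of logged.append.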

numba_map(callback: Callable) → _NumbaMap

The iterator is wrapped in a numba.jit decorator to speed up the iteration. This is quite different from the usual asynchronous speed-up and is not guaranteed to outperform a regular ThreadPoolExecutor, so use it with caution.

Parameters

callback (Callable) – a callback to be applied to each item in the stream

Returns (_NumbaMap)

shuffle(size: Optional[int] = 1000, **kw) → Composable

Shuffle items using a buffer of the given size to approximate IID sample retrieval.

Parameters

size (int, optional) – Buffer size for shuffling. Defaults to 1000. Skip the shuffle step if size < 2.

Acceptable keyword arguments:

  • initial (int, optional): Minimum number of elements in the buffer before the first element is yielded. Must be less than or equal to size; larger values are set to size. Defaults to 100.

  • rng (random.Random, optional): Either the random module or a random.Random instance. If None, a new random.Random() is used.

  • seed (Union[int, float, str, bytes, bytearray, None]): A seed value accepted by random.seed().
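A buffer-based shuffle of this kind can be sketched with the standard random module. shuffle_sketch is hypothetical; the strategy of growing the buffer by one extra item per step, and applying seed only when rng is None, are assumptions of this sketch rather than documented squirrel behavior.

```python
import random
from typing import Any, Iterable, Iterator, List, Optional, Union

_END = object()  # sentinel marking an exhausted iterator


def _pop_random(buf: List[Any], rng: Any) -> Any:
    # Swap a random element to the end, then pop it in O(1).
    i = rng.randrange(len(buf))
    buf[i], buf[-1] = buf[-1], buf[i]
    return buf.pop()


def shuffle_sketch(
    items: Iterable[Any],
    size: int = 1000,
    initial: int = 100,
    rng: Optional[Any] = None,  # the random module or a random.Random instance
    seed: Optional[Union[int, float, str, bytes, bytearray]] = None,
) -> Iterator[Any]:
    """Hypothetical helper: approximate shuffle using a bounded buffer."""
    if size < 2:
        # A buffer of fewer than 2 items cannot reorder anything.
        yield from items
        return
    initial = min(initial, size)
    if rng is None:
        # In this sketch, seed is only applied when no rng is given.
        rng = random.Random(seed)
    it = iter(items)
    buf: List[Any] = []
    for item in it:
        buf.append(item)
        if len(buf) < size:
            # Read one extra item so the buffer grows toward `size`.
            extra = next(it, _END)
            if extra is not _END:
                buf.append(extra)
        if len(buf) >= initial:
            yield _pop_random(buf, rng)
    # Drain whatever is left once the input is exhausted.
    while buf:
        yield _pop_random(buf, rng)


a = list(shuffle_sketch(range(20), size=8, initial=4, seed=42))
b = list(shuffle_sketch(range(20), size=8, initial=4, seed=42))
print(sorted(a) == list(range(20)), a == b)  # True True
```

A larger size mixes items from further apart in the stream, at the cost of holding more items in memory.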

sliding(window_size: int, *, deepcopy: bool, stride: int = 1, drop_last_if_not_full: bool = True, min_window_size: int = 1, fill_nan_on_partial: bool = False) → Composable

Apply sliding window over the stream.

Parameters
  • window_size (int) – the length of the window

  • deepcopy (bool) – If True, each window will be returned as a deepcopy. If items are mutated in the subsequent steps of the pipeline, this should be set to True, otherwise it should be False. Note that deepcopy may incur a substantial cost, so set this parameter carefully.

  • stride (int) – the distance that the window moves at each step

  • drop_last_if_not_full (bool) – If True, only windows of exactly window_size items are returned; the trailing windows with fewer items are dropped.

  • min_window_size (int) – The minimum length of the window for the last remaining elements. This argument is only relevant if drop_last_if_not_full is False; otherwise it is ignored.

  • fill_nan_on_partial (bool) – If drop_last_if_not_full is False, the last few windows are shorter than window_size. If this argument is True, the missing values are filled with None. It takes precedence over min_window_size.
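The interplay of stride, drop_last_if_not_full, min_window_size, and fill_nan_on_partial can be sketched as a streaming generator. sliding_sketch is hypothetical, and the exact semantics of the trailing partial windows (each starting stride items after the previous one) are an assumption.

```python
import copy
from typing import Any, Iterable, Iterator, List


def sliding_sketch(
    items: Iterable[Any],
    window_size: int,
    *,
    deepcopy: bool,
    stride: int = 1,
    drop_last_if_not_full: bool = True,
    min_window_size: int = 1,
    fill_nan_on_partial: bool = False,
) -> Iterator[List[Any]]:
    """Hypothetical helper: yield windows of window_size items, advancing by stride."""

    def emit(window: List[Any]) -> List[Any]:
        if len(window) < window_size and fill_nan_on_partial:
            # Pad partial windows with None, as the parameter docs describe.
            window = window + [None] * (window_size - len(window))
        return copy.deepcopy(window) if deepcopy else list(window)

    buf: List[Any] = []
    to_skip = 0
    for item in items:
        if to_skip:
            # stride > window_size: skip the items between windows.
            to_skip -= 1
            continue
        buf.append(item)
        if len(buf) == window_size:
            yield emit(buf)
            if stride >= window_size:
                to_skip = stride - window_size
                buf = []
            else:
                buf = buf[stride:]
    if drop_last_if_not_full:
        return
    # Trailing partial windows, still advancing by `stride`.
    while buf and (fill_nan_on_partial or len(buf) >= min_window_size):
        yield emit(buf)
        buf = buf[stride:]


print(list(sliding_sketch(range(5), 3, deepcopy=False, drop_last_if_not_full=False, min_window_size=2)))
# [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4]]
```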

source_(source: Union[Iterable, Callable]) → Composable

Set the source of the stream

split_by_rank_pytorch(torch_dist_group: Optional[str] = None) → Composable

Split the stream into multiple streams, one for each rank in the PyTorch distributed system

Parameters

torch_dist_group (str, optional) – The group name of the PyTorch distributed system. Defaults to None.

split_by_worker_pytorch() → Composable

Split the stream into multiple streams, one for each worker in the PyTorch distributed system.
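One common way to split a stream across ranks or workers is round-robin sharding with itertools.islice, sketched below; whether squirrel splits this way is an assumption, and shard is a hypothetical helper. In a real PyTorch setup, the index and count would come from torch.utils.data.get_worker_info() (its id and num_workers fields) for workers, or from torch.distributed.get_rank() and torch.distributed.get_world_size() for ranks.

```python
from itertools import islice
from typing import Any, Iterable, Iterator


def shard(items: Iterable[Any], index: int, count: int) -> Iterator[Any]:
    """Hypothetical helper: yield every count-th item, starting at position index."""
    return islice(items, index, None, count)


data = range(10)
# Each of 3 shards sees a disjoint subset; together they cover every item once.
print([list(shard(data, r, 3)) for r in range(3)])  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Disjoint shards are what prevent the same sample from being loaded more than once, which is the concern behind the checks in to_torch_iterable.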

take(n: Optional[int]) → Composable

Take the first n items from the stream.

to(f: Callable, *args, **kw) → _Iterable

Pipe the stream into f: f is called with the upstream iterable, followed by *args and **kw, and returns the new iterable.
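Unlike map, which calls the callback once per item, to hands the whole upstream iterable to f. A minimal sketch, assuming f is called as f(upstream, *args, **kw); pipe_to and every_nth are hypothetical names.

```python
from typing import Any, Callable, Iterable, Iterator


def pipe_to(source: Iterable[Any], f: Callable[..., Iterable[Any]], *args: Any, **kw: Any) -> Iterator[Any]:
    # Hypothetical helper: f receives the whole upstream iterable, not one item at a time.
    return iter(f(source, *args, **kw))


def every_nth(it: Iterable[int], n: int, offset: int = 0) -> Iterator[int]:
    # Hypothetical stream-level transformation: keep items whose position % n == offset.
    for i, x in enumerate(it):
        if i % n == offset:
            yield x


print(list(pipe_to(range(10), every_nth, 3, offset=1)))  # [1, 4, 7]
```

Operations that need state across items, such as position-based filtering here, fit to better than map.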

to_torch_iterable(enforce_rank_check: bool = True, enforce_worker_check: bool = True) → Composable

Convert the stream to a torch iterable.

Parameters
  • enforce_rank_check – If True, check that split_by_rank_pytorch has been called before to_torch_iterable. This prevents the same sample from being loaded more than once in a multi-rank PyTorch environment.

  • enforce_worker_check – If True, check that split_by_worker_pytorch has been called before to_torch_iterable. This prevents the same sample from being loaded more than once in a multi-worker PyTorch environment.

tqdm(**kw) → _Iterable

Wrap the stream in a tqdm progress bar. Keyword arguments in **kw are passed to tqdm.

zip_index(pad_length: int = None) → Composable

Zip each item in the stream with its index and yield Tuple[index, item].

Parameters

pad_length (int, optional) – If provided, indexes with fewer digits than pad_length are padded with leading zeros. In that case, all indexes are str rather than int.
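A minimal sketch of the documented indexing and padding, assuming indexes start at 0; zip_index_sketch is a hypothetical name.

```python
from typing import Any, Iterable, Iterator, Optional, Tuple, Union


def zip_index_sketch(
    items: Iterable[Any], pad_length: Optional[int] = None
) -> Iterator[Tuple[Union[int, str], Any]]:
    """Hypothetical helper: pair each item with its (optionally zero-padded) index."""
    for i, item in enumerate(items):
        # With pad_length, indexes become zero-padded strings, e.g. 7 -> "007".
        # Indexes that already have pad_length or more digits are left unchanged.
        yield (str(i).zfill(pad_length) if pad_length is not None else i), item


print(list(zip_index_sketch("abc")))  # [(0, 'a'), (1, 'b'), (2, 'c')]
print(list(zip_index_sketch("abc", pad_length=3)))  # [('000', 'a'), ('001', 'b'), ('002', 'c')]
```

Zero-padded string indexes sort lexicographically in numeric order, which helps when they are used as keys or file names.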