squirrel.iterstream
At a low level, squirrel iterstream is defined through a class called Composable. All chaining methods are defined within this class, so they can also be applied on its high-level twin class, IterableSource.
Package Contents
Classes
| Class | Description |
| --- | --- |
| Composable | A mix-in class that provides stream manipulation functionalities. |
| FilePathGenerator | A specialized version of Composable that accepts a url without instantiating a filesystem instance in the init. |
| IterableSamplerSource | A class that samples from iterables into an iterstream. |
| IterableSource | A class that turns an iterable into the source of a stream and provides stream manipulation functionalities on top. |
-
class squirrel.iterstream.Composable(source: Optional[Union[Iterable, Callable]] = None)
A mix-in class that provides stream manipulation functionalities.
Initialize the Composable with an optional source.
-
abstract __iter__() → Iterator
Abstract __iter__ method that subclasses must implement.
-
async_map(callback: Callable, buffer: int = 100, max_workers: Optional[int] = None, executor: Optional[concurrent.futures.Executor] = None, **kw) → _AsyncMap
Applies the callback asynchronously to each item in the stream and returns the results.
- Parameters
callback (Callable) – a callable to be applied to items in the stream
buffer (int) – the size of the buffer
max_workers (int) – number of workers in the ThreadPoolExecutor. max_workers is only used when executor is not provided, since the executor already defines its own number of workers.
executor (concurrent.futures.Executor, dask.distributed.Client) – an optional executor to be used. By default, a ThreadPoolExecutor is created if no executor is provided. If you need a ProcessPoolExecutor, you can explicitly provide it here. It is also useful when chaining multiple async_map calls: you can pass the same executor to each async_map to share resources. If a dask.distributed.Client is passed, tasks will be executed with the provided client (local or remote). Note that a provided executor is not closed by this method, even after the iterator is exhausted.
Note that if an executor is provided, the max_workers argument is ignored; specify the number of workers in the executor that is being passed instead.
**kw (dict) – keyword arguments for callback
Returns (_AsyncMap)
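To make the interplay of buffer, max_workers and executor concrete, here is a minimal pure-Python sketch of a buffered thread-pool map. It is an illustration under stated assumptions, not squirrel's implementation: the function name buffered_async_map is hypothetical, and yielding results in input order is a choice of this sketch rather than documented behavior.

```python
from collections import deque
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Any, Callable, Deque, Iterable, Iterator, Optional


def buffered_async_map(
    items: Iterable[Any],
    callback: Callable[..., Any],
    buffer: int = 100,
    max_workers: Optional[int] = None,
    executor: Optional[Executor] = None,
    **kw: Any,
) -> Iterator[Any]:
    """Hypothetical sketch: keep at most `buffer` tasks in flight, yield results in input order."""
    own_executor = executor is None
    if own_executor:
        # max_workers is only consulted when the pool is created here
        executor = ThreadPoolExecutor(max_workers=max_workers)
    pending: Deque[Future] = deque()
    try:
        for item in items:
            pending.append(executor.submit(callback, item, **kw))
            if len(pending) >= buffer:
                # buffer is full: wait for the oldest task before submitting more
                yield pending.popleft().result()
        while pending:
            yield pending.popleft().result()
    finally:
        # a caller-provided executor is left open, mirroring the documented behavior
        if own_executor:
            executor.shutdown(wait=True)
```

Passing one executor to several such stages lets them share a single pool, which is the resource-sharing pattern described above.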
-
batched(batchsize: int, collation_fn: Optional[Callable] = None, drop_last_if_not_full: bool = True) → _Iterable
Batch items in the stream.
- Parameters
batchsize – number of items to be batched together
collation_fn – Collation function to use.
drop_last_if_not_full (bool) – if the length of the last batch is less than the batchsize, drop it
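A minimal pure-Python sketch of the batching semantics listed above. The helper name is hypothetical, and it assumes that collation_fn receives the list of items forming one batch; squirrel's actual implementation may differ.

```python
from typing import Any, Callable, Iterable, Iterator, List, Optional


def batched(
    items: Iterable[Any],
    batchsize: int,
    collation_fn: Optional[Callable[[List[Any]], Any]] = None,
    drop_last_if_not_full: bool = True,
) -> Iterator[Any]:
    """Hypothetical sketch: group items into lists of `batchsize`, optionally collating each."""
    batch: List[Any] = []
    for item in items:
        batch.append(item)
        if len(batch) == batchsize:
            yield collation_fn(batch) if collation_fn else batch
            batch = []
    # a trailing partial batch is emitted only when dropping is disabled
    if batch and not drop_last_if_not_full:
        yield collation_fn(batch) if collation_fn else batch
```

For example, batching range(7) with batchsize=3 drops the trailing [6] by default and keeps it when drop_last_if_not_full=False.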
-
collect() → List[Any]
Collect and return the results of the stream as a list.
-
compose(constructor: Type[Composable], *args, **kw) → Composable
Apply the transformation expressed in the __iter__ method of the constructor to items in the stream. If the provided constructor has an __init__ method, then the source argument should not be provided.
-
dask_map(callback: Callable, **kw) → _Iterable
Converts each item in the stream into a dask.delayed object by applying the callback to the item. Specify additional keyword arguments for the callback via kw.
- Parameters
callback (Callable) – callback to be mapped over
**kw – keyword arguments for callback
- Returns
mapped Composable (_Iterable)
-
filter(predicate: Callable) → _Iterable
Filters items in the stream using the predicate callable.
-
flatten() → _Iterable
When items in the stream are themselves iterables, flatten turns them back into individual items.
-
loop(n: Optional[int] = None) → Composable
Repeat the iterable n times.
- Parameters
n (int, Optional) – number of times that the iterable is looped over. If None (the default), it loops forever
Note: this method creates a deepcopy of the source attribute, i.e. all steps in the chain of Composables before the loop itself, which must be picklable.
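The following sketch shows why a deepcopy per pass matters: each pass starts from an unmodified copy of the source, so items mutated downstream during one pass do not affect the next. The helper is hypothetical and operates on a plain iterable rather than a chain of Composables.

```python
import copy
import itertools
from typing import Any, Iterable, Iterator, Optional


def loop(source: Iterable[Any], n: Optional[int] = None) -> Iterator[Any]:
    """Hypothetical sketch: iterate over a fresh deepcopy of the source on every pass."""
    passes = itertools.count() if n is None else range(n)
    for _ in passes:
        # the copy is taken at the start of each pass; n=None loops forever
        yield from copy.deepcopy(source)
```

As with the documented method, the source has to be deep-copyable; a generator object, for instance, is not.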
-
map(callback: Callable, **kw) → _Iterable
Applies the callback to each item in the stream. Specify keyword arguments for the callback in **kw.
-
materialize_dask(buffer: int = 10, max_workers: Optional[int] = None) → _Iterable
Materialize the dask.delayed objects by calling the compute() method on each item in a thread pool.
- Parameters
buffer (int) – size of the buffer that retrieves the data in parallel from dask
max_workers (int) – parameter passed to the ThreadPoolExecutor
Returns (_Iterable)
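The two-step pattern of dask_map followed by materialize_dask (first build deferred computations, then evaluate them in a thread pool with a bounded buffer) can be sketched without dask. In this hypothetical sketch, zero-argument functools.partial objects stand in for dask.delayed objects, and calling them stands in for compute().

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Deque, Iterable, Iterator, Optional


def lazy_map(items: Iterable[Any], callback: Callable[..., Any], **kw: Any) -> Iterator[Callable[[], Any]]:
    # stand-in for dask_map: wrap each item in a deferred, zero-argument computation
    for item in items:
        yield partial(callback, item, **kw)


def materialize(
    thunks: Iterable[Callable[[], Any]], buffer: int = 10, max_workers: Optional[int] = None
) -> Iterator[Any]:
    # stand-in for materialize_dask: evaluate up to `buffer` deferred computations concurrently
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending: Deque[Future] = deque()
        for thunk in thunks:
            pending.append(pool.submit(thunk))
            if len(pending) >= buffer:
                yield pending.popleft().result()
        while pending:
            yield pending.popleft().result()
```

Nothing is computed by lazy_map itself; work only starts when materialize submits the thunks.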
-
monitor(callback: Callable[[squirrel.constants.MetricsType], Any], prefix: Optional[str] = None, metrics_conf: squirrel.iterstream.metrics.MetricsConf = MetricsConf, window_size: int = 5, **kw) → _Iterable
Iterate through the stream and calculate metrics over a rolling window. The metrics can be configured to report only IOPS, only throughput, or neither. By default, all metrics are turned on and calculated. If only one metric is turned on, the calculation of the other is skipped and a dummy value of 0 is reported instead. When all metrics are turned off, this method has no effect.
- Parameters
callback (Callable) – wandb.log, mlflow.log_metrics or other metrics logger.
prefix (str) – If not None, this is added as a prefix to the metric names. It can be used to monitor the same metric at different points of an iterstream in one run. Spaces are allowed.
metrics_conf (MetricsConf) – A config dataclass that controls which metrics are calculated. For details, see squirrel.iterstream.metrics.MetricsConf.
window_size (int) – The number of items to average over when calculating the metrics. Since each item passes through in a very short time, averaging over a rolling window gives more accurate results. Must be greater than 0.
**kw (dict) – arguments passed to your callback function.
- Returns
An _Iterable instance that can be chained with the other methods of this class.
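A rolling-window rate calculation of this kind can be sketched as follows. Everything here is an assumption for illustration: IOPS is taken as items per second, throughput as size units per second computed with a hypothetical size_fn (len by default), and the metric names and the injectable clock are inventions of this sketch rather than squirrel's MetricsConf behavior.

```python
import time
from collections import deque
from typing import Any, Callable, Deque, Dict, Iterable, Iterator, Optional


def monitor(
    items: Iterable[Any],
    callback: Callable[[Dict[str, float]], Any],
    prefix: Optional[str] = None,
    window_size: int = 5,
    size_fn: Callable[[Any], int] = len,
    clock: Callable[[], float] = time.perf_counter,
) -> Iterator[Any]:
    """Hypothetical sketch: report rolling-window IOPS and throughput to `callback`."""
    if window_size < 1:
        raise ValueError("window_size must be bigger than 0")
    # window_size intervals need window_size + 1 timestamps
    stamps: Deque[float] = deque([clock()], maxlen=window_size + 1)
    sizes: Deque[int] = deque(maxlen=window_size)
    name = prefix or ""
    for item in items:
        stamps.append(clock())  # time at which the item arrived from upstream
        sizes.append(size_fn(item))
        elapsed = stamps[-1] - stamps[0]
        if elapsed > 0:
            callback({f"{name}iops": len(sizes) / elapsed, f"{name}throughput": sum(sizes) / elapsed})
        yield item
```

Averaging over the last window_size items smooths out the jitter of timing individual items, which is the motivation given for the window_size parameter.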
-
numba_map(callback: Callable) → _NumbaMap
The iterator will be wrapped inside a numba.jit decorator to speed up the iteration. However, this is quite different from the standard asynchronous speed-up and does not always guarantee better performance than the normal ThreadPoolExecutor, so please use it with caution.
- Parameters
callback (Callable) – a callback to be applied to each item in the stream
Returns (_NumbaMap)
-
shuffle(size: Optional[int] = 1000, **kw) → Composable
Shuffles items using a buffer of the given size to simulate IID sample retrieval.
- Parameters
size (int, optional) – Buffer size for shuffling. Defaults to 1000. The shuffle step is skipped if size < 2.
Acceptable keyword arguments:
initial (int, optional): Minimum number of elements in the buffer before the first element is yielded. Must be less than or equal to size; otherwise it is set to size. Defaults to 100.
rng (random.Random, optional): Either the random module or a random.Random instance. If None, a random.Random() instance is used.
seed (Union[int, float, str, bytes, bytearray, None]): A value that can be passed to random.seed().
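Buffer-based shuffling can be sketched as below. This is a hypothetical illustration of the technique, not squirrel's implementation: the buffer grows towards size, the first item is yielded once initial items are buffered, and each yielded item is drawn uniformly from the buffer. Reading one extra item per step while the buffer is below size is a design choice of this sketch.

```python
import random
from typing import Any, Iterable, Iterator, List, Optional

_END = object()  # sentinel marking an exhausted iterator


def _pick(buf: List[Any], rng: random.Random) -> Any:
    # swap a random element to the end and pop it: O(1) removal
    k = rng.randrange(len(buf))
    buf[k], buf[-1] = buf[-1], buf[k]
    return buf.pop()


def buffered_shuffle(
    items: Iterable[Any],
    size: int = 1000,
    initial: int = 100,
    rng: Optional[random.Random] = None,
) -> Iterator[Any]:
    """Hypothetical sketch of buffer-based shuffling."""
    if size < 2:
        yield from items  # documented behavior: skip shuffling for size < 2
        return
    rng = rng or random.Random()
    initial = min(initial, size)  # documented: initial is capped at size
    buf: List[Any] = []
    it = iter(items)
    for item in it:
        buf.append(item)
        if len(buf) < size:
            # pull one extra item so the buffer keeps growing towards `size`
            extra = next(it, _END)
            if extra is not _END:
                buf.append(extra)
        if len(buf) >= initial:
            yield _pick(buf, rng)
    while buf:  # drain what is left once the input is exhausted
        yield _pick(buf, rng)
```

A larger buffer brings the output order closer to a uniform permutation at the cost of memory; passing a seeded random.Random makes the order reproducible.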
-
sliding(window_size: int, *, deepcopy: bool, stride: int = 1, drop_last_if_not_full: bool = True, min_window_size: int = 1, fill_nan_on_partial: bool = False) → Composable
Apply a sliding window over the stream.
- Parameters
window_size (int) – the length of the window
deepcopy (bool) – If True, each window will be returned as a deepcopy. If items are mutated in the subsequent steps of the pipeline, this should be set to True, otherwise it should be False. Note that deepcopy may incur a substantial cost, so set this parameter carefully.
stride (int) – the distance that the window moves at each step
drop_last_if_not_full (bool) – If True, only windows of exactly window_size items are returned, and the trailing windows with fewer items are dropped.
min_window_size (int) – The minimum length of the window for the last remaining elements. This argument is only relevant if drop_last_if_not_full is set to False; otherwise it is ignored.
fill_nan_on_partial (bool) – If drop_last_if_not_full is False, the last few windows will be shorter than window_size. If set to True, this argument fills the missing values with None. It takes precedence over min_window_size.
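A minimal streaming sketch of these options, written as a hypothetical helper and not taken from squirrel. In particular, how the tail is handled is an assumption here: after the input is exhausted, the window keeps advancing by stride, yielding progressively shorter windows that are either padded with None or cut off once they fall below min_window_size.

```python
import copy
from collections import deque
from typing import Any, Deque, Iterable, Iterator, List


def sliding(
    items: Iterable[Any],
    window_size: int,
    *,
    deepcopy: bool,
    stride: int = 1,
    drop_last_if_not_full: bool = True,
    min_window_size: int = 1,
    fill_nan_on_partial: bool = False,
) -> Iterator[List[Any]]:
    """Hypothetical sketch of the documented sliding-window options."""

    def emit(window: Deque[Any]) -> List[Any]:
        # list() copies only the container; deepcopy also copies the items themselves
        out = list(window)
        return copy.deepcopy(out) if deepcopy else out

    def advance(window: Deque[Any]) -> None:
        for _ in range(min(stride, len(window))):
            window.popleft()

    window: Deque[Any] = deque()
    skip = 0  # items to discard when stride > window_size
    for item in items:
        if skip:
            skip -= 1
            continue
        window.append(item)
        if len(window) == window_size:
            yield emit(window)
            advance(window)
            skip = max(0, stride - window_size)

    if drop_last_if_not_full:
        return
    # assumption: the tail keeps sliding by `stride`, producing progressively shorter windows
    while window:
        if fill_nan_on_partial:
            yield emit(window) + [None] * (window_size - len(window))
        elif len(window) >= min_window_size:
            yield emit(window)
        else:
            break
        advance(window)
```

The deepcopy flag only matters when downstream steps mutate the yielded items; without it, windows share item objects with the stream.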
-
source_(source: Union[Iterable, Callable]) → Composable
Set the source of the stream.
-
split_by_rank_pytorch(torch_dist_group: Optional[str] = None) → Composable
Split the stream into multiple streams, one for each rank in the PyTorch distributed system.
- Parameters
torch_dist_group (str, optional) – The group name of the PyTorch distributed system. Defaults to None.
-
split_by_worker_pytorch() → Composable
Split the stream into multiple streams, one for each PyTorch DataLoader worker.
-
take(n: Optional[int]) → Composable
Take the first n samples from the stream.
-
to(f: Callable, *args, **kw) → _Iterable
Pipe the iterable into another iterable that applies the callable f to it.
-
to_torch_iterable(enforce_rank_check: bool = True, enforce_worker_check: bool = True) → Composable
Convert the stream to a torch iterable.
- Parameters
enforce_rank_check – if set to True, checks that split_by_rank_pytorch has been called before to_torch_iterable. This prevents the same sample from being loaded more than once in a multi-rank PyTorch environment.
enforce_worker_check – if set to True, checks that split_by_worker_pytorch has been called before to_torch_iterable. This prevents the same sample from being loaded more than once in a multi-worker PyTorch environment.
-
tqdm(**kw) → _Iterable
Add a tqdm progress bar to the iterator.
-
zip_index(pad_length: int = None) → Composable
Zip each item in the stream with its index and yield Tuple[index, item].
- Parameters
pad_length – if provided, indexes with fewer digits than pad_length are padded with leading zeros; in this case all indexes are str rather than int.
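A minimal sketch of the documented indexing behavior, using str.zfill for the zero-padding. The helper is hypothetical; using zfill is an assumption of this sketch.

```python
from typing import Any, Iterable, Iterator, Optional, Tuple, Union


def zip_index(items: Iterable[Any], pad_length: Optional[int] = None) -> Iterator[Tuple[Union[int, str], Any]]:
    """Hypothetical sketch: pair each item with its index, zero-padded as a str if pad_length is given."""
    for i, item in enumerate(items):
        yield (str(i).zfill(pad_length) if pad_length is not None else i), item
```

Zero-padded string indexes sort lexicographically in the same order as the integers, which is convenient when they are used as keys or file names.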
-
class squirrel.iterstream.FilePathGenerator(url: str, nested: bool = False, max_workers: Optional[int] = None, max_keys: int = 1000000, max_dirs: int = 10, **storage_options)
Bases: squirrel.iterstream.base.Composable
A specialized version of Composable that accepts a url without instantiating a filesystem instance in the init. It simply generates the paths under the given url by instantiating an fsspec filesystem and yielding the result of fs.ls(url).
- Parameters
url – the url on which ls is performed
nested – if True, ls is also performed on each directory that is encountered. Otherwise, only the top-level paths are yielded, and directories are not expanded.
max_workers (int) – passed to the ThreadPoolExecutor. Only applicable if nested==True
max_keys (int) – maximum number of keys to keep in memory at the same time. If this number is reached, no further expansion of the currently discovered directories is done until enough keys have been yielded to make room for new ones.
max_dirs (int) – maximum number of parallel ls operations.
**storage_options (dict) – kwargs passed to the fsspec filesystem initialization.
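The effect of the nested flag can be sketched on the local filesystem with os.scandir instead of fsspec. This hypothetical helper is sequential and ignores max_workers, max_keys and max_dirs; replacing each expanded directory by its contents (rather than also yielding the directory path) is an assumption of this sketch.

```python
import os
from collections import deque
from typing import Deque, Iterator


def generate_paths(url: str, nested: bool = False) -> Iterator[str]:
    """Hypothetical local-only sketch of listing paths under `url`, optionally expanding directories."""
    pending: Deque[str] = deque([url])
    while pending:
        directory = pending.popleft()
        # sorted() makes the output order deterministic; a real ls order may differ
        with os.scandir(directory) as entries:
            children = sorted(entries, key=lambda e: e.name)
        for entry in children:
            if nested and entry.is_dir():
                pending.append(entry.path)  # expand later instead of yielding the directory itself
            else:
                yield entry.path
```

Because paths are produced lazily, a consumer can stop early without listing the whole tree, which is the same motivation behind keeping a bounded number of keys in memory.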
-
class squirrel.iterstream.IterableSamplerSource(iterables: List[Iterable], probs: Optional[List[float]] = None, rng: Optional[random.Random] = None, seed: Optional[int] = None)
Bases: squirrel.iterstream.base.Composable
A class that samples from iterables into an iterstream.
Initialize IterableSamplerSource.
- Parameters
iterables (List[Iterable]) – List of iterables to sample from.
probs (Optional[List[float]], optional) – Sampling probabilities, one per iterable. Defaults to None.
rng (random.Random, optional) – Random number generator to use.
seed (Optional[int]) – An int or any other type accepted by random.seed(). It is used to seed rng. If None, a unique identifier is used as the seed.
-
__iter__() → Iterator
Samples items from the iterables and yields them until all iterables are exhausted.
-
class squirrel.iterstream.IterableSource(source: Optional[Union[Iterable, Callable]] = ())
Bases: squirrel.iterstream.base.Composable
A class that turns an iterable into the source of a stream and provides stream manipulation functionalities on top, for instance map, async_map, filter, batched, shuffle, and more. For a detailed description of each, please refer to the corresponding docstring in Composable.
Initialize IterableSource.
- Parameters
source (Union[Iterable, Callable], Optional) – An Iterable that the IterableSource is built based on, or a callable that generates items when called.
-
__iter__() → Iterator
Iterates over the items in the iterable.