squirrel.iterstream.base
¶
Module Contents¶
Classes¶
Represents content that can be fetched asynchronously. |
|
A mix-in class that provides stream manipulation functionalities. |
-
class
squirrel.iterstream.base.
AsyncContent
(item: str, func: Callable, executor: concurrent.futures.Executor)¶ Represents content that can be fetched asynchronously.
Initialize AsyncContent.
- Parameters
item (str) – Key corresponding to a single item, will be passed to fetch_func.
func (Callable) – Function that fetches a given key.
executor (concurrent.futures.Executor) – Executor to submit func with item.
-
class
squirrel.iterstream.base.
Composable
(source: Optional[Iterable] = None)¶ A mix-in class that provides stream manipulation functionalities.
Init
-
abstract
__iter__
(self) → Iterator¶ Abstract iter
-
async_map
(self, callback: Callable, buffer: int = 100, max_workers: Optional[int] = None, executor: Optional[concurrent.futures.Executor] = None) → _AsyncMap¶ Applies the callback to the item in the self and returns the result.
- Parameters
callback (Callable) – a callable to be applied to items in the stream
buffer (int) – the size of the buffer
max_workers (int) – number of workers in the ThreadPoolExecutor
executor (concurrent.futures.Executor, dask.distributed.Client) – an optional executor to be used. By default a ThreadPoolExecutor is created, if no executor is provided. If you need a ProcessPoolExecutor, you can explicitly provide it here. It is also useful when chaining multiple async_map; you can pass the same executor to each async_map to share resources. If dask.distributed.Client is passes, tasks will be executed with the provided client (local or remote). Note if the executor is provided, it will not be closed in this function even after the iterator is exhausted. Note if executor is provided, the arguments max_worker will be ignored. You should specify this in the executor that is being passed.
Returns (_AsyncMap)
-
batched
(self, batchsize: int, collation_fn: Optional[Callable] = None, drop_last_if_not_full: bool = True) → _Iterable¶ Batch items in the stream.
- Parameters
batchsize – number of items to be batched together
collation_fn – Collation function to use.
drop_last_if_not_full (bool) – if the length of the last batch is less than the batchsize, drop it
-
collect
(self) → List[Any]¶ Collect and returns the result of the stream
-
compose
(self, constructor: Type[Composable], *args, **kw) → Composable¶ Compose the items in the stream with items generated by constructor
-
dask_map
(self, callback: Callable) → _Iterable¶ Converts each item in the stream into a dask.delayed object by applying the callback to the item
-
filter
(self, predicate: Callable) → _Iterable¶ Filters items by predicate callable
-
flatten
(self) → _Iterable¶ When items in the stream are themselves iterables, flatten turn them back to individual items again
-
map
(self, callback: Callable) → _Iterable¶ Applies the callback to each item in the stream
-
materialize_dask
(self, buffer: int = 10, max_workers: Optional[int] = None) → _Iterable¶ Materialize the dask.delayed object by calling compute() method on each item in a thread pool
- Parameters
Returns (_Iterable)
-
monitor
(self, callback: Callable[[squirrel.constants.MetricsType], Any], prefix: Optional[str] = None, metrics_conf: squirrel.iterstream.metrics.MetricsConf = MetricsConf, window_size: int = 5, **kw) → _Iterable¶ Iterate through an iterable and calculate the metrics based on a rolling window. Notice that you can configure metrics to output only IOPS or throughput or None. All metrics are by default turned on and calculated. If only one metric is turned on, the calculation of the other metric will be skipped, and a dummy value 0 is reported instead. When all metrics are turned off, this method has no actual effect.
- Parameters
callback (Callable) – wandb.log, mlflow.log_metrics or other metrics logger.
prefix (str) – If not None, will add this as a prefix to the metrics name. Can be used to monitor the same metric in different point in an iterstream in one run. Spaces are allowed.
metrics_conf (MetricsConf) – A config dataclass to control metrics calculated. Details see squirrel.metrics.MetricsConf
window_size (int) – How many items to average over the metrics calculation. Since each item passes by in a very small time window, for better accuracy, a rolling window cal is more accurate. Its value must be bigger than 0.
**kw – arguments to pass to your callback function.
- Returns
An _Iterable instance which can be chained by other funcs in this class.
-
numba_map
(self, callback: Callable) → _NumbaMap¶ The iterator will be wrapped inside a numba.jit decorator to speed up the iteration. However, this is quite different from the standard asynchronous speed-up and does not always guarantee a better performance than the normal ThreadPoolExecutor, so please use with caution.
- Parameters
callback (Callable) – a callback to be applied to each item in the stream
Returns (_NumbaMap)
-
shuffle
(self, size: int, **kw) → Composable¶ Shuffles items in the buffer, defined by size, to simulate IID sample retrieval.
- Parameters
size (int, optional) – Buffer size for shuffling. Defaults to 1000. Skip the shuffle step if size < 2.
Acceptable keyword arguments:
initial (int, optional): Minimum number of elements in the buffer before yielding the first element. Must be less than or equal to bufsize, otherwise will be set to bufsize. Defaults to 100.
rng (random.Random, optional): Either random module or a
random.Random
instance. If None, a random.Random() is used.seed (Union[int, float, str, bytes, bytearray, None]): A data input that can be used for random.seed().
-
source_
(self, source: Composable) → Composable¶ Set the source attribute to the provided source argument
-
take
(self, n: Optional[int]) → Composable¶ Take n samples from iterable
-
to
(self, f: Callable, *args, **kw) → _Iterable¶ Pipe the iterable into another iterable which applies f callable on it
-
tqdm
(self, **kw) → _Iterable¶ Add tqdm to iterator.
-
abstract