Monitoring with MLflow and WandBΒΆ

In squirrel, performance in iterstream can be calculated and logged. This is done by applying an extra method monitor() into the original chaining iterstream. It can be added into any step in the above example where it is defined. For example, you can add .monitor(callback=wandb.log) right after async_map(times_two) Then the performance of all the previous steps combined will be calculated at this point and the calculated metrics will be passed to any user-specified callback such as wandb.log().

The following is a complete example:

import wandb
import mlflow
import numpy as np

def times_two(x: float) -> float:
    return x * 2

samples = [np.random.rand(10, 10) for i in range(10 ** 4)]
batch_size = 5

with wandb.init(): # or mlflow.start_run()
    it = (
        IterableSource(samples)
        .async_map(times_two)
        .monitor(wandb.log) # or mlflow.log_metrics
        .batched(batch_size)
    )
    it.collect() # or it.take(<some int>).join()

This will create an iterstream with the same transformation logics as it was without the method monitor, but the calculated metrics at step async_map is sent to the callback function wandb.log. (The calculated metrics is of type Dict[str, [int, float]], therefore any function takes such argument can be used to plug into the callback of monitor.)

By default, monitor calculates two metrics: Input/output operations per second (IOPS) and throughput. However, this can be configured by passing a data class squirrel.metrics.MetricsConf to the argument metrics_conf in monitor. For details, see squirrel.iterstream.metrics.

Monitoring at different locations in an iterstream in one run can be achieved by inserting monitor with different prefix:

with wandb.init(): # or mlflow.start_run()
    it = (
        IterableSource(samples)
        .monitor(wandb.log, prefix="(before async_map) ")
        .async_map(times_two)
        .monitor(wandb.log, prefix="(after async_map) ") # or mlflow.log_metrics
        .batched(batch_size)
    )
    it.collect() # or it.take(<some int>).join()

This will generate 4 instead of 2 metrics with each original metric bifurcate into two with different prefixes to track at which point the metrics are generated. (This does not interfere with metrics_conf which determines which metrics should be used in each monitor.)