Distributed Data Science with Metaflow and Dask

Data science projects may start as small experiments on a laptop, but eventually compute needs grow beyond a single machine’s capabilities. Sooner or later it becomes necessary to automate and schedule data processing jobs. At this point, projects need to be moved to the cloud or a local computer cluster. While cloud deployment is a natural evolution of data science projects, it can also become a bottleneck if the infrastructure does not allow a seamless transition between local and remote resources.

There are two aspects to consider when moving to the cloud: the first is deploying workflows that define how data is loaded and processed, and the second is distributed computation of specific tasks, such as map-reduce jobs. Sometimes it is enough to just deploy your workflows on a powerful virtual machine and run all computations there. Nevertheless, it’s good to have a dedicated compute cluster for executing specific algorithms in a distributed manner, for maintainability and scalability reasons.

Worry not, for we can make such an infrastructure painless to set up and efficient to utilize. This is one of the missions of Metaflow, a free, open source tool for orchestrating data science workflows and projects. Metaflow is developed by Outerbounds. Metaflow covers the full data science stack, from writing data science experiments to scheduling data workflows to run automatically in the cloud. With Metaflow, flows are defined as steps of a directed acyclic graph. By writing code in flows, you get goodies such as experiment versioning, data artifact storing, collaboration capabilities, retry and resume possibilities as well as the ability to deploy steps or complete graphs to run in the cloud. 
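
As a quick illustration of the flow format (a hypothetical HelloFlow, not part of this post’s pipeline), a flow might look like this:

from metaflow import FlowSpec, step


class HelloFlow(FlowSpec):
    """A minimal linear DAG: start -> shout -> end."""

    @step
    def start(self):
        # Attributes assigned to self are stored as versioned artifacts
        self.message = "hello from metaflow"
        self.next(self.shout)

    @step
    def shout(self):
        self.message = self.message.upper()
        self.next(self.end)

    @step
    def end(self):
        print(self.message)


if __name__ == "__main__":
    HelloFlow()

Running python hello_flow.py run executes the graph from start to end, and every run and its artifacts are recorded automatically.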

While Metaflow itself scales with available compute resources, for example when running parallel tasks, it is not a compute engine that could distribute specific numerical routines to multiple computers. When your data processing requires more CPUs or memory than a single computer can provide, a tool such as Dask or Spark is needed to distribute the computations across multiple computers.

Here we’ll show you how to run Metaflow in the cloud with Kubernetes, and run distributed computing with Dask in Kubernetes as well. We’ll also show the Metaflow UI for keeping track of experiments. We’ll host Kubernetes in Azure Kubernetes Service (AKS), where we can scale Metaflow and Dask nodes easily. Then, we’ll write a workflow that can utilize these cloud resources. When the flow is run, simple arguments will control whether the cloud or local resources are used. Neat!

We’ve previously covered how Metaflow compares to Azure machine learning tools and how to get started with writing data science tasks in Metaflow. Now we’ll set up the necessary cloud infrastructure for Metaflow and Dask to run flows and computations in the cloud.

Setting up the infrastructure

The following steps are typically performed by a dedicated infrastructure team, and once the infrastructure is available, the data scientists simply need to know these resources are at their disposal. If you're eager to see the code, feel free to skim through this part 🙂

Kubernetes

Kubernetes is an open source tool for managing containerized application ecosystems. It is the backbone of many cloud-based solutions, providing scalability and robustness. Many cloud platforms provide Kubernetes hosting, and in this tutorial we will be using Microsoft’s Azure Kubernetes Service. It provides a handy command line tool as well as a web portal for managing your clusters.

We’ll deploy Kubernetes in AKS using Terraform templates provided by Outerbounds for Metaflow’s Azure deployment! For the deployment and for managing our Kubernetes cluster, we’ll want to install kubectl. Please see https://kubernetes.io/docs/tasks/tools/ for instructions.

Metaflow

To run Metaflow in the cloud, some infrastructure needs to be provisioned. Metaflow’s Azure deployment will set up the cloud with a data store for artifacts produced in flows, a service for tracking experiments and the necessary services for running the Metaflow UI. Locally, we’ll want to install the Metaflow Python package.

The Metaflow documentation gives great instructions for provisioning the necessary cloud resources in Azure. The setup is automated through Terraform; you can find the instructions at https://outerbounds.com/engineering/deployment/azure-k8s/deployment/. We recommend checking the following in the Terraform templates before proceeding:

  • In variables.tf, check that the location is correct for you; we change it to westeurope

  • In infra/kubernetes.tf you may scale down the maximum number of virtual machines, although this can be changed post-deployment as well

  • If this is not a production cluster, you may also change the VM sizes, for example vm_size = "Standard_B2ms", to keep costs down

You don't need to worry about the virtual machine sizes too much, since they can always be changed with AKS. Once the Terraforming has been completed, you’re shown your Azure tenant ID, a client ID and a client secret. It may be helpful to write these to a file so that you can easily export them as environment variables, to allow Metaflow flows to authenticate to Azure.


If you followed Metaflow’s Azure deployment instructions, you cloned the Metaflow-tools repository https://github.com/outerbounds/metaflow-tools. There, in the scripts/ folder, you’ll find a Python script forward_metaflow_ports.py. Run this in your virtual environment, and it will use kubectl port forwarding to allow you to access cloud resources from your local machine! You should now be able to view the Metaflow UI, by default at http://localhost:3000/:

A screenshot of the Metaflow UI, after running some flows to process electricity spot price data.

Dask

Dask is an open source tool for running Numpy, Pandas and Scikit-learn routines on distributed systems. It’s handy when you need more compute resources than your local machine or a single cloud node can provide. As your data sets grow, a tool like Dask quickly becomes a necessity.

While we can use Metaflow to parallelize steps of our data processing workflow, Metaflow is agnostic to the actual data processing tools used. In data science, we'll often use tools like Pandas or PyTorch for data handling and modeling within the flows. If we need to distribute specific operations to multiple machines, we’ll use a dedicated tool for that. This is exactly what Dask does.
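
To make the idea concrete, here is a minimal sketch of the pattern, assuming a Dask scheduler is reachable at 127.0.0.1:8786 (the port-forwarded address we set up below):

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

# Connect to a Dask scheduler; the address here is an assumption matching
# the kubectl port forwarding described later in this post
client = Client("127.0.0.1:8786")

# Wrap an ordinary Pandas DataFrame into a partitioned Dask DataFrame
df = pd.DataFrame({"price": [10.0, 12.5, 9.8, 11.2], "load": [100, 110, 95, 105]})
ddf = dd.from_pandas(df, npartitions=2)

# Operations build a lazy task graph; .compute() runs it on the workers
print(ddf.mean().compute())

Note that the worker pods need Dask and Pandas versions compatible with your local environment for this to work.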

A Dask cluster can be provisioned with Kubernetes! This allows you to create workers flexibly to meet your resource needs. To provision a Dask cluster to Kubernetes, we’ll use Helm, which you can install with these instructions https://helm.sh/docs/intro/install/.

We’ll use the Dask chart for Helm. The full instructions are found in the Dask tutorial; the short way is to run

helm repo add dask https://helm.dask.org/
helm repo update

Before you install the Dask Helm chart, it’s good to be mindful of the configuration options. You can see all options with helm show values dask/dask. For example, we may scale down the number of worker nodes, and provision the Dask resources with

helm install daskcluster dask/dask --set worker.replicas=2,jupyter.enabled=false

This will install Dask resources to the same Kubernetes cluster where Metaflow is, although it would be entirely possible to use a separate cluster. We disable the Jupyter service, which we won’t use here. daskcluster is a name you can choose. After installation, you should have a Dask scheduler and two workers under the name daskcluster. Confirm with kubectl get pods:

Kubectl pods after installing Metaflow and Dask

You should see the Dask scheduler, workers as well as the Metaflow metadata service and UI services. 

The Helm installation also gives you instructions on how to use kubectl port forwarding to make the scheduler available from your local machine. If you missed it, view it again with helm status daskcluster. You also get access to the Dask UI where you can follow submitted Dask jobs! I like to save this info to a shell script that can be run when I start working:

export DASK_SCHEDULER="127.0.0.1"
export DASK_SCHEDULER_UI_IP="127.0.0.1"
export DASK_SCHEDULER_PORT=8786
export DASK_SCHEDULER_UI_PORT=8787
kubectl port-forward --namespace default svc/daskcluster-scheduler $DASK_SCHEDULER_PORT:8786 &
kubectl port-forward --namespace default svc/daskcluster-scheduler $DASK_SCHEDULER_UI_PORT:80

Notice that by default the Helm chart suggests using port 8080 for the Dask UI, but this clashes with the Metaflow service default, so I’ve changed that setting for Dask to port 8787.

We’re done with the infra! Let’s recap:

  1. Provision Metaflow cloud resources to Azure using the Terraform template

  2. Provision Dask resources using Helm

  3. Forward Metaflow ports with kubectl

  4. Forward Dask ports with kubectl

Cloud power-up

Let’s use Metaflow to define a flow that will load data and deploy a distributed computation job using Dask. We’ll run both the flow and the Dask job in Kubernetes. The code is below; next, we’ll unpack it piece by piece.

"""Analyze a dataframe of spot price data."""
from metaflow import FlowSpec, step, Flow, Parameter, conda_base


@conda_base(
    libraries={"dask": "2022.10.2"}, python="3.9.12"
)
class DistributedAnalysisFlow(FlowSpec):
    """Run analysis and modeling of electricity prices."""

    n_partitions = Parameter(
        "partitions",
        help="Number of Dask partitions",
        required=False,
        default=2,
        type=int,
    )
    dask_scheduler = Parameter(
        "client",
        help="Address of Dask scheduler",
        required=False,
        default="127.0.0.1:8786",
        type=str,
    )

    @step
    def start(self):
        """Load results from a previous flow."""
        self.preprocess_flow = Flow("PreprocessElspotFlow")
        self.spot_data = self.preprocess_flow.latest_run["end"].task.data.spot_data
        self.next(self.build_task)

    @step
    def build_task(self):
        """Select only numeric data to use for further processing."""
        import dask.dataframe as dd

        # Create a Dask DataFrame
        self.dask_df = dd.from_pandas(self.spot_data, npartitions=self.n_partitions)

        # Start forming the Dask job to be submitted to a Dask cluster, taking
        # advantage of lazy evaluation.

        # Keep numeric columns, and drop nans, which were generated when
        # computing lagging features for time series data
        self.task = self.dask_df.select_dtypes(["number"]).dropna()

        # We'll compute the Pearson correlation matrix of the data
        self.task = self.task.corr()

        self.next(self.submit)

    @step
    def submit(self):
        """Compute a low-dimensional representation of the data set."""
        from dask.distributed import Client

        # Connect to a scheduler for Dask jobs. The client class can't be pickled, so we
        # don't save it as an artifact!
        # Note about timeout: if you spawn the scheduler pod on the fly using
        # DaskKubernetes or DaskHelm, reserve at least 10 seconds.
        client = Client(self.dask_scheduler, timeout=20)

        # Submit the job and call result() to block until job is finished
        self.result = client.submit(self.task.compute).result()

        self.next(self.end)

    @step
    def end(self):
        """End the flow."""


if __name__ == "__main__":
    flow = DistributedAnalysisFlow()

This file imports all the Metaflow goodies that are needed. It then defines the DistributedAnalysisFlow, decorated with conda_base. This is how an isolated environment for this flow is created. By specifying the Python version and the required Python libraries, Kubernetes also knows how to prepare pods for running the flow.

This flow has two parameters, the number of partitions and the client address. The partition number tells Dask how to distribute the data that will be processed. The client parameter tells Dask where to find the scheduler that controls the worker nodes. The defaults are reasonable for running the flow locally, if you have a local scheduler or are forwarding the Kubernetes scheduler to the default client address.

The start step loads data from a previously run PreprocessElspotFlow. This is an artifact that was created and stored to the cloud by Metaflow, and we use the Metaflow Client API to access it here. Quite useful for passing data between flows without writing save/load methods. You can load up any of your own data here.
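
If you want to inspect the same artifact outside a flow, for example in a notebook, the Client API works the same way; here is a small sketch using the flow and artifact names from this post:

from metaflow import Flow

# Fetch the artifact stored by the preprocessing flow's latest run
run = Flow("PreprocessElspotFlow").latest_run
spot_data = run["end"].task.data.spot_data
print(run.id, type(spot_data))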

The build_task step uses Dask to set up a chain of operations that will compute correlations between the variables in our data. Since Dask jobs are evaluated lazily, this step won’t yet compute any results.

The submit step connects to the Dask scheduler at the provided client address and evaluates the actual job using our worker nodes! If you’re watching the Dask UI, at this step you should see how resources are used across the workers. The result of this computation is saved as an artifact, simply by assigning it to self.result.

Notice that we import Dask modules inside the steps, rather than at the flow level. This is necessary to ensure that the ephemeral pods, where Metaflow runs tasks, always have the conda environment ready before importing third-party libraries.

Now we are ready to run the flow! First, to run the Metaflow steps locally while using the Dask cluster for computation, we’ll run

python flow_distributed_analysis.py --environment=conda run --client=127.0.0.1:8786 

Here, the client address points to the local port that kubectl is forwarding from our Kubernetes cloud cluster. If you have a local Dask cluster, you may use its IP and port as well.
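
If you don’t have a cluster at hand, one quick way to test the flow end to end is to start a local Dask cluster on the default scheduler port. Here is a minimal sketch, assuming port 8786 is free on your machine:

from dask.distributed import LocalCluster

# Start a throwaway local cluster; its scheduler listens on the default port 8786
cluster = LocalCluster(n_workers=2, scheduler_port=8786)
print(cluster.scheduler_address)  # e.g. tcp://127.0.0.1:8786
input("Cluster running, press Enter to shut it down...")
cluster.close()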

When we want to also run the flow steps in the cloud using Kubernetes, we’ll run

python flow_distributed_analysis.py --environment=conda run --client=10.0.82.120:8786 --with kubernetes

and Metaflow will schedule each step of the flow to run in a pod with Kubernetes! Notice that we changed the Dask client address; now that everything is running in Kubernetes, our local forwarded address won’t work. To find the Dask scheduler address inside your Kubernetes cluster, run kubectl get services and look for the daskcluster-scheduler CLUSTER-IP and PORT.

Now you have full control over your cloud resources and data science workflows!

Summary

You’ve scaled up your flow to run in the cloud, freed from the resource restrictions of your laptop! The nicest thing is that our code is not cluttered with infrastructure details; instead, it’s focused on getting the data science done.

Metaflow is an excellent tool for improving data scientist productivity. It allows us to think about the problems we want to solve, rather than worry about restrictions of our infrastructure. Here we combined it with a bunch of open source tools, namely Dask and Kubernetes, to efficiently make use of cloud resources. In upcoming blog posts, we’ll look at scheduling flows to run completely automatically. If these features sound beneficial for your organization, please get in touch with us!