Softlandia background

Softlandia

Blog

Metaflow in Practice

In this post, we’ll show how we use Metaflow in our daily data sciencing. We use Metaflow to write production-grade data science workflows, and we adopt it early in our projects, already in the experimentation phase. Here we’ll demonstrate, with examples, how to

  • Add arguments to flows

  • Version flow exections

  • Store artifacts and outputs

  • Parallelize code

  • Connect flows

  • Employ a cloud backend

First, a recap. What is Metaflow? Metaflow is a production-grade, Python-based orchestration tool that helps data scientists to organize, version, execute, track and share data processing and analysis code and results. Yes, that’s a lot of stuff. Metaflow was developed at Netflix, but these days it is free, open source and maintained by Outerbounds. You can run Metaflow locally, or host it yourself in your cloud, so that data never leaves your (virtual) premises.

In brief, Metaflow organizes data workflows as directed acyclic graphs (DAGs). The actual data preparation and analysis are defined as steps of the DAG, and the choice of actual tools for working with data can be chosen freely. The same code can then be executed locally or in the cloud against a compute backend, so that minimal infrastructure details leak into the data processing workflows. We had the pleasure of being early adopters of Metaflow's Azure cloud backend, and we shared our experiences here. We are also using Metaflow on top of other solutions to enable our YOKOT.AI Private Enterprise Generative AI to process sensitive business data in a secure and private manner.

Metaflow has a couple of components. We’ll demonstrate flows, data store and the metadata service here. For full details of Metaflow’s productivity-boosting capabilities, you may head to the docs. Before we get anything done, we’ll need to install Metaflow. Since Metaflow is a free Python package, it’s as simple as

pip install metaflow

which we of course run in a virtual environment. Verify your installation with

metaflow status

which should print Metaflow version and other useful information.

The basics

Flows consists of steps, which are defined using the step-decorator. Two steps are required in every flow, and those are the start and end steps. To indicate the directions of the DAG, each step ends with self.next() method. Within the steps, you write your data processing logic using the tools you prefer. Metaflow hence allows the data scientists to keep using their favourite frameworks for the sciencing part. This basic knowledge is enough to get going!

Now the magic starts. When a DAG is run, Metaflow will by default check that there are no linter errors in the code, and if it finds any we’ll be prompted to fix them. This way you’ll catch errors before you spend precious compute time.

Metaflow stores metadata of each flow we run. This automatically leaves a trace of every execution of every flow, allowing us to inspect inputs, outputs, logs, run times and variables of past flows. Without any configuration, this metadata will be stored locally on the machine where the flow was run. We can then use commands such as python flow.py logs to inspect previously run flows, their parameters, run times and results. Cool! There’s also dump for inspecting variables and the extremely useful resume, which allows you to execute a flow from a specific step, bypassing steps before it.

Artifacts are a key concept in Metaflow. The stored metadata will attach flow runs to artifacts, such as parameters and any other data generated by flows. Artifacts are also how data propagates from step to another inside a flow. Defining artifacts is simple, just attach variables to the flow instance using self! Everything stored to self will be accessible to downstream steps of the flow as well as outside the flow through flow metadata, using Metaflow’s client API. You can save even large artifacts without worry, since they will be compressed and hashed, and by comparing hashes Metaflow will not write duplicates unnecessarily!

The code

Let’s walk through an example for processing electricity spot prices using Metaflow. We’ll define a flow that reads Excel files of electricity spot prices, preprocesses them in parallel, and returns a joined Pandas Dataframe. In this simple process, Metaflow embedded a lot of magic for us:

"""A flow to preprocess xlsx files of hourly electricity spot prices."""
import os 

from metaflow import FlowSpec, step, Parameter
import pandas as pd

from elspot_settings import Constants

class PreprocessElspotFlow(FlowSpec):
    """Read elspot xlsx files into a dataframe."""

    data_dir = Parameter("data", help="Folder of weekly elspot data.", required=True)

    @step
    def start(self):
        """Get started."""
        # Find our data from the directory given as parameter
        self.file_paths = [
            os.path.join(self.data_dir, f)
            for f in os.listdir(self.data_dir)
            if f.endswith(".xlsx")
        ]
        # we'll do preprocessing in parallel for each file
        self.next(self.preprocess, foreach="file_paths")

    @step
    def preprocess(self):
        """Process a single xlsx file."""
        self.prices = pd.read_excel(self.input, names=[
            Constants.datetime, Constants.price])
        self.prices.datetime = self.prices.datetime.apply(pd.to_datetime)
        self.prices[Constants.day_name] = self.prices.datetime.dt.day_name()
        self.next(self.join)

    @step
    def join(self, inputs):
        """Merge preprocessed dataframes."""
        self.spot_data = pd.concat([i.prices for i in inputs])
        self.next(self.end)

    @step
    def end(self):
        """End the flow."""

if __name__ == "__main__":
    PreprocessElspotFlow()

Let’s have a look at some patterns and features the code snippet above implements.

With the above code, we defined a required input parameter that our flow needs, namely data_dir in the code. That’s all the argument parsing we need to implement manually! We could now execute the program using

python flow_preprocess_elspot.py run --data path/to/data/folder

which makes self.data_dir available to each step of the flow. You may run the code with the --help switch to see all your parameter descriptions, as well as default options.

The flow takes as input argument a path to a folder that contains sheets of data in separate files. We are going to read and preprocess each in parallel. At the end of the start step, we call

self.next(self.preprocess, foreach="file_paths")

which tells Metaflow that next we’ll run the preprocess step for each item in the file_paths list.

In the preprocess step, self.input will contain one of the single file paths from the previous step, and we’ll write the method accordingly. At the end, we call self.next(self.join), where we merge the results of the parallel processing. The join method has a special signature:

def join(self, inputs):
    """Merge preprocessed dataframes."""
    self.spot_data = pd.concat([i.prices for i in inputs])

where inputs is a list that contains references to each invocations of the preprocess method, and their artifacts! Here we simply concatenate the preprocessed Dataframes into a single Dataframe.

And that’s how we parallilize flow steps! Once you deploy the flow into the cloud, Metaflow can automatically execute parallel steps on dedicated compute nodes.

It is good practice to keep flows focused and short. This will help to version, execute and schedule flows in a flexible and efficient manner. Towards this end, connecting flows, for example making the prediction flow use the model from the training flow, should be simple. And it is! The secret sauce is the Metaflow client API. Watch it in action below: we’ll define a new flow that uses our preprocessed electricity spot price data

"""Analyse a dataframe of spot price data."""
from metaflow import FlowSpec, step, Flow


class AnalyzeElspotFlow(FlowSpec):
    """Run analysis and modeling of electricity prices."""

    @step
    def start(self):
        """Load results from a previous flow."""
        self.preprocess_flow = Flow("PreprocessElspotFlow")
        self.spot_data = self.preprocess_flow.latest_run["join"].task.data.spot_data
        self.next(
            self.find_cheap_hours, self.find_expensive_hours, self.find_price_ranks
        )

    ...

if __name__ == "__main__":
    flow = AnalyzeElspotFlow()

Notice how we instantiate a Flow object with the name of our preprocessing flow. This is how we use Metaflow’s client API to access data and artifacts of other flows. The syntax is pretty:

self.spot_data = self.preprocess_flow.latest_run["join"].task.data.spot_data

we load the preprocessed spot price data that was stored as an artifact of the latest run of the preprocessing flow. And just like that, we have connected an artifact of one flow to another flow. Of course, we could have written the data into a file and have loaded it here. But the benefits of the client API are that we get to abstract away the file system details, allowing us to use the exact same code locally, in ephemeral containers, in the cloud… It’s a handy pattern for example for having a training flow that produces a model, and a prediction flow that always uses the latest trained model.

The cloud

When we want to store metadata in a centralized location, we’ll set up a Metaflow service LINK. With a centralized metadata service, we can share our flow executions with others, which is mighty helpful for collaboration, debugging and tracking of work. To execute flows against a service, we’ll run

python flow_preprocess_elspot.py --metadata=service run --data path/to/data/folder

which tells Metaflow to look up metadata service information from a configuration file, typically ~/.metaflowconfig/config.json. This is how we go from local experimentation to collaboration and centralized experiment tracking! When we run flows on schedule in the cloud, the service is essential for monitoring our data science workflows. Currently, there is great support for running the service locally, in Azure or in AWS.

The artifact store is separate from the metadata store, which allows us to control the data we actually save in the cloud and locally. We’ll run

python flow_preprocess_elspot.py --datastore=azure run --data path/to/data/folder

to have all our artifacts saved in an Azure blob container, accessible in the cloud!

Finally, when our flow needs to scale, we can use Kubernetes to deploy flow execution to the cloud, all from the convenience of our command line.

python flow_preprocess_elspot.py --datastore=azure --with kubernetes run --data path/to/data/folder

which will execute the flow using cloud resources. Metaflow will handle the orchestration of flow steps to correct resources, and we don’t have to change anything in the code. If your organization is using AWS, you can also scale using AWS-native Batch. The datastore could be S3 as well.

That’s a lot of features for simply deriving a flow from the FlowSpec class 😃

In follow-up posts, we’ll cover setting up cloud resources for Metaflow. We’ll leave you with some best practices to remember:

  • Start using Metaflow early on in projects, it will make scaling and collaboration later almost trivial

  • Keep flows short and self-contained, so that you have more control over versioning, execution and scheduling

  • Get familiar with the Metaflow client API, it’s a huge productivity boost

  • There’s a UI too! And Cards to visualize artifacts!

If such a powerful tool piques your interest, don’t hesitate to contact sales@softlandia.fi, or the vibrant Metaflow community! Read more about our work in another article focusing to Metaflow on Azure.