Metaflow in Practice
In this post, we’ll show how we use Metaflow in our daily data sciencing. We use Metaflow to write production-grade data science workflows, and we adopt it early in our projects, already in the experimentation phase. Here we’ll demonstrate, with examples, how to
Add arguments to flows
Version flow executions
Store artifacts and outputs
Parallelize code
Connect flows
Employ a cloud backend
First, a recap. What is Metaflow? Metaflow is a production-grade, Python-based orchestration tool that helps data scientists to organize, version, execute, track and share data processing and analysis code and results. Yes, that’s a lot of stuff. Metaflow was developed at Netflix, but these days it is free, open source and maintained by Outerbounds. You can run Metaflow locally, or host it yourself in your cloud, so that data never leaves your (virtual) premises.
In brief, Metaflow organizes data workflows as directed acyclic graphs (DAGs). The actual data preparation and analysis are defined as steps of the DAG, and the actual tools for working with data can be chosen freely. The same code can then be executed locally or in the cloud against a compute backend, so that minimal infrastructure details leak into the data processing workflows. We had the pleasure of being early adopters of Metaflow’s Azure cloud backend, and we shared our experiences here. We are also using Metaflow on top of other solutions to enable our YOKOT.AI Private Enterprise Generative AI to process sensitive business data in a secure and private manner.
Metaflow has a few components. We’ll demonstrate flows, the data store and the metadata service here. For full details of Metaflow’s productivity-boosting capabilities, you may head to the docs. Before we get anything done, we’ll need to install Metaflow. Since Metaflow is a free Python package, it’s as simple as
pip install metaflow
which we of course run in a virtual environment. Verify your installation with
metaflow status
which should print Metaflow version and other useful information.
The basics
Flows consist of steps, which are defined using the step decorator. Two steps are required in every flow: the start and end steps. To indicate the direction of the DAG, each step ends with a call to the self.next() method. Within the steps, you write your data processing logic using the tools you prefer. Metaflow hence allows data scientists to keep using their favourite frameworks for the sciencing part. This basic knowledge is enough to get going!
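To make this concrete, a minimal flow could look roughly like this (HelloFlow is just an illustration, not part of the spot price example that follows):
from metaflow import FlowSpec, step


class HelloFlow(FlowSpec):
    """The smallest useful linear flow: start -> process -> end."""

    @step
    def start(self):
        """Every flow begins with a start step."""
        self.message = "hello"  # attributes on self become artifacts
        self.next(self.process)  # declare the next node of the DAG

    @step
    def process(self):
        """Do the actual work with whatever libraries you prefer."""
        self.message = self.message.upper()
        self.next(self.end)

    @step
    def end(self):
        """Every flow finishes with an end step."""
        print(self.message)


if __name__ == "__main__":
    HelloFlow()
Saved as hello_flow.py, this would be executed with python hello_flow.py run.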
Now the magic starts. When a DAG is run, Metaflow will by default check that there are no linter errors in the code, and if it finds any, it will prompt us to fix them. This way we catch errors before spending precious compute time.
Metaflow stores metadata of each flow we run. This automatically leaves a trace of every execution of every flow, allowing us to inspect inputs, outputs, logs, run times and variables of past flows. Without any configuration, this metadata will be stored locally on the machine where the flow was run. We can then use commands such as python flow.py logs to inspect previously run flows, their parameters, run times and results. Cool! There’s also dump for inspecting variables, and the extremely useful resume, which allows you to execute a flow from a specific step, bypassing the steps before it.
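For example, assuming a flow file flow.py and a past run with ID 42 (the ID and step names here are purely illustrative), these commands look roughly like
python flow.py logs 42/start        # stdout/stderr of the start step of run 42
python flow.py dump 42/start        # artifacts produced by that step
python flow.py resume               # re-run the latest run from where it failed
python flow.py resume preprocess    # re-run from a specific step, reusing earlier results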
Artifacts are a key concept in Metaflow. The stored metadata attaches flow runs to artifacts, such as parameters and any other data generated by flows. Artifacts are also how data propagates from one step to another inside a flow. Defining artifacts is simple: just attach variables to the flow instance using self! Everything stored to self will be accessible to downstream steps of the flow, as well as outside the flow through flow metadata, using Metaflow’s client API. You can save even large artifacts without worry, since they will be compressed and hashed, and by comparing hashes Metaflow avoids writing duplicates unnecessarily!
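As a quick illustration of that access, here is a minimal sketch (MyFlow and my_table are hypothetical names, not flows defined in this post):
from metaflow import Flow

# Read artifacts of the latest run of a flow from any Python session or notebook.
run = Flow("MyFlow").latest_run
table = run.data.my_table  # any variable that was assigned to self and reached the end step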
The code
Let’s walk through an example of processing electricity spot prices using Metaflow. We’ll define a flow that reads Excel files of electricity spot prices, preprocesses them in parallel, and returns a joined pandas DataFrame. Even in this simple process, Metaflow embeds a lot of magic for us:
"""A flow to preprocess xlsx files of hourly electricity spot prices."""
import os
from metaflow import FlowSpec, step, Parameter
import pandas as pd
from elspot_settings import Constants
class PreprocessElspotFlow(FlowSpec):
"""Read elspot xlsx files into a dataframe."""
data_dir = Parameter("data", help="Folder of weekly elspot data.", required=True)
@step
def start(self):
"""Get started."""
# Find our data from the directory given as parameter
self.file_paths = [
os.path.join(self.data_dir, f)
for f in os.listdir(self.data_dir)
if f.endswith(".xlsx")
]
# we'll do preprocessing in parallel for each file
self.next(self.preprocess, foreach="file_paths")
@step
def preprocess(self):
"""Process a single xlsx file."""
self.prices = pd.read_excel(self.input, names=[
Constants.datetime, Constants.price])
self.prices.datetime = self.prices.datetime.apply(pd.to_datetime)
self.prices[Constants.day_name] = self.prices.datetime.dt.day_name()
self.next(self.join)
@step
def join(self, inputs):
"""Merge preprocessed dataframes."""
self.spot_data = pd.concat([i.prices for i in inputs])
self.next(self.end)
@step
def end(self):
"""End the flow."""
if __name__ == "__main__":
PreprocessElspotFlow()
Let’s have a look at some patterns and features the code snippet above implements.
With the above code, we defined a required input parameter that our flow needs, namely data_dir in the code. That’s all the argument parsing we need to implement manually! We could now execute the program using
python flow_preprocess_elspot.py run --data path/to/data/folder
which makes self.data_dir available to each step of the flow. You may run the code with the --help switch to see all your parameter descriptions, as well as default options.
The flow takes as an input argument a path to a folder that contains sheets of data in separate files. We are going to read and preprocess each file in parallel. At the end of the start step, we call
self.next(self.preprocess, foreach="file_paths")
which tells Metaflow that next we’ll run the preprocess step for each item in the file_paths list.
In the preprocess step, self.input will contain one of the file paths from the previous step, and we’ll write the method accordingly. At the end, we call self.next(self.join), where we merge the results of the parallel processing. The join method has a special signature:
def join(self, inputs):
    """Merge preprocessed dataframes."""
    self.spot_data = pd.concat([i.prices for i in inputs])
where inputs is a list that contains references to each invocation of the preprocess method, and their artifacts! Here we simply concatenate the preprocessed DataFrames into a single DataFrame.
And that’s how we parallelize flow steps! Once you deploy the flow into the cloud, Metaflow can automatically execute parallel steps on dedicated compute nodes.
It is good practice to keep flows focused and short. This will help to version, execute and schedule flows in a flexible and efficient manner. Towards this end, connecting flows, for example making the prediction flow use the model from the training flow, should be simple. And it is! The secret sauce is the Metaflow client API. Watch it in action below: we’ll define a new flow that uses our preprocessed electricity spot price data:
"""Analyse a dataframe of spot price data."""
from metaflow import FlowSpec, step, Flow
class AnalyzeElspotFlow(FlowSpec):
"""Run analysis and modeling of electricity prices."""
@step
def start(self):
"""Load results from a previous flow."""
self.preprocess_flow = Flow("PreprocessElspotFlow")
self.spot_data = self.preprocess_flow.latest_run["join"].task.data.spot_data
self.next(
self.find_cheap_hours, self.find_expensive_hours, self.find_price_ranks
)
...
if __name__ == "__main__":
flow = AnalyzeElspotFlow()
Notice how we instantiate a Flow object with the name of our preprocessing flow. This is how we use Metaflow’s client API to access data and artifacts of other flows. The syntax is pretty:
self.spot_data = self.preprocess_flow.latest_run["join"].task.data.spot_data
Here we load the preprocessed spot price data that was stored as an artifact of the latest run of the preprocessing flow. And just like that, we have connected an artifact of one flow to another flow. Of course, we could have written the data into a file and loaded it here. But the benefit of the client API is that we get to abstract away the file system details, allowing us to use the exact same code locally, in ephemeral containers, in the cloud… It’s a handy pattern, for example, for having a training flow that produces a model and a prediction flow that always uses the latest trained model.
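As a sketch of that training/prediction pattern (TrainModelFlow, its model artifact and the scikit-learn-style predict call are assumptions for illustration, not code from this post):
"""A prediction flow that always uses the latest trained model (illustrative sketch)."""
from metaflow import FlowSpec, step, Flow


class PredictFlow(FlowSpec):
    """Predict with the model produced by a separate training flow."""

    @step
    def start(self):
        """Fetch the newest model artifact via the client API."""
        # Assumes a TrainModelFlow that stored a fitted model as self.model.
        self.model = Flow("TrainModelFlow").latest_run.data.model
        self.next(self.predict)

    @step
    def predict(self):
        """Run inference with the loaded model."""
        # Replace the input below with your real feature data.
        self.predictions = self.model.predict([[1.0, 2.0]])
        self.next(self.end)

    @step
    def end(self):
        """Predictions are now stored as artifacts of this run."""


if __name__ == "__main__":
    PredictFlow()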
The cloud
When we want to store metadata in a centralized location, we’ll set up a Metaflow service. With a centralized metadata service, we can share our flow executions with others, which is mighty helpful for collaboration, debugging and tracking of work. To execute flows against a service, we’ll run
python flow_preprocess_elspot.py --metadata=service run --data path/to/data/folder
which tells Metaflow to look up metadata service information from a configuration file, typically ~/.metaflowconfig/config.json. This is how we go from local experimentation to collaboration and centralized experiment tracking! When we run flows on schedule in the cloud, the service is essential for monitoring our data science workflows. Currently, there is great support for running the service locally, in Azure or in AWS.
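For reference, a minimal configuration pointing flows at a metadata service could look roughly like this (the URL is a placeholder for your own deployment):
{
    "METAFLOW_DEFAULT_METADATA": "service",
    "METAFLOW_SERVICE_URL": "https://your-metaflow-service.example.com"
}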
The artifact store is separate from the metadata store, which allows us to control the data we actually save in the cloud and locally. We’ll run
python flow_preprocess_elspot.py --datastore=azure run --data path/to/data/folder
to have all our artifacts saved in an Azure blob container, accessible in the cloud!
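Instead of passing --datastore=azure on every invocation, the datastore can also be set in the same configuration file; a rough sketch with placeholder values (container name, prefix and storage account endpoint are ours to fill in):
{
    "METAFLOW_DEFAULT_DATASTORE": "azure",
    "METAFLOW_DATASTORE_SYSROOT_AZURE": "metaflow-container/metaflow",
    "METAFLOW_AZURE_STORAGE_BLOB_SERVICE_ENDPOINT": "https://yourstorageaccount.blob.core.windows.net/"
}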
Finally, when our flow needs to scale, we can use Kubernetes to deploy flow execution to the cloud, all from the convenience of our command line. We’ll run
python flow_preprocess_elspot.py --datastore=azure --with kubernetes run --data path/to/data/folder
which will execute the flow using cloud resources. Metaflow will handle the orchestration of flow steps to the correct resources, and we don’t have to change anything in the code. If your organization is using AWS, you can also scale using AWS-native Batch. The datastore could be S3 as well.
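If a single step needs more muscle, we can also annotate it with the resources decorator; a minimal sketch (the CPU and memory numbers are arbitrary examples):
from metaflow import FlowSpec, step, resources


class HeavyFlow(FlowSpec):
    """Illustration of requesting compute for one step."""

    @step
    def start(self):
        self.next(self.train)

    # With run --with kubernetes (or batch), this step is scheduled on a node
    # satisfying the request; when run locally, the decorator is simply ignored.
    @resources(cpu=4, memory=16000)
    @step
    def train(self):
        """A compute-heavy step."""
        self.next(self.end)

    @step
    def end(self):
        """Done."""


if __name__ == "__main__":
    HeavyFlow()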
That’s a lot of features for simply deriving a flow from the FlowSpec class 😃
In follow-up posts, we’ll cover setting up cloud resources for Metaflow. We’ll leave you with some best practices to remember:
Start using Metaflow early on in projects; it will make scaling and collaboration later almost trivial
Keep flows short and self-contained, so that you have more control over versioning, execution and scheduling
Get familiar with the Metaflow client API; it’s a huge productivity boost
If such a powerful tool piques your interest, don’t hesitate to contact sales@softlandia.fi, or the vibrant Metaflow community! Read more about our work in another article focusing on Metaflow on Azure.