Tech Blog: Data Ingestion using Prefect for workflow orchestration
Posted by Abhishek R. S. on 2024-05-09, updated on 2024-05-21

This blog is about workflow orchestration using Prefect.

1.1) Prefect Setup

Installing Prefect is straightforward. Run the following command to install it.
pip install -U prefect

1.2) Starting the prefect server

The following command can be used to start the Prefect server.
prefect server start

To point the Prefect client at the local server, configure the API URL with the following command.
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api

The following error may appear.
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
I have ignored it for now, as I have not yet found a solution.

1.3) Main components of Prefect

Prefect can be used for orchestrating a wide range of scalable data pipeline applications. The main advantages of using Prefect as a workflow orchestrator are scheduling, caching, retries, notifications, logging and much more.

The main components of a Prefect workflow are tasks and flows. A flow is like a container for workflow logic, whereas a task is a discrete unit of work in the workflow. A flow connects tasks that have dependencies on one another. There can also be subflows, i.e. a flow within another flow. For tasks that are very expensive, caching can be leveraged when the inputs do not change. This will definitely come in handy 💪. Also, retries can be leveraged for both flows and tasks.
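As a rough illustration of how these pieces fit together (a minimal sketch added for this blog, assuming the Prefect 2.x Python API; all task and flow names here are hypothetical), caching and retries are enabled with nothing more than decorator arguments.

    from datetime import timedelta

    from prefect import flow, task
    from prefect.tasks import task_input_hash

    # Expensive task: cache the result when the inputs do not change.
    @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
    def expensive_computation(x: int) -> int:
        return x * x

    # Flaky task: retry up to 3 times, waiting 5 seconds between attempts.
    @task(retries=3, retry_delay_seconds=5)
    def flaky_step(x: int) -> int:
        return x + 1

    # Subflow: a flow called from within another flow.
    @flow
    def sub_pipeline(x: int) -> int:
        return flaky_step(x)

    # Parent flow: connects the task and the subflow.
    @flow
    def main_pipeline(x: int = 4) -> int:
        return sub_pipeline(expensive_computation(x))

    if __name__ == "__main__":
        main_pipeline(4)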

1.4) Using Prefect for workflow orchestration

I developed a simple data ingestion workflow using Prefect to understand how it can be leveraged. This is available in my GitHub repo Data_Ingestion_Prefect.

The data ingestion pipeline developed in this project uses the following workflow. It downloads the parquet dataset files from the publicly available New York City taxi trips dataset for the specified year and month. It also creates a larger parquet file for the entire year by merging each downloaded month's data into the file for that year.

The flow is divided into four different tasks; a rough sketch of them follows the list.

  • Create a directory for the dataset, if it does not exist already.

  • Download the dataset file (parquet) for the selected year and month. Since this task downloads the file from the web, it can fail due to connection errors, so retries have been implemented.

  • Load the data for the selected month into a dataframe.

  • Load the dataframe for the year (create an empty one if it does not exist already). Merge the dataframe of the selected month with the dataframe for the year. Save the parquet dataset file for the entire year.
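The sketch below is not the exact code from my repo; it is a rough outline of these four tasks under a few assumptions (pandas and requests are used for the dataframes and the download, the yellow taxi files are used, and the base URL, function and flow names are placeholders).

    from pathlib import Path

    import pandas as pd
    import requests
    from prefect import flow, task

    # Assumed base URL of the public NYC taxi trip data; adjust if it has moved.
    BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

    @task
    def create_dataset_dir(dataset_dir: str) -> Path:
        # Create the dataset directory if it does not exist already.
        path = Path(dataset_dir)
        path.mkdir(parents=True, exist_ok=True)
        return path

    @task(retries=3, retry_delay_seconds=10)
    def download_month_file(dataset_dir: Path, year: int, month: int) -> Path:
        # Download the parquet file for the selected year and month.
        # Retries cover transient connection failures.
        file_name = f"yellow_tripdata_{year}-{month:02d}.parquet"
        target = dataset_dir / file_name
        response = requests.get(f"{BASE_URL}/{file_name}", timeout=60)
        response.raise_for_status()
        target.write_bytes(response.content)
        return target

    @task
    def load_month_dataframe(month_file: Path) -> pd.DataFrame:
        # Load the data for the selected month into a dataframe.
        return pd.read_parquet(month_file)

    @task
    def merge_into_year(dataset_dir: Path, year: int, df_month: pd.DataFrame) -> Path:
        # Load (or create) the year dataframe, merge the month into it and save it.
        year_file = dataset_dir / f"yellow_tripdata_{year}.parquet"
        df_year = pd.read_parquet(year_file) if year_file.exists() else pd.DataFrame()
        df_year = pd.concat([df_year, df_month], ignore_index=True)
        df_year.to_parquet(year_file)
        return year_file

    @flow(name="ingest-taxi-data")
    def ingest_taxi_data(year: int = 2023, month: int = 1, dataset_dir: str = "dataset") -> None:
        path = create_dataset_dir(dataset_dir)
        month_file = download_month_file(path, year, month)
        df_month = load_month_dataframe(month_file)
        merge_into_year(path, year, df_month)

    if __name__ == "__main__":
        ingest_taxi_data(2023, 1)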

To understand how things work, I tried two scenarios: one where the workflow succeeds and one where it fails. Testing the first scenario, where the workflow succeeds, was straightforward. To test the other scenario, I simply tried to download a non-existent dataset file from the dataset webpage. I have included some dashboard visualizations from these experiments.
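With the sketch above, the failure scenario can be reproduced by requesting a month that has not been published yet (the values below are hypothetical): the download task raises an HTTP error, the retries kick in, and the flow eventually ends in a failed state.

    # Request a month that is not published on the dataset page; the download
    # task fails, retries three times and the flow finishes in a Failed state.
    ingest_taxi_data(2035, 1)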

The following image shows a Prefect dashboard visualization of the registered flows.

The following image shows a Prefect dashboard visualization of a successful flow.

The following image shows a Prefect dashboard visualization of a failed flow.

The following image shows the Prefect dashboard logs of the failed retries for a failed flow.

1.5) Creating a local deployment

  1. The first step is to create a work pool. This can be done in the Prefect UI in the following way.
    Work Pools -> Create -> Choose the type of work pool from the list of supported work pools (process for local) -> Set a name

  2. The next step is to create a deployment. Run the following command in the project directory and choose the deployment type (local or git) to initialize the Prefect deployment. Depending on the version of Prefect, use the appropriate command.
    prefect project init
    Or
    prefect init

  3. Run the following command to create a deployment (a filled-in example of these commands follows the list).
    prefect deploy script.py:[flow_name] -n [deployment_name] -p [work_pool]

  4. Run the following command to trigger a run of a deployment.
    prefect deployment run '[flow_name]/[deployment_name]'

  5. Run the following command to start a worker for a work pool.
    prefect worker start --pool '[work_pool]'

  6. A deployment can also be run from the UI. Go to Deployments -> Quick Run on the appropriate deployment.
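For example, assuming the ingestion sketch above is saved as ingest.py, the flow is named ingest-taxi-data, the deployment is named taxi-ingest and the process work pool is named local-pool (all of these names are hypothetical), the commands from steps 2 to 5 filled in would look like the following.

    prefect init
    prefect deploy ingest.py:ingest_taxi_data -n taxi-ingest -p local-pool
    prefect deployment run 'ingest-taxi-data/taxi-ingest'
    prefect worker start --pool 'local-pool'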

Main takeaway

I learned to perform workflow orchestration using Prefect through this data ingestion pipeline example. The main advantage is the simplicity of using Prefect with just Python decorators. The sophistication that Prefect has to offer, like caching, retries, scheduling, cloud support, etc., can only be appreciated by using it more and more. This blog can be used as a reference for deploying complex MLOps and data pipelines using Prefect for orchestration. On to deploying more complex MLOps and data pipelines with Prefect orchestration now 😄.

Next Steps

  • Deploy multiple complex MLOps pipelines using Prefect for workflow orchestration.

  • Deploy some Data Engineering projects on real-world data using Prefect for workflow orchestration.

About the author: Abhishek is a graduate AI engineer who is interested in developing and deploying Artificial Intelligence, Machine Learning, Computer Vision, and Data Science applications. His goal is to improve human lives by leveraging real-world data.