For a long time, the inflexibility of batch orchestrators and schedulers kept forcing data teams to maintain a separate set of systems to satisfy modern event-driven, real-time data workloads. But it doesn't have to be that way. Read on for a new approach.
Can a more flexible workflow orchestration system handle both paradigms?
Many believe that streaming technologies are the only way to achieve real-time analytics. Since most data workloads these days are orchestrated through batch-processing platforms, the real-time requirement forces data teams to adopt a new set of tools. However, maintaining two separate systems for batch processing and real-time streaming introduces additional burdens and costs.
This post offers an alternative approach that allows you to handle both batch processing and real-time streaming pipelines from a single orchestration platform.
Understanding the problem
In the past, the data industry was trying to address this problem by introducing new architectures that separate the streaming layer from the batch and serving layer (think: Lambda and Kappa architecture). But are these batch and streaming paradigms inherently different? In practice, both collect and process data for use in downstream applications. The only real distinction between batch and stream processing is that they operate at slightly different time intervals.
At Prefect, we are committed to understanding the problems our users face. Orion is the first workflow orchestration engine that allows you to run any code at any time for any reason. Its flexible API enables you to launch new flow runs in response to events from streaming systems or message queues, as well as to run your workflows on a regular cadence or as long-running processes. You can manage the full variety of data access patterns from a single orchestration platform.
A basic streaming data pipeline
The Orion API separates the orchestration engine from the code being orchestrated. You don’t need to rewrite your entire workflow code as a Directed Acyclic Graph (DAG). In fact, you don’t need a DAG at all. You are free to orchestrate infinitely long flow runs with as many loops and conditional statements as you need.
To illustrate this capability of Orion, let’s implement an example of a real-time streaming pipeline in Python.
As a sample streaming data source, here’s some code for an API that generates data upon request. This API imitates real-time orders being created through an e-commerce system.
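The original snippet isn't reproduced here, so below is a minimal sketch of what such an API might look like, assuming FastAPI as the web framework; the endpoint path and the order fields are illustrative, not the post's actual code. The `create_order` helper is plain Python, so it works standalone even if FastAPI isn't installed:

```python
import random
import uuid


def create_order(max_value: int = 20) -> dict:
    """Generate a fake e-commerce order; an empty dict simulates 'no new orders'."""
    if random.random() < 0.1:  # occasionally return no data
        return {}
    return {
        "id": str(uuid.uuid4()),
        "item": random.choice(["laptop", "phone", "monitor"]),
        "price": round(random.uniform(1, max_value), 2),
    }


try:
    # Optional serving layer: expose the generator as a REST endpoint.
    from fastapi import FastAPI

    app = FastAPI()

    @app.get("/orders")
    def get_orders() -> dict:
        return create_order()
except ImportError:
    pass  # FastAPI not installed; create_order() still works on its own
```

If you save this as, say, `orders_api.py` (a hypothetical filename), uvicorn can serve the `app` object directly.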
Start the API by running it with uvicorn.
Here is a minimal workflow example that uses this data:
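The post's script isn't shown here; the following is a minimal sketch of such a polling loop. The URL (uvicorn's default port) and the injectable `fetch` parameter are assumptions made so the loop can also run without the API:

```python
import json
import time
import urllib.request


def fetch_order(url: str = "http://127.0.0.1:8000/orders") -> dict:
    """Request one order from the local streaming API (assumed endpoint)."""
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())


def stream_orders(iterations: int = 5, fetch=fetch_order, delay: float = 1.0) -> list:
    """Poll the API a fixed number of times, printing each order as it arrives."""
    seen = []
    for _ in range(iterations):
        order = fetch()
        print(order)
        seen.append(order)
        time.sleep(delay)
    return seen
```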
If you start the API and run the script, you should see randomly generated streaming data appear in your standard output:
Instead of requesting real-time data from a REST API, the flow might contain a long-running consumer process that continuously receives new events from a distributed publish-subscribe messaging system.
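Such a consumer loop might look like the sketch below, which substitutes an in-memory `queue.Queue` for a real broker such as Kafka; the message shape and the stop-on-quiet behavior are illustrative simplifications:

```python
import queue


def consume(events: queue.Queue, handle, timeout: float = 0.5) -> int:
    """Long-running consumer loop: process events until the source goes quiet."""
    processed = 0
    while True:
        try:
            message = events.get(timeout=timeout)
        except queue.Empty:
            break  # a real consumer would keep polling the broker instead
        handle(message)  # e.g., trigger a new flow run for this event
        processed += 1
    return processed
```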
Improving the pipeline with Orion
That simple data pipeline works fine, but what if we:
Want to know when it fails and get notified about it?
Need observability into which components of that pipeline have failed?
Need to retry failed components on failure?
Want some downstream process to run after a specific task has been successfully completed?
Want to read the pipeline’s logs to ensure that the system works as expected?
This is where Orion can help. Modifying the previous script to make it work with Orion is as simple as importing Prefect and adding a single @flow decorator to the main function.
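A sketch of the same loop with the decorator added; to keep this snippet runnable even without Orion installed, it falls back to a no-op decorator (the data source is stubbed with random values rather than the HTTP call):

```python
import random
import time

try:
    from prefect import flow  # Orion's flow decorator
except ImportError:
    def flow(fn):  # no-op stand-in so this sketch runs without Orion
        return fn


def fetch_order() -> dict:
    # Stand-in for the HTTP call to the local orders API
    return {"id": random.randint(1, 1000), "price": round(random.uniform(1, 20), 2)}


@flow
def real_time_flow(iterations: int = 5, delay: float = 1.0):
    for _ in range(iterations):
        print(fetch_order())
        time.sleep(delay)
```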
💡 To run the above flow, you need to install Orion.
Just two lines of code have been added, but notice the difference in the output when the script runs:
With nothing more than the @flow decorator, Orion automatically registered the flow and its first flow runs in the backend. It also assigned an executor to it.
Let’s have a look at the UI.
Exploring Orion components
When you start the Orion server, it will spin up a UI from which you can explore your flows, deployments, as well as flow runs and task runs.
```shell
prefect orion start
# INFO: Uvicorn running on http://127.0.0.1:4200 (Press CTRL+C to quit)
```
The streaming flow and its corresponding first runs have been automatically registered in the backend, and you can inspect all flow runs and their states.
Taking it one step further: adding tasks
Having the entire logic encapsulated within a flow is an easy first step. But to get more visibility into what happens inside a flow, you can add some tasks.
Tasks are beneficial when some parts of your workflow depend on previous steps completing successfully. For instance, if the API didn't generate any orders (e.g., it returned an empty dictionary), you can skip the step that loads the order into the analytical data store.
To mark the functions as tasks, all you need to do is add the @task decorator to each of them:
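A sketch of how the flow might look with tasks; the task names are illustrative, and the no-op fallbacks keep the snippet runnable without Orion installed:

```python
import random

try:
    from prefect import flow, task
except ImportError:
    def flow(fn):  # no-op stand-ins so this sketch
        return fn  # runs without Orion installed

    def task(fn):
        return fn


def make_order() -> dict:
    """Plain helper simulating the API response."""
    return {"id": random.randint(1, 1000), "price": 9.99}


@task
def fetch_orders() -> dict:
    return make_order()


@task
def load_orders(order: dict) -> None:
    print(f"Loading order {order['id']} into the analytical store")


@flow
def orders_flow():
    orders = fetch_orders()
    load_orders(orders)
```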
The output now shows that both tasks were completed successfully:
Add conditional logic to the flow
As mentioned previously, the flow should skip the load step if the API returns an empty dictionary. To enable this conditional logic between tasks within the flow, you need to add a condition that examines the output of the first task and executes the second task only if the returned data is not an empty dictionary.
With Orion, all you need is native Python:
There is one caveat: since Orion supports parallel and asynchronous execution, the orders object is a PrefectFuture, which represents the state and result of a task computation happening in an executor.
To get the result of a task, wait until the task finishes execution by using orders_state = orders.wait(), then retrieve the return value with orders_state.result().
Here is a complete example:
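A sketch of what the complete flow might look like under the alpha-era API described above; the task bodies are illustrative stubs, and the flow is not executed here (with the no-op fallbacks, the wait()/result() calls only make sense when Orion is installed):

```python
import random

try:
    from prefect import flow, task
except ImportError:
    def flow(fn):  # no-op stand-ins so this sketch
        return fn  # parses and loads without Orion

    def task(fn):
        return fn


@task
def fetch_orders() -> dict:
    # An empty dict simulates "the API returned no new orders"
    return {} if random.random() < 0.1 else {"id": 42, "price": 9.99}


@task
def load_orders(order: dict) -> None:
    print(f"Loading order {order['id']}")


@flow
def stream_flow():
    orders = fetch_orders()        # a PrefectFuture, not the data itself
    orders_state = orders.wait()   # block until the task run finishes
    if orders_state.result():      # truthy only for a non-empty dict
        load_orders(orders_state.result())
    else:
        print("No orders generated, skipping the load step")
```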
The output when there is an order:
The output when the real-time API returns no data:
What happens if something goes wrong? Retries to the rescue!
There is a lot that can go wrong when your code makes API calls and performs database operations:
The API may be temporarily unreachable or encounter other server-side issues.
There could be client-side network issues while making the API call.
The database connection may be broken.
Such operational issues can often be solved by retrying a task. Tasks natively support retries, so you don't have to write your own retry logic.
Add automatic retries to the workflow, for example up to five retries with a 10-second delay between retries:
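Assuming task keyword arguments named retries and retry_delay_seconds (check the current docs for the exact names in your Prefect version), the retry policy is a one-line change:

```python
try:
    from prefect import task
except ImportError:
    def task(fn=None, **kwargs):  # no-op stand-in so this sketch
        def wrap(f):              # runs without Orion installed
            return f
        return wrap if fn is None else fn


@task(retries=5, retry_delay_seconds=10)  # assumed keyword names
def fetch_orders() -> dict:
    """API call that may fail transiently; the orchestrator re-runs it."""
    return {"id": 42, "price": 9.99}
```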
The simplicity of adding retries to Orion's real-time data pipeline belies how many issues it solves. Implementing similar functionality in a distributed messaging system would potentially require configuring additional components, such as dead-letter queues, and managing their state in a separate process.
What if you need to parameterize the script?
Since Orion allows you to run any Python code, parameterizing an Orion flow works the same way as passing parameters to any Python script. Additionally, if you provide type annotations for your parameters on the function decorated with @flow, Orion uses Pydantic to validate the parameter values.
In this example, specifying max_value: int = 20 as an argument to the flow's function defines that the input should be an integer. Thus, the code no longer needs to explicitly convert the input arguments.
For instance, you can now run this flow with max_value set to a much larger number, such as 400:
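A sketch of such a parameterized flow; the flow name and the order-building logic are illustrative, and the fallback decorator keeps the snippet runnable without Orion installed (in which case no Pydantic validation happens, of course):

```python
import random

try:
    from prefect import flow
except ImportError:
    def flow(fn):  # no-op stand-in so this sketch runs without Orion
        return fn


def build_order(max_value: int) -> dict:
    return {"id": 1, "price": round(random.uniform(1, max_value), 2)}


@flow
def generate_orders_flow(max_value: int = 20):
    # With Orion, the `int` annotation is validated via Pydantic,
    # so e.g. a "400" passed from the command line is coerced to 400.
    order = build_order(max_value)
    print(order)
    return order
```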
Benefits of a unified orchestrator for real-time and batch processing
With a unified orchestrator like Orion:
You gain visibility into the health of your entire data platform — you no longer have to guess where you can find critical information about your data pipelines. Everything is available in a single flexible system.
You can build your workflows simply by decorating existing functions as tasks and flows and defining your workflow logic (including async, retry, conditional branching, and parametrization) as pure Python code.
You can quickly identify what the dependencies of specific workflows are.
You can get notified if any critical process fails.
You can quickly understand what went wrong and deploy a fix if necessary.
You can build data pipelines and applications that simultaneously require real-time event streams and batch extracts from operational systems.
You know, with confidence, which exact downstream workflows are affected if some real-time or batch data doesn’t arrive on time.
This post discussed the problem of maintaining separate systems for batch and stream processing and how Orion can help orchestrate both in a single system. We walked through an example streaming pipeline in Orion and how it can help implement conditional logic, retries, and parametrization.
You can find out more about Orion in the official announcement blog post. For a deep dive, visit our documentation page.
Note that Orion is still in development. It is ready for the community’s feedback, but we do not recommend using it in production yet.
We are still exploring streaming use cases, and we welcome any feedback in this regard. If you want to give us feedback or ask about anything, you can reach us via our community Slack.