# Prefect Integration
DataHub supports integration of:

- Prefect flow and task metadata
- Flow run and task run information
- Lineage information, when present
## What is a Prefect DataHub Block?
Blocks are a primitive within Prefect that enable the storage of configuration and provide an interface for interacting with external systems. The `prefect-datahub` block uses the DataHub REST emitter to emit metadata events while running a Prefect flow.
## Prerequisites to use the Prefect DataHub Block
- You need to use either Prefect Cloud (recommended) or a self-hosted Prefect server.
  - Refer to the Cloud Quickstart to set up Prefect Cloud.
  - Refer to Host Prefect server to set up a self-hosted Prefect server.
- Make sure the Prefect API URL is set correctly. You can check it by running the command below:

  ```bash
  prefect profile inspect
  ```

  - If you are using Prefect Cloud, the API URL should be set to `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`.
  - If you are using a self-hosted Prefect server, the API URL should be set to `http://<host>:<port>/api`.
## Setup
### Installation
Install `prefect-datahub` with `pip`:

```bash
pip install 'prefect-datahub'
```
Requires an installation of Python 3.7+.
### Saving configurations to a block
This is a one-time activity, in which you save the configuration to the Prefect block document store. You can provide the configurations below while saving; default values are used for any configuration not provided.
| Config | Type | Default | Description |
|---|---|---|---|
| datahub_rest_url | `str` | `http://localhost:8080` | DataHub GMS REST URL. |
| env | `str` | `PROD` | The environment that all assets produced by this orchestrator belong to. For more detail and possible values, refer here. |
| platform_instance | `str` | `None` | The instance of the platform that all assets produced by this recipe belong to. For more detail, please refer here. |
```python
from prefect_datahub.datahub_emitter import DatahubEmitter

DatahubEmitter(
    datahub_rest_url="http://localhost:8080",
    env="PROD",
    platform_instance="local_prefect",
).save("BLOCK-NAME-PLACEHOLDER")
```
Congrats! You can now load the saved block to use your configurations in your Flow code:
```python
from prefect_datahub.datahub_emitter import DatahubEmitter

DatahubEmitter.load("BLOCK-NAME-PLACEHOLDER")
```
!!! info "Registering blocks"

    Register blocks in this module to
    [view and edit them](https://docs.prefect.io/ui/blocks/)
    on Prefect Cloud:

    ```bash
    prefect block register -m prefect_datahub
    ```
## Load the saved block in Prefect workflows
After installing `prefect-datahub` and saving the configuration, you can easily use it within your Prefect workflows to help you emit metadata events, as shown below!
```python
from prefect import flow, task
from prefect_datahub.dataset import Dataset
from prefect_datahub.datahub_emitter import DatahubEmitter

datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME")


@task(name="Transform", description="Transform the data")
def transform(data):
    data = data.split(" ")
    # Record this task's lineage; it is buffered until emit_flow() is called.
    datahub_emitter.add_task(
        inputs=[Dataset("snowflake", "mydb.schema.tableA")],
        outputs=[Dataset("snowflake", "mydb.schema.tableC")],
    )
    return data


@flow(name="ETL flow", description="Extract transform load flow")
def etl():
    data = transform("This is data")
    # Emit the flow together with all tasks recorded via add_task().
    datahub_emitter.emit_flow()
```
Note: To emit the tasks, you must call `emit_flow()`. Otherwise, nothing will be emitted.
## Concept mapping
Prefect concepts are documented here, and DataHub concepts are documented here.
| Prefect Concept | DataHub Concept |
|---|---|
| Flow | DataFlow |
| Flow Run | DataProcessInstance |
| Task | DataJob |
| Task Run | DataProcessInstance |
| Task Tag | Tag |
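To make the mapping concrete, here is a rough sketch of the URNs these concepts correspond to, built with the DataHub Python SDK's URN helpers. The flow and task ids (`etl`, `transform`) are illustrative, and the exact ids generated by `prefect-datahub` may differ:

```python
from datahub.emitter.mce_builder import make_data_flow_urn, make_data_job_urn

# A Prefect Flow maps to a DataFlow entity.
flow_urn = make_data_flow_urn(orchestrator="prefect", flow_id="etl", cluster="PROD")
print(flow_urn)  # urn:li:dataFlow:(prefect,etl,PROD)

# A Prefect Task maps to a DataJob entity scoped to its flow.
job_urn = make_data_job_urn(
    orchestrator="prefect", flow_id="etl", job_id="transform", cluster="PROD"
)
print(job_urn)  # urn:li:dataJob:(urn:li:dataFlow:(prefect,etl,PROD),transform)
```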
## How to validate the saved block and metadata emission
- In the Prefect UI, open the Blocks menu and check that you can see the DataHub emitter block.
- Run a Prefect workflow. In the flow logs, you should see DataHub-related log messages like:

  ```
  Emitting flow to datahub...
  Emitting tasks to datahub...
  ```
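You can also confirm that the entities landed in DataHub by searching for the flow in the DataHub UI, or programmatically via the DataHub Python SDK's graph client. This is a minimal sketch; the server address and the URN below are illustrative and should be replaced with your own:

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Adjust the server to your datahub_rest_url.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# The URN is illustrative; use the DataFlow urn of your own flow.
print(graph.exists("urn:li:dataFlow:(prefect,etl,PROD)"))
```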
## Debugging
### Incorrect Prefect API URL
If your Prefect API URL isn't being generated correctly or is set incorrectly, you can set it manually as shown below:

```bash
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
```
### Connection error for DataHub REST URL
If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your GMS service is not up. Make sure DataHub GMS is running and reachable at the configured `datahub_rest_url`.
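As a quick connectivity check, you can test the GMS endpoint from Python using the DataHub REST emitter. A minimal sketch, assuming the `acryl-datahub` package is installed and GMS is expected at `http://localhost:8080`:

```python
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Adjust to your datahub_rest_url.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Raises an exception if GMS is unreachable or unhealthy.
emitter.test_connection()
print("GMS is reachable")
```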