From ee129e551c01c96645ac13f70ad767b17ea76590 Mon Sep 17 00:00:00 2001
From: sc
Date: Mon, 25 Mar 2024 22:42:13 +0530
Subject: [PATCH 1/2] Added advanced use cases of Airflow

---
 .gitignore                                    |   5 +
 ...data-stored-procedure-through-airflow.adoc | 198 +++++++++++++++
 .../pages/pass-data-between-airflow-dags.adoc | 235 ++++++++++++++++++
 3 files changed, 438 insertions(+)
 create mode 100644 modules/ROOT/pages/execute-teradata-stored-procedure-through-airflow.adoc
 create mode 100644 modules/ROOT/pages/pass-data-between-airflow-dags.adoc

diff --git a/.gitignore b/.gitignore
index 76ce356aa..71019cb22 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,8 @@ node_modules
 build
 buildw-local
 .vscode
+.idea/vcs.xml
+.idea/quickstarts.iml
+.idea/modules.xml
+.idea/misc.xml
+.idea/inspectionProfiles/profiles_settings.xml

diff --git a/modules/ROOT/pages/execute-teradata-stored-procedure-through-airflow.adoc b/modules/ROOT/pages/execute-teradata-stored-procedure-through-airflow.adoc
new file mode 100644
index 000000000..2baaef6e3
--- /dev/null
+++ b/modules/ROOT/pages/execute-teradata-stored-procedure-through-airflow.adoc
@@ -0,0 +1,198 @@
= Execute Teradata Stored Procedure through Airflow Teradata Provider
:experimental:
:page-author: Satish Chinthanippu
:page-email: satish.chinthanippu@teradata.com
:page-revdate: March 25th, 2024
:description: Execute Teradata Stored Procedure through Airflow Teradata Provider.
:keywords: data warehouses, compute storage separation, teradata, vantage, cloud data platform, object storage, business intelligence, enterprise analytics, orchestration, elt, airflow.
:tabs:

== Overview

This tutorial demonstrates how to use the Airflow Teradata provider to execute a Teradata stored procedure.
If you are new to Airflow or the Airflow Teradata provider, we recommend that you start with our link:https://quickstarts.teradata.com/airflow.html[introductory quickstart guide, window="_blank"].

== Prerequisites

* Ubuntu 22.x
* Access to a Teradata Vantage instance.
+
include::ROOT:partial$vantage_clearscape_analytics.adoc[]
* Python *3.8*, *3.9*, *3.10* or *3.11* installed.
* pip

== Install Apache Airflow

1. Set the AIRFLOW_HOME environment variable. Airflow requires a home directory and uses `~/airflow` by default, but you can set a different location if you prefer. The AIRFLOW_HOME environment variable informs Airflow of the desired location.
+
[source, bash]
----
export AIRFLOW_HOME=~/airflow
----
2. Install `apache-airflow` stable version 2.8.2 from the PyPI repository:
+
[source, bash]
----
AIRFLOW_VERSION=2.8.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
----
3. Install the stable version of the Airflow Teradata provider from the PyPI repository.
+
[source, bash]
----
pip install "apache-airflow-providers-teradata"
----
+
NOTE: For security reasons, the test connection functionality is disabled by default across the Airflow UI, API and CLI. Its availability can be controlled by the `test_connection` flag in the `[core]` section of the Airflow configuration ($AIRFLOW_HOME/airflow.cfg), or by defining the environment variable `export AIRFLOW__CORE__TEST_CONNECTION=Enabled` before starting the Airflow server.
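Before starting Airflow, you can optionally confirm that the installation succeeded. The commands below are a quick sanity check, assuming the `airflow` CLI installed above is on your `PATH`:

[source, bash]
----
# Show the installed Airflow version
airflow version

# Confirm the Teradata provider is registered with Airflow
airflow providers list | grep -i teradata
----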
== Start Airflow Standalone

1. Run Airflow Standalone:
+
[source, bash]
----
airflow standalone
----
2. Access the Airflow UI. Visit http://localhost:8080 in the browser and log in with the admin account details shown in the terminal.

Teradata connections may be defined in Airflow in the following ways:

1. Using the Airflow Web UI
2. Using an environment variable

== Define a Teradata connection in the Airflow Web UI

1. Open the Admin -> Connections section of the UI. Click the Create link to create a new connection.
+
image::{dir}/airflow-connection.png[Airflow admin dropdown, width=75%]
2. Fill in the connection details on the New Connection page, then click Test and Save.
+
image::{dir}/airflow-newconnection.png[Airflow New Connection, width=75%]
* Connection Id: Unique ID of the Teradata connection.
* Connection Type: Type of the system. Select Teradata.
* Database Server URL (required): Teradata instance hostname to connect to.
* Database (optional): Name of the database to connect to.
* Login (required): User name to connect with.
* Password (required): Password to connect with.

== Define a Teradata connection in an environment variable
Airflow connections may also be defined in environment variables, in either of the following formats:

1. JSON format
2. URI format

NOTE: The naming convention is AIRFLOW_CONN_{CONN_ID}, all uppercase (note the single underscores surrounding CONN). So if your connection ID is `teradata_conn_id`, the variable name should be `AIRFLOW_CONN_TERADATA_CONN_ID`.

== JSON format example

[source, bash]
----
export AIRFLOW_CONN_TERADATA_CONN_ID='{
    "conn_type": "teradata",
    "login": "teradata_user",
    "password": "my-password",
    "host": "my-host",
    "schema": "my-schema",
    "extra": {
        "tmode": "TERA",
        "sslmode": "verify-ca"
    }
}'
----

== URI format example

[source, bash]
----
export AIRFLOW_CONN_TERADATA_CONN_ID='teradata://teradata_user:my-password@my-host/my-schema?tmode=TERA&sslmode=verify-ca'
----

Refer to the https://airflow.apache.org/docs/apache-airflow-providers-teradata/stable/connections/teradata.html[Teradata connection, window="_blank"] documentation for detailed information on Teradata connections in Airflow.

== Define a DAG to execute a Teradata stored procedure in Airflow

1. In Airflow, DAGs are defined as Python code.
2. Create a DAG as a Python file, for example `example_stored_procedure.py`, under the DAG_FOLDER ($AIRFLOW_HOME/dags by default).
+
[source, python]
----
from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator, TeradataStoredProcedureOperator

CONN_ID = "teradata_conn_id"
with DAG(
    dag_id="example_stored_procedure",
    schedule=None,  # run only when triggered manually or via the API
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    start_date=datetime(2023, 1, 1),
) as dag:
    # Create (or replace) the stored procedure in Vantage.
    create_sp = TeradataOperator(
        task_id="create_sp",
        conn_id=CONN_ID,
        sql=r"""replace procedure demo_sp (in p1 integer, inout p2 integer, out p3 integer)
            dynamic result sets 2
            begin
                declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                open cur1 ;
                open cur2 ;
                set p2 = p1 + p2 ;
                set p3 = p1 * p2 ;
            end ;
        """,
    )
    # Execute the stored procedure: 3 is bound to IN p1, 2 is the initial
    # value of INOUT p2, and int declares the expected type of OUT p3.
    execute_sp = TeradataStoredProcedureOperator(
        task_id="execute_sp",
        conn_id=CONN_ID,
        procedure="demo_sp",
        parameters=[3, 2, int],
    )

    # The procedure must be created before it can be executed.
    create_sp >> execute_sp
----

== Load DAG

Airflow loads DAGs from Python source files, which it looks for inside its configured DAG_FOLDER ($AIRFLOW_HOME/dags by default).
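For example, assuming the default DAG_FOLDER, one way to deploy the DAG file and confirm that Airflow has picked it up is shown below (newly added files can take a short while to appear):

[source, bash]
----
# Copy the DAG file into the DAG_FOLDER
mkdir -p $AIRFLOW_HOME/dags
cp example_stored_procedure.py $AIRFLOW_HOME/dags/

# Check for import errors and confirm the DAG is registered
airflow dags list-import-errors
airflow dags list | grep example_stored_procedure
----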
== Run DAG

DAGs run in one of two ways:

1. When they are triggered manually or via the API
2. On a defined schedule, which is declared as part of the DAG

`example_stored_procedure` is defined to be triggered manually. To define a schedule instead, pass any valid link:https://en.wikipedia.org/wiki/Cron[Crontab, window="_blank"] schedule value to the `schedule` argument:

[source, python]
----
with DAG(
    dag_id="example_stored_procedure",
    schedule="0 0 * * *",
) as dag:
----
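A manual run can also be triggered from the command line with the Airflow CLI. A minimal sketch, assuming the DAG has been loaded as described above:

[source, bash]
----
# Make sure the DAG is not paused, then trigger a run
airflow dags unpause example_stored_procedure
airflow dags trigger example_stored_procedure
----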
== Summary

This tutorial demonstrated how to use the Airflow Teradata provider to execute a Teradata stored procedure. The example DAG creates the stored procedure
`demo_sp` in the Teradata Vantage instance defined in the Airflow connection. The stored procedure `demo_sp` is defined with one IN parameter, one INOUT parameter and one OUT parameter.
The INOUT/OUT parameter values can be used in the same DAG or in other DAGs; refer to https://quickstarts.teradata.com/pass-data-between-airflow-dags.html[Sharing data between DAGs in Airflow] for details.

== Further reading
https://quickstarts.teradata.com/airflow.html[Use Apache Airflow with Teradata Vantage]

include::ROOT:partial$community_link.adoc[]

diff --git a/modules/ROOT/pages/pass-data-between-airflow-dags.adoc b/modules/ROOT/pages/pass-data-between-airflow-dags.adoc
new file mode 100644
index 000000000..ce6fb9d80
--- /dev/null
+++ b/modules/ROOT/pages/pass-data-between-airflow-dags.adoc
@@ -0,0 +1,235 @@
= Share data between DAGs in Airflow using Teradata Provider
:experimental:
:page-author: Satish Chinthanippu
:page-email: satish.chinthanippu@teradata.com
:page-revdate: March 25th, 2024
:description: Share data between DAGs in Airflow using Teradata Provider
:keywords: data warehouses, compute storage separation, teradata, vantage, cloud data platform, object storage, business intelligence, enterprise analytics, orchestration, elt, airflow.
:tabs:

== Overview

This tutorial demonstrates how to use the Airflow Teradata provider to share data between DAGs in Airflow.
If you are new to Airflow or the Airflow Teradata provider, we recommend that you start with our link:https://quickstarts.teradata.com/airflow.html[introductory quickstart guide, window="_blank"].

== Prerequisites

* Ubuntu 22.x
* Access to a Teradata Vantage instance.
+
include::ROOT:partial$vantage_clearscape_analytics.adoc[]
* Python *3.8*, *3.9*, *3.10* or *3.11* installed.
* pip

== Install Apache Airflow

1. Set the AIRFLOW_HOME environment variable. Airflow requires a home directory and uses `~/airflow` by default, but you can set a different location if you prefer. The AIRFLOW_HOME environment variable informs Airflow of the desired location.
+
[source, bash]
----
export AIRFLOW_HOME=~/airflow
----
2. Install `apache-airflow` stable version 2.8.2 from the PyPI repository:
+
[source, bash]
----
AIRFLOW_VERSION=2.8.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
----
3. Install the stable version of the Airflow Teradata provider from the PyPI repository.
+
[source, bash]
----
pip install "apache-airflow-providers-teradata"
----
+
NOTE: For security reasons, the test connection functionality is disabled by default across the Airflow UI, API and CLI. Its availability can be controlled by the `test_connection` flag in the `[core]` section of the Airflow configuration ($AIRFLOW_HOME/airflow.cfg), or by defining the environment variable `export AIRFLOW__CORE__TEST_CONNECTION=Enabled` before starting the Airflow server.

== Start Airflow Standalone

1. Run Airflow Standalone:
+
[source, bash]
----
airflow standalone
----
2. Access the Airflow UI. Visit http://localhost:8080 in the browser and log in with the admin account details shown in the terminal.

Teradata connections may be defined in Airflow in the following ways:

1. Using the Airflow Web UI
2. Using an environment variable

== Define a Teradata connection in the Airflow Web UI

1. Open the Admin -> Connections section of the UI. Click the Create link to create a new connection.
+
image::{dir}/airflow-connection.png[Airflow admin dropdown, width=75%]
2. Fill in the connection details on the New Connection page, then click Test and Save.
+
image::{dir}/airflow-newconnection.png[Airflow New Connection, width=75%]
* Connection Id: Unique ID of the Teradata connection.
* Connection Type: Type of the system. Select Teradata.
* Database Server URL (required): Teradata instance hostname to connect to.
* Database (optional): Name of the database to connect to.
* Login (required): User name to connect with.
* Password (required): Password to connect with.

== Define a Teradata connection in an environment variable
Airflow connections may also be defined in environment variables, in either of the following formats:

1. JSON format
2. URI format

NOTE: The naming convention is AIRFLOW_CONN_{CONN_ID}, all uppercase (note the single underscores surrounding CONN). So if your connection ID is `teradata_conn_id`, the variable name should be `AIRFLOW_CONN_TERADATA_CONN_ID`.

== JSON format example

[source, bash]
----
export AIRFLOW_CONN_TERADATA_CONN_ID='{
    "conn_type": "teradata",
    "login": "teradata_user",
    "password": "my-password",
    "host": "my-host",
    "schema": "my-schema",
    "extra": {
        "tmode": "TERA",
        "sslmode": "verify-ca"
    }
}'
----

== URI format example

[source, bash]
----
export AIRFLOW_CONN_TERADATA_CONN_ID='teradata://teradata_user:my-password@my-host/my-schema?tmode=TERA&sslmode=verify-ca'
----

Refer to the https://airflow.apache.org/docs/apache-airflow-providers-teradata/stable/connections/teradata.html[Teradata connection, window="_blank"] documentation for detailed information on Teradata connections in Airflow.

== Define Upstream DAG to generate and share data with Downstream DAG

1. In Airflow, DAGs are defined as Python code.
2. Create the upstream DAG as a Python file, for example `example_upstream_dag.py`, under the DAG_FOLDER ($AIRFLOW_HOME/dags by default).
+
[source, python]
----
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.teradata.operators.teradata import TeradataOperator, TeradataStoredProcedureOperator

CONN_ID = "teradata_conn_id"
with DAG(
    dag_id="example_upstream_dag",
    schedule=None,  # run only when triggered manually or via the API
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    start_date=datetime(2023, 1, 1),
) as dag:
    # Create (or replace) a stored procedure that squares its input.
    create_sp = TeradataOperator(
        task_id="create_sp",
        conn_id=CONN_ID,
        sql=r"""replace procedure demo_sp (in p1 integer, out p2 integer)
            begin
                set p2 = p1 * p1 ;
            end ;
        """,
    )
    # Execute the stored procedure: 3 is bound to IN p1, and int declares
    # the expected type of OUT p2. The result is pushed to XCom.
    execute_sp = TeradataStoredProcedureOperator(
        task_id="execute_sp",
        conn_id=CONN_ID,
        procedure="demo_sp",
        parameters=[3, int],
    )

    example_trigger = TriggerDagRunOperator(
        task_id="upstream_dag_task",
        trigger_dag_id="example_downstream_dag",  # DAG_ID of the downstream DAG
        allowed_states=["success", "failed"],  # downstream DAG run states treated as success (used when waiting for completion)
        conf={"input_param1": "{{ ti.xcom_pull(task_ids='execute_sp')[0][0] }}"},  # parameters passed from the upstream DAG to the downstream DAG
    )

    create_sp >> execute_sp >> example_trigger
----
3. Create the downstream DAG as a Python file, for example `example_downstream_dag.py`, under the same DAG_FOLDER.
+
[source, python]
----
from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator, TeradataStoredProcedureOperator

CONN_ID = "teradata_conn_id"
with DAG(
    dag_id="example_downstream_dag",
    schedule=None,  # run only when triggered, here by the upstream DAG
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
) as dag:
    # Create (or replace) a stored procedure that doubles its input.
    create_sp = TeradataOperator(
        task_id="create_downstream_sp",
        conn_id=CONN_ID,
        sql=r"""replace procedure downstream_sp (in p1 integer, out p2 integer)
            begin
                set p2 = p1 + p1 ;
            end ;
        """,
    )
    # The IN parameter is read from the conf passed by the triggering DAG run.
    execute_sp = TeradataStoredProcedureOperator(
        task_id="execute_downstream_sp",
        conn_id=CONN_ID,
        procedure="downstream_sp",
        parameters=["{{ dag_run.conf.get('input_param1') }}", int],
    )

    create_sp >> execute_sp
----
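With both files in place, you can exercise a single task in isolation before running the two DAGs end to end. The sketch below uses the Airflow CLI's test mode, which runs one task instance locally without the scheduler; the date is an arbitrary logical date, and `demo_sp` must already exist in Vantage because test mode ignores the `create_sp` dependency:

[source, bash]
----
# Run only the execute_sp task of the upstream DAG, ignoring its dependencies
airflow tasks test example_upstream_dag execute_sp 2024-01-01
----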
== Load DAG

Airflow loads DAGs from Python source files, which it looks for inside its configured DAG_FOLDER ($AIRFLOW_HOME/dags by default).

== Run DAG

DAGs run in one of two ways:

1. When they are triggered manually or via the API
2. On a defined schedule, which is declared as part of the DAG

`example_upstream_dag` is defined to be triggered manually. To define a schedule instead, pass any valid link:https://en.wikipedia.org/wiki/Cron[Crontab, window="_blank"] schedule value to the `schedule` argument:

[source, python]
----
with DAG(
    dag_id="example_upstream_dag",
    schedule="0 0 * * *",
) as dag:
----
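As an end-to-end check, triggering the upstream DAG should automatically fire the downstream DAG through the `TriggerDagRunOperator`. A minimal sketch using the CLI; both DAGs must be unpaused for their runs to execute:

[source, bash]
----
# Unpause both DAGs, then trigger the upstream one; the downstream DAG
# is triggered by the upstream DAG's final task
airflow dags unpause example_upstream_dag
airflow dags unpause example_downstream_dag
airflow dags trigger example_upstream_dag
----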
== Summary

This tutorial demonstrated how to share data between DAGs in Airflow. The upstream DAG `example_upstream_dag` generates a value and shares
it with the downstream DAG `example_downstream_dag` using Airflow's XCom feature: `{{ ti.xcom_pull(task_ids='execute_sp')[0][0] }}` extracts the value produced
by the `execute_sp` task and passes it to the downstream DAG as the parameter `input_param1`.

== Further reading
https://quickstarts.teradata.com/airflow.html[Use Apache Airflow with Teradata Vantage]

include::ROOT:partial$community_link.adoc[]

From bc217f167fcfe7ede1e678a5e0c1c3bed9ce04fb Mon Sep 17 00:00:00 2001
From: sc
Date: Mon, 25 Mar 2024 22:44:14 +0530
Subject: [PATCH 2/2] reverted .gitignore changes

---
 .gitignore | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/.gitignore b/.gitignore
index 71019cb22..76ce356aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,8 +3,3 @@ node_modules
 build
 buildw-local
 .vscode
-.idea/vcs.xml
-.idea/quickstarts.iml
-.idea/modules.xml
-.idea/misc.xml
-.idea/inspectionProfiles/profiles_settings.xml