This blog gives a short overview of Apache Airflow and shows how to integrate Informatica BDM with it. It also includes a sample template to orchestrate BDM mappings. The same concept can be extended to PowerCenter and non-BDM mappings.

 

Apache Airflow overview

 

Airflow is a platform to programmatically author, schedule and monitor workflows.

 

Airflow is not a data streaming solution. Tasks do not move data from one task to the next (though tasks can exchange metadata!). Airflow is not in the Spark Streaming or Storm space; it is more comparable to Oozie or Azkaban.

 

Generally, Airflow works in a distributed environment, as you can see in the diagram below. The Airflow scheduler schedules jobs according to the dependencies defined in directed acyclic graphs (DAGs), and the Airflow workers pick up and run jobs with their loads properly balanced. All job information is stored in the metadata database, which is updated in a timely manner. Users can monitor their jobs via the Airflow web UI and/or the logs.

 

 

Installing Apache Airflow

 

The following installation method is intended for non-production use. Refer to the Airflow documentation for production deployments.

 

Apache Airflow provides the various operators listed at the link below. An operator describes a single task in a workflow.

 

https://github.com/apache/incubator-airflow/tree/master/airflow/operators

 

To trigger Informatica BDM mappings we will use the BashOperator, i.e., trigger the mappings through the command line.

 

  1. If Apache Airflow is running on a machine other than the Informatica node, install the Informatica command line utilities on the Airflow worker nodes
  2. Python

 

 

Create a directory /opt/infa/airflow
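For example, on Linux:

mkdir -p /opt/infa/airflow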

 

 

The easiest way to install Airflow is to run the following command. pip is a Python utility for installing Python packages.

 

pip install apache-airflow

Set the AIRFLOW_HOME environment variable.
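For example, pointing it at the directory created above (add this to the Airflow user's shell profile to make it persistent):

export AIRFLOW_HOME=/opt/infa/airflow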

 

Create a folder called “dags” inside the AIRFLOW_HOME folder.
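For example:

mkdir -p $AIRFLOW_HOME/dags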

 

Initialize the Airflow database by running the command “airflow initdb”. This is where the metadata will be stored. We will use the default SQLite database (managed through SQLAlchemy) that ships with Airflow; if needed, the configuration can be modified to use MySQL or PostgreSQL as the Airflow backend.
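If you later switch the backend, the change is made through the sql_alchemy_conn setting in airflow.cfg. The connection string below is only an illustration, with placeholder host and credentials:

sql_alchemy_conn = mysql://airflow_user:airflow_pass@localhost:3306/airflow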

 

 

If initdb shows any errors, it is most likely because of missing Airflow packages; a complete list of packages and the commands to install them is at the link below.

 

https://airflow.apache.org/installation.html
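For example, the extra packages for a PostgreSQL backend can be installed as shown below (illustrative; pick the extras that match your setup):

pip install 'apache-airflow[postgres]'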

 

Start the Airflow web UI using the following command.
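For example (port 8080 matches the URL used later in this post):

airflow webserver -p 8080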

 

Start the Airflow scheduler.
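For example:

airflow scheduler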

 

 

Log in to the Airflow UI using the URL http://hostname:8080. If you have installed the examples, you should see the example DAGs listed in the UI.

 

 

Creating a DAG for BDM Mappings

 

 

For the demo we deployed the following three BDM mappings into the Data Integration Service (DIS).

 

Application_m_01_Get_States_Rest_Webservice

Application_m_02_Parse_Webservice_Output

Application_m_Read_Oracle_Customers_Write_Hive_Python

 

 

The three applications need to be orchestrated in the following way.

 

  1. Application_m_01_Get_States_Rest_Webservice and Application_m_Read_Oracle_Customers_Write_Hive_Python can run in parallel
  2. Application_m_02_Parse_Webservice_Output will run only if Application_m_01_Get_States_Rest_Webservice is successful

 

 

 

Save the following code as airflow_bdm_sample.py under the /opt/infa/airflow/dags folder.

There are different ways to call the infacmd RunMapping command; for example, the command can be put in a shell script and the script can be called from the DAG, as sketched below.
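A minimal sketch of that approach (the script path /opt/infa/scripts/run_webservice_mapping.sh is hypothetical and would wrap the infacmd call; the trailing space after the script name keeps the BashOperator from trying to render the .sh file as a Jinja template):

t2 = BashOperator(
    task_id='mapping_calling_webservice',
    # hypothetical wrapper script around infacmd.sh ms RunMapping
    bash_command='/opt/infa/scripts/run_webservice_mapping.sh ',
    dag=dag)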

 

 

#Start Code

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'infa',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime.now() - timedelta(seconds=10),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'Informatica_Bigdata_Demo',
    default_args=default_args,
    description='A simple Informatica BDM DAG')

# Printing start date and time of the DAG
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='mapping_calling_webservice',
    depends_on_past=False,
    bash_command='infacmd.sh ms RunMapping -dn infa_dom_1021 -sn dis_bdm_cdh -un Administrator -m m_01_Get_States_Rest_Webservice -a Application_m_01_Get_States_Rest_Webservice -pd admin',
    dag=dag)

t3 = BashOperator(
    task_id='mapping_parsing_json',
    depends_on_past=True,
    bash_command='infacmd.sh ms RunMapping -dn infa_dom_1021 -sn dis_bdm_cdh -un Administrator -m m_02_Parse_Webservice_Output -a Application_m_02_Parse_Webservice_Output',
    dag=dag)

t4 = BashOperator(
    task_id='Read_Oracle_load_Hive',
    depends_on_past=False,
    bash_command='infacmd.sh ms RunMapping -dn infa_dom_1021 -sn dis_bdm_cdh -un Administrator -m m_Read_Oracle_Customers_Write_Hive_Python -a Application_m_Read_Oracle_Customers_Write_Hive_Python',
    dag=dag)

t1.set_downstream(t2)
t2.set_downstream(t3)
t4.set_upstream(t1)

 

 

# End code
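The dependency section above uses set_downstream/set_upstream. Airflow also supports the equivalent bitshift syntax, which many find easier to read:

t1 >> t2 >> t3
t1 >> t4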

 

 

Restart the Airflow webserver and the Informatica_Bigdata_Demo DAG will appear in the list of DAGs.

 

 

 

Click on the DAG and go to the Graph View, which gives a better view of the orchestration.

 

 

 

Run the DAG and you will see the status of the DAG runs in the Airflow UI as well as in the Informatica Monitor.

 

 

 

The above DAG code can be extended to fetch the mapping logs and the status of the runs.