Most companies rely on batch jobs that run on set schedules for a large number of tasks, ranging from Informatica IICS mappings to database backups to file watchers. Organizations typically rely on an application's built-in scheduling tools or on cron, but orchestrating jobs across different systems is complex, and with cron it only gets harder at scale as the jobs and their dependency graphs grow.


To overcome these complexities, companies use enterprise scheduling tools like Control-M and Tidal, as well as open source alternatives such as Pinball, Azkaban, Luigi, and Airflow.


This blog covers how to call IICS mapping tasks from a third-party scheduler. I will use Apache Airflow as the enterprise scheduler and the IICS REST APIs to trigger the mapping tasks.


For this blog, imagine a scenario with three CDI tasks and one CDI-Elastic task, where the CDI tasks must execute in parallel and, upon their successful completion, the CDI-Elastic task is triggered.


For more details on CDI-Elastic, please visit this page: Big Data Integration Scalability? Get Cloud Data Integration Elastic | Informatica



Apache Airflow overview


Airflow is a platform to programmatically author, schedule and monitor workflows.


Airflow is not a data streaming solution. Tasks do not move data from one to the other (though tasks can exchange metadata!). Airflow is not in the Spark Streaming or Storm space, it is more comparable to Oozie or Azkaban.


Generally, Airflow works in a distributed environment, as you can see in the diagram below. The airflow scheduler schedules jobs according to the dependencies defined in directed acyclic graphs (DAGs), and the airflow workers pick up and run jobs with their loads properly balanced. All job information is stored in the meta DB, which is updated in a timely manner. The users can monitor their jobs via a shiny Airflow web UI and/or the logs.



My previous blog, Informatica DataEngineering jobs execution using Apache Airflow, has detailed steps on installing and configuring Apache Airflow.


Creating a DAG for IICS Mapping Tasks


The DAG code is written so that it dynamically creates the Airflow tasks (a DAG is the equivalent of a taskflow in IICS).

In the code below, under the IICS Parameters Start section, update the placeholders with the IICS org username and password. The password can be kept out of the DAG file by using Airflow Variables; please refer to the Airflow documentation for details on Airflow Variables.
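As a sketch of that approach, assuming a Variable named iics_password has been created in the Airflow UI (the environment-variable fallback is only there so the snippet also runs outside an Airflow deployment):

```python
import os

def get_iics_password():
    """Read the IICS password from an Airflow Variable when running
    inside Airflow, instead of hardcoding it in the DAG file."""
    try:
        from airflow.models import Variable  # available on Airflow workers
        return Variable.get("iics_password")
    except Exception:
        # Fallback for running this snippet outside an Airflow deployment.
        return os.environ.get("IICS_PASSWORD", "")

iics_password = get_iics_password()
```

The same pattern works for the username and base URL, so the DAG file contains no credentials at all.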


In the CDI_task_name variable provide the list of all CDI tasks, and in CDI_E_task_name provide the list of CDI-Elastic tasks. The code dynamically parses these lists and creates the dependencies so that all CDI tasks run in parallel and, upon their successful completion, the CDI-Elastic tasks are triggered.


The code not only triggers the IICS mapping tasks but also retrieves the task log for every run, so it can be viewed through the Airflow web UI.
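The completion check in the DAG code works by polling the IICS activity monitor until the job is no longer listed. That polling pattern can be sketched in isolation; here the poll_status callable and the state strings are stand-ins for the real activityMonitor call:

```python
import time

def wait_until_done(poll_status, interval=0.0):
    """Poll a zero-argument status callable until the job leaves the
    RUNNING state, then return the final state.

    poll_status stands in for a call to the IICS activityMonitor API;
    it returns an executionState string such as 'RUNNING',
    'COMPLETED' or 'FAILED'.
    """
    while True:
        state = poll_status()
        if state != 'RUNNING':
            return state
        time.sleep(interval)

# Stub that reports RUNNING twice before completing, to exercise the loop.
states = iter(['RUNNING', 'RUNNING', 'COMPLETED'])
final = wait_until_done(lambda: next(states))
print(final)  # → COMPLETED
```

In production the interval should be a few seconds, as in the DAG code, to avoid hammering the API.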


For the Demo we have 3 CDI tasks and 1 CDI-Elastic task.

  • Task_Items, Task_Store_Sales, Task_Date_Dim can run in parallel
  • Upon successful completion of the above tasks, Total_Store_Sales_IWDEMO will be triggered



DAG Code

"""Sample Airflow DAG to trigger IICS mapping tasks.

For the Data Integration taskType, use one of the following codes:
    DMASK - Masking task
    DRS   - Replication task
    DSS   - Synchronization task
    MTT   - Mapping task
    PCS   - PowerCenter task
"""
import json
import time
from datetime import datetime, timedelta

import requests

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

######### IICS Parameters Start ##########
iics_username = "iicsuser_name"
iics_password = "iics_password"
task_type = 'MTT'
base_url = ""  # set this to the IICS login URL for your POD
CDI_task_name = ["Task_Date_Dim", "Task_Items", "Task_Store_Sales"]
CDI_E_task_name = ["Task_Total_Store_Sales_IWDEMO"]
######### IICS Parameters End ##########

# Airflow Parameters -- these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'infa',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': - timedelta(seconds=10),
}


def get_session_id(un, pw):
    '''Log in to IICS and return the session id and server URL
    used by all subsequent API calls.'''
    session_id = ''
    server_url = ''
    data = {'@type': 'login', 'username': un, 'password': pw}
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    r =, data=json.dumps(data), headers=headers)
    if r.status_code == 200:
        session_id = r.json()["icSessionId"]
        server_url = r.json()["serverUrl"]
    else:
        print('API call failed: ' + str(r.status_code))
    return session_id, server_url


def start_job(session_id, server_url, taskname, taskType):
    '''Use the session id and server URL from the user login API
    and start the specified job.'''
    job_start_url = server_url + "/api/v2/job"
    headers = {'Content-Type': 'application/json',
               'icSessionId': session_id, 'Accept': 'application/json'}
    data = {'@type': 'job', 'taskName': taskname, 'taskType': taskType}
    r =, data=json.dumps(data), headers=headers)
    if r.status_code == 200:
        response_content = json.loads(r.text)
        taskid = response_content['taskId']
        runid = response_content['runId']
        tname = response_content['taskName']
        print("Job " + taskname + " has been successfully started")
        return taskid, runid, tname
    print('Job failed to start with status: ' + str(r.status_code))


def get_status(server_url, session_id):
    '''Return (taskId, executionState, taskName, runId) for the first
    entry in the activity monitor, or None once no job is running.'''
    job_activity_url = server_url + "/api/v2/activity/activityMonitor"
    headers = {'Content-Type': 'application/json', 'icSessionId': session_id, 'Accept': 'application/json'}
    r = requests.get(job_activity_url, headers=headers)
    if r.status_code == 200:
        response_content = json.loads(r.text)
        for obj in response_content:
            tn = obj['taskName']
            tid = obj['taskId']
            exec_state = obj['executionState']
            rid = obj['runId']
            print("Status of job " + tn + " is " + exec_state)
            return tid, exec_state, tn, rid
        return None  # activity monitor is empty: the job has finished
    print('Failed to get activity monitor: ' + str(r.status_code))


def execute_task(task_name):
    '''Log in, start the task, poll the activity monitor until the job
    finishes, then print the session log so it appears in the Airflow
    task log.'''
    session_id, server_url = get_session_id(iics_username, iics_password)
    start_job(session_id, server_url, task_name, task_type)
    log_url = server_url + "/api/v2/activity/activityLog/"
    headers = {'Content-Type': 'application/json', 'icSessionId': session_id, 'Accept': 'application/json'}
    task_status = get_status(server_url, session_id)
    task_id = task_status[0]
    run_id = task_status[3]
    while True:
        new_status = get_status(server_url, session_id)
        if new_status is None:
            # The job is no longer in the activity monitor, so it has
            # finished; fetch its activity log entry and session log.
            url = log_url + "?taskId=" + task_id + "&runId=" + str(run_id)
            r = requests.get(url, headers=headers)
            response_content = json.loads(r.text)
            for obj in response_content:
                t_id = obj['id']
                task_log = requests.get(log_url + t_id + "/sessionLog", headers=headers)
                print(task_log.text)
            break
        time.sleep(10)


# Airflow DAG
dag = DAG(
    'IICS_Airflow_Demo',
    default_args=default_args,
    schedule_interval='@daily',
    description='A Sample IICS Airflow DAG')

cdi_start = DummyOperator(task_id='cdi_start', dag=dag)

cdi_end = DummyOperator(task_id='cdi_end', dag=dag)

for i in CDI_task_name:
    cdi_task = PythonOperator(
        task_id='IICS_CDI_' + i,
        python_callable=execute_task,
        op_kwargs={'task_name': i},
        dag=dag)
    cdi_start >> cdi_task >> cdi_end

for j in CDI_E_task_name:
    cdi_e_task = PythonOperator(
        task_id='IICS_CDI_E_' + j,
        python_callable=execute_task,
        op_kwargs={'task_name': j},
        dag=dag)
    cdi_end >> cdi_e_task


Save the above code as a .py file under the /opt/infa/airflow/dags folder.


Restart the Airflow webserver, and the IICS_Airflow_Demo DAG will appear in the list of DAGs.



Click on the DAG and go to Graph View.



Run the DAG and you will see the status of the run in the Airflow UI as well as in the IICS monitor. In the IICS monitor task details you can see that the job was triggered via the IICS REST API.




To view session logs, click on any task run in the Airflow web UI and click the "View Log" button to retrieve the mapping details and session log.