Introduction

 

Most companies rely on batch jobs that run on set schedules for a large number of tasks, ranging from Informatica IICS mappings to database backups to file watchers. Organizations typically rely on an application's built-in scheduler or on cron, but orchestrating jobs across different systems is complex, and cron becomes increasingly hard to manage at scale as the jobs and their dependency graphs grow more complex.

 

To overcome these complexities, companies use enterprise scheduling tools such as Control-M and Tidal, along with open source alternatives like Pinball, Azkaban, Luigi, and Airflow.

 

This blog covers how to call IICS mapping tasks from a third-party scheduler. I will be using Apache Airflow as the enterprise scheduler and the IICS REST APIs to trigger the mapping tasks.

 

For this blog, imagine a scenario where we have 3 CDI tasks and 1 CDI-Elastic task, and we need to execute the CDI tasks in parallel and, upon their successful completion, trigger the CDI-Elastic task.

 

For more details on CDI-Elastic, please visit this page: Big Data Integration Scalability? Get Cloud Data Integration Elastic | Informatica

 

Apache Airflow overview

 

Airflow is a platform to programmatically author, schedule and monitor workflows.

 

Airflow is not a data streaming solution. Tasks do not move data from one to the other (though tasks can exchange metadata!). Airflow is not in the Spark Streaming or Storm space, it is more comparable to Oozie or Azkaban.

 

Generally, Airflow works in a distributed environment, as you can see in the diagram below. The Airflow scheduler schedules jobs according to the dependencies defined in directed acyclic graphs (DAGs), and the Airflow workers pick up and run jobs with their loads properly balanced. All job information is stored in the meta DB, which is updated in a timely manner. Users can monitor their jobs via the Airflow web UI and/or the logs.

 

 

My previous blog Informatica DataEngineering jobs execution using Apache Airflow has detailed steps on installing and configuring Apache Airflow.

 

Creating a DAG for IICS Mapping Tasks

 

The DAG code is written so that it dynamically creates the Airflow tasks (a DAG is the equivalent of a taskflow in IICS).

In the code below, under the IICS parameters section, update the IICS org username and password. The password can be encrypted by using Airflow Variables; please refer to the Airflow documentation for details on Airflow Variables.
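As a minimal sketch of that approach (the variable names iics_username and iics_password are assumptions; create the Variables first via the Airflow UI or the airflow variables CLI), credentials can be pulled from Airflow Variables, with an environment-variable fallback so the helper also works outside a running Airflow deployment:

import os


def get_credential(name):
    """Fetch a credential from an Airflow Variable if one is defined,
    falling back to an environment variable of the same (upper-cased)
    name. The variable names used here are hypothetical."""
    try:
        from airflow.models import Variable
        return Variable.get(name)  # raises if the Variable is not defined
    except Exception:
        return os.environ[name.upper()]


# Hypothetical usage inside the DAG file, replacing the hard-coded values:
# iics_username = get_credential("iics_username")
# iics_password = get_credential("iics_password")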

 

Provide the list of all CDI tasks in CDI_task_name and the list of CDI-Elastic tasks in CDI_E_task_name. The code dynamically parses these lists and creates the dependencies so that all CDI tasks run in parallel, and upon their successful completion the CDI-Elastic tasks are triggered.

 

The code not only triggers the IICS mapping tasks but also retrieves the task log for every run, which can be viewed through the Airflow web UI.

 

For the Demo we have 3 CDI tasks and 1 CDI-Elastic task.

  • Task_Items, Task_Store_Sales, Task_Date_Dim can run in parallel
  • Upon successful completion of the above tasks, Task_Total_Store_Sales_IWDEMO will be triggered

 

 

DAG Code

"""
## Sample Airflow DAG to trigger IICS mappings
Data Integration TaskType , use one of the following codes:
  DMASK. Masking task.
  DRS. Replication task.
  DSS. Synchronization task.
  MTT. Mapping task.
  PCS. PowerCenter task.
"""

import json

import sys

import time

from datetime import datetime, timedelta

 

import requests

from airflow import DAG

from airflow.operators.dummy_operator import DummyOperator

from airflow.operators.python_operator import PythonOperator

 

######### IICS Parameters Start ##########
iics_username = "iicsuser_name"
iics_password = "iics_password"
task_type = 'MTT'
base_url = "https://dm-us.informaticacloud.com/ma/api/v2/user/login"
CDI_task_name = ["Task_Date_Dim", "Task_Items", "Task_Store_Sales"]

CDI_E_task_name = ["Task_Total_Store_Sales_IWDEMO"]



######### IICS Parameters End ##########

# Airflow Parameters -- these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {

   'owner': 'infa',
   'depends_on_past': False,
   'email': ['airflow@example.com'],
   'email_on_failure': False,
   'email_on_retry': False,
   'retries': 1,
   'retry_delay': timedelta(minutes=1),
   'start_date': datetime.now() - timedelta(seconds=10),
   'schedule': '@daily'
}

 

 

def get_session_id(un, pw):

   session_id = ''
   data = {'@type': 'login', 'username': un, 'password': pw}

  url = base_url

  headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}

  r = requests.post(url, data=json.dumps(data), headers=headers)

   # print('Session Id API Response Status Code: ' + str(r.status_code))
   if r.status_code == 200:

  session_id = r.json()["icSessionId"]

  server_url = r.json()["serverUrl"]

   # print('Session Id: ' + session_id)
   else:

   print('API call failed:')

   print(r.headers)

   print(r.json())

  sys.exit(1)

 

   return session_id, server_url

 

 

def start_job(session_id, server_url, taskname, taskType):

   '''Use Session Id and Server URL from the user login API
  and start the specified job'''
   job_start_url = server_url + "/api/v2/job"
   headers = {'Content-Type': 'application/json'
   , 'icSessionId': session_id, 'Accept': 'application/json'}

  data = {'@type': 'job', 'taskName': taskname, 'taskType': taskType}

  r = requests.post(job_start_url, data=json.dumps(data), headers=headers)

   if r.status_code == 200:

  response_content = json.loads(r.text)

  taskid = response_content['taskId']

  runid = response_content['runId']

  tname = response_content['taskName']

   print("Job " + taskname + " has been successfully started")

   return taskid, runid, tname

   else:

   print('Job failed to start with status: ' + str(r.status_code))

   print(r.content)

 

 

def get_status(server_url, session_id):

  job_activity_url = server_url + "/api/v2/activity/activityMonitor"
   headers = {'Content-Type': 'application/json', 'icSessionId': session_id, 'Accept': 'application/json'}

  r = requests.get(job_activity_url, headers=headers)

   if r.status_code == 200:

  response_content = json.loads(r.text)

   for obj in response_content:

  tn = obj['taskName']

  tid = obj['taskId']

  exec_state = obj['executionState']

  rid = obj['runId']

   print("Status of job " + tn + " is " + exec_state)

   return tid, exec_state, tn, rid

   else:

   print('Failed to get activity monitor : ' + str(r.status_code))

   print(r.content)

 

 

def execute_task(task_name):

  username = iics_username

  password = iics_password

  login_response = get_session_id(username, password)

  session_id = login_response[0]

  server_url = login_response[1]

  start_job(session_id, server_url, task_name, task_type)

  log_url = server_url + "/api/v2/activity/activityLog/"
   headers = {'Content-Type': 'application/json', 'icSessionId': session_id, 'Accept': 'application/json'}

  task_status = get_status(server_url, session_id)

  task_id = task_status[0]

  run_id = task_status[3]

   while True:

   status = {"RUNNING", "INITIALIZED", "STOPPING", "QUEUED"}

  time.sleep(15)

  new_status = get_status(server_url, session_id)

   if new_status is None:

  url = log_url + "?taskId=" + task_id + "&runId=" + str(run_id)

  r = requests.get(url, headers=headers)

  response_content = json.loads(r.text)

   for obj in response_content:

  t_id = obj['id']

  task_log = requests.get(log_url + t_id + "/sessionLog", headers=headers)

   print(task_log.text)

   break


# Airflow DAG

dag = DAG(

   'IICS_Airflow_Demo',
   default_args=default_args,
   description='A Sample IICS Airflow DAG')

 

cdi_start = DummyOperator(

   task_id='cdi_start',
   dag=dag

)

 

cdi_end = DummyOperator(

   task_id='cdi_end',
   dag=dag)

 

for i in CDI_task_name:

  cdi_task = PythonOperator(

   task_id='IICS_CDI_' + i,
   python_callable=execute_task,
   op_kwargs={'task_name': i},
   dag=dag)

  cdi_start >> cdi_task >> cdi_end

 

for j in CDI_E_task_name:

  cdi_e_task = PythonOperator(

   task_id='IICS_CDI_E_' + j,
   python_callable=execute_task,
   op_kwargs={'task_name': j},
   dag=dag)

  cdi_end >> cdi_e_task

 

 

Save the above code as IICS_Airflow_Sample.py under the /opt/infa/airflow/dags folder.

 

Restart the Airflow webserver and the IICS_Airflow_Demo DAG will appear in the list of DAGs.

 

 

Click on the DAG and go to Graph View.

 

 

Run the DAG and you will see the status of the runs in the Airflow UI as well as in the IICS monitor. In the IICS monitor task details you can see that the job was triggered via the IICS REST API.

 

 

 

To view session logs, click on any task run in the Airflow web UI and click the "View Log" button to retrieve the mapping details and session log.

 

 

This article describes a way of automatically downloading, installing, and registering a Secure Agent using the IICS REST API.

The script requires only the IICS org username and password; install_dir is optional. The script auto-detects the operating system it is running on and downloads the appropriate installer.
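The OS detection relies on Python's built-in os.name, which is 'posix' on Linux/macOS and 'nt' on Windows. A small sketch of the same mapping the script below uses to pick the installer binary and the platform segment of the installerInfo API path:

import os

# os.name is 'posix' on Linux/macOS and 'nt' on Windows; the script
# below uses it to choose the installer file name and API platform.
installers = {"posix": "agent64_install_ng_ext.bin",
              "nt": "agent64_install_ng_ext.exe"}
platforms = {"posix": "linux64", "nt": "win64"}

print(installers[os.name], platforms[os.name])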

 

Pre-Requisites for the Script

 

IICS login credentials. If you don't have IICS login credentials, you can register for a trial here: Integration At Scale: Spark Serverless – Preview Registration

 

On a Linux machine, make sure you have pip and the requests module installed. You can run the following commands to install pip and requests on Linux.

 

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

python get-pip.py
python -m pip install requests



On a Windows machine, make sure Python and the requests module are installed. Python can be downloaded using this link: Python Releases for Windows | Python.org

Once Python is installed, open a command prompt and run the command "pip install requests".


Script

"""
## Usage
Check the following parameters
username = IICS username
password = IICS password
install_dir = the target directory to install secure agent
down_load_dir = the download location of secureagent, if not specified the installer will be downloaded to the script location
option = Enter the following values
  1 = Download
  2 = Download and Install
  3 = Download,Install and Register
"""
import os

import requests

import json

import sys

import time

 

######### Parameters Start ##########
username = "iics_username"
password = "iics_password"
install_dir = '/opt/infa_agent'
option = 3
base_url = "https://dm-us.informaticacloud.com/ma/api/v2/user/login"
######### Parameters End ##########

if os.name == 'posix':

  download_dir = 'agent64_install_ng_ext.bin'
if os.name == 'nt':

  download_dir = 'agent64_install_ng_ext.exe'

def get_session_id(un, pw):

   '''Authenticate with username and password and
  retrieve icSessionId and serverUrl that are used for Subsequent API calls'''
   session_id = ''
   data = {'@type': 'login', 'username': un, 'password': pw}

  url = base_url

  headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}

  r = requests.post(url, data=json.dumps(data), headers=headers)

   # print('Session Id API Response Status Code: ' + str(r.status_code))
   if r.status_code == 200:

  session_id = r.json()["icSessionId"]

  server_url = r.json()["serverUrl"]

   # print('Session Id: ' + session_id)
   else:

   print('API call failed:')

   print(r.headers)

   print(r.json())

  sys.exit(1)

 

   return session_id, server_url

 

 

def get_token():

  call_get_session_id = get_session_id(username, password)

  session_id = call_get_session_id[0]

  server_url = call_get_session_id[1]

   url = server_url + "/api/v2/agent/installerInfo/linux64"
   headers = {'Content-Type': 'application/json', 'icSessionId': session_id, 'Accept': 'application/json'}

   payload = {'application': ''}

   if os.name == 'posix':

  platform = 'linux64'
   if os.name == 'nt':

  platform = 'win64'
   r = requests.get(server_url + '/api/v2/agent/installerInfo/' + platform, headers=headers)

   print('Token API Response Status Code: ' + str(r.status_code))

  download_url = r.json()["downloadUrl"]

  install_token = r.json()["installToken"]

   print('Download Url: ' + download_url)

   print('Install Token: ' + install_token)

   return download_url, install_token

 

 

def download_secure_agent(install_download_loc):

   print(30 * '-')

   print("Downloading Secure Agent")

   print(30 * '-')

   with open(install_download_loc, 'wb') as f:

  response = requests.get(get_token()[0], stream=True)

  total = response.headers.get('content-length')

   if total is None:

  f.write(response.content)

   else:

  downloaded = 0
   total = int(total)

   for data in response.iter_content(chunk_size=max(int(total / 1000), 1024 * 1024)):

  downloaded += len(data)

  f.write(data)

  done = int(50 * downloaded / total)

  sys.stdout.write('\r[{}{}]'.format('#' * done, '.' * (50 - done)))

  sys.stdout.flush()

  sys.stdout.write('\n')

 

 

def install_secure_agent(install_loc):

   if os.name == 'posix':

   download_filename = 'agent64_install_ng_ext.bin'
   download_secure_agent(download_dir)

   print(30 * '-')

   print("Installing Linux Secure Agent")

   print(30 * '-')

  os.chmod(download_dir, 0o777)

  final_install_dir = lambda install_final_loc: "$HOME" if (not install_loc) else install_loc
   os.system('sh ' + download_dir + ' -i silent -DUSER_INSTALL_DIR=' + final_install_dir(''))

   if os.name == 'nt':

   download_filename = 'agent64_install_ng_ext.exe'
   download_secure_agent(download_dir)

   print(30 * '-')

   print("Installing Windows Secure Agent")

   print(30 * '-')

  os.chmod(download_dir, 0o777)

  final_install_dir = lambda install_final_loc: "$HOME" if (not install_loc) else install_loc
   os.system(download_dir + ' -i silent -DUSER_INSTALL_DIR=' + final_install_dir(''))

 

 

def register_secure_agent():

  install_secure_agent(install_dir)

   print(30 * '-')

   print("Registering Secure Agent")

   print(30 * '-')

  os.chdir(install_dir + "/apps/agentcore")

   if os.name == 'posix':

   start_agent = os.system("./infaagent startup")

  time.sleep(10)

  register_agent = os.system("./consoleAgentManager.sh configureToken '" + username + "' '" + get_token()[1] + "'")

   print(register_agent)

   if os.name == 'nt':

  time.sleep(10)

   register_agent = os.system('consoleAgentManager.bat configureToken ' + username + ' "' + get_token()[1] + '"')

 

 

 

if option == 1:

  download_secure_agent(download_dir)

 

if option == 2:

  install_secure_agent(install_dir)

 

if option == 3:

  register_secure_agent()

 

Script Execution

 

Save the script to a file with a .py extension and run the command "python <script_name>.py".