
Text Classification in BDM using NLP

 

This document shows how to do text classification in BDM using NLP. We will use a PredictionIO server to run our classification engine. The demo covers how to install PredictionIO, build, train and deploy a text classification template, and use it in BDM.

 

Apache PredictionIO Overview

 

Apache PredictionIO is an open source Machine Learning Server built on top of a state-of-the-art open source stack for developers and data scientists to create predictive engines for any machine learning task.

 

It lets you:

 

  • Quickly build and deploy an engine as a web service on production with customizable templates;
  • Respond to dynamic queries in real-time once deployed as a web service;
  • Evaluate and tune multiple engine variants systematically;
  • Unify data from multiple platforms in batch or in real-time for comprehensive predictive analytics;
  • Speed up machine learning modeling with systematic processes and pre-built evaluation measures;
  • Support machine learning and data processing libraries such as Spark MLLib and OpenNLP;
  • Implement your own machine learning models and seamlessly incorporate them into your engine;
  • Simplify data infrastructure management.

Apache PredictionIO can be installed as a full machine learning stack, bundled with Apache Spark, MLlib, HBase, Spray and Elasticsearch, which simplifies and accelerates scalable machine learning infrastructure management.

 

 

 

PredictionIO Architecture

 

Apache PredictionIO consists of different components.

 

PredictionIO Platform: An open source machine learning stack built on top of state-of-the-art open source applications such as Apache Spark, Apache Hadoop, Apache HBase and Elasticsearch.

 

Event Server: This continuously gathers data from your web server or mobile application server in real-time mode or batch mode. The gathered data can be used to train the engine or to provide a unified view for data analysis. The event server uses Apache HBase to store the data.

 

Engine Server: The engine server is responsible for making the actual prediction. It reads the training data from the data store and uses one or more machine learning algorithms to build the predictive models. An engine, once deployed as a web service, responds to the queries made by a web or mobile app using a REST API or SDK.

 

Template Gallery: This gallery offers various types of pre-built engine templates. You can choose a template which is similar to your use case and modify it according to your requirements.
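
To make the roles of the Event Server and Engine Server concrete, the sketch below uses the PredictionIO Python SDK (installed later in this walkthrough). The host names, ports, access key, event name and property names are illustrative placeholders, not values from this demo.

import predictionio

# Event Server (port 7070 by default): collects the data later used for training
event_client = predictionio.EventClient(
    access_key="YOUR_APP_ACCESS_KEY",  # placeholder: use the key shown by "pio app list"
    url="http://localhost:7070")
event_client.create_event(
    event="documents",                 # placeholder event name
    entity_type="source",
    entity_id="1",
    properties={"text": "sample document text", "label": "sample_topic"})

# Engine Server (port 8000 by default): answers prediction queries once an engine is deployed
engine_client = predictionio.EngineClient(url="http://localhost:8000")
print(engine_client.send_query({"sentence": "sample document text"}))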

 

Prerequisites

 

PredictionIO can also be installed on an existing Hadoop cluster, but for this demo we will install the following standalone components and configure them with PredictionIO:

 

  • Java 1.8
  • Apache Spark
  • Apache HBase
  • Apache Hadoop
  • Elasticsearch

 

 

Installing Apache PredictionIO

 

Make sure Java 1.8 is installed on the machine, set JAVA_HOME, and add $JAVA_HOME/bin to your PATH (for example, export JAVA_HOME=/usr/lib/jvm/java-1.8.0 followed by export PATH=$PATH:$JAVA_HOME/bin, adjusted to your JDK installation path).

 

 

 

Download and Install Apache PredictionIO

 

Apache provides PredictionIO source files which can be downloaded and compiled locally. Create a temporary directory and compile the source files there.

 

mkdir /tmp/pio_sourcefiles

cd /tmp/pio_sourcefiles

 

 

Download the PredictionIO source archive from any Apache mirror site:

 

wget http://apache.mirror.vexxhost.com/incubator/predictionio/0.12.0-incubating/apache-predictionio-0.12.0-incubating.tar.gz

 

Extract the archive and compile the source to create a distribution of PredictionIO

 

tar -xvf apache-predictionio-0.12.0-incubating.tar.gz

./make-distribution.sh

 

The distribution will be built against the default versions of the dependencies: Scala 2.11.8, Spark 2.1.1, Hadoop 2.7.3 and Elasticsearch 5.5.2. The build will take approximately 10 to 15 minutes.

 

You can also build PredictionIO against the latest supported versions of Scala, Spark, Hadoop and Elasticsearch, but you may see some warnings during the build as some functions might be deprecated. To run the build with your own versions, run ./make-distribution.sh -Dscala.version=2.11.11 -Dspark.version=2.1.2 -Dhadoop.version=2.7.4 -Delasticsearch.version=5.5.3, replacing the version numbers as needed.

 

Once the build successfully finishes, you will see the following message at the end.

 

PredictionIO binary distribution created at PredictionIO-0.12.0-incubating.tar.gz

 

The PredictionIO binary files are packaged in the PredictionIO-0.12.0-incubating.tar.gz archive. Extract the archive into the /opt directory and give ownership to the current user.

 

sudo tar xf PredictionIO-0.12.0-incubating.tar.gz -C /opt/

sudo chown -R $USER:$USER /opt/PredictionIO-0.12.0-incubating

 

 

Set the PIO_HOME environment variable.

 

echo "export PIO_HOME=/opt/PredictionIO-0.12.0-incubating" >> ~/.bash_profile

source ~/.bash_profile

 

 

Install Required Dependencies

 

Create a new directory to install the PredictionIO dependencies, such as Scala, Hadoop, Spark, HBase and Elasticsearch.

 

mkdir /opt/PredictionIO-0.12.0-incubating/vendors

 

Download Scala version 2.11.8 and extract it into the vendors directory.

 

wget https://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz

tar xf scala-2.11.8.tgz -C /opt/PredictionIO-0.12.0-incubating/vendors

 

Download Apache Hadoop version 2.7.3 and extract it into the vendors directory.

 

wget https://archive.apache.org/dist/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz

tar xf hadoop-2.7.3.tar.gz -C /opt/PredictionIO-0.12.0-incubating/vendors

 

Apache Spark is the default processing engine for PredictionIO. Download Spark version 2.1.1 and extract it into the vendors directory.

 

wget https://archive.apache.org/dist/spark/spark-2.1.1/spark-2.1.1-bin-hadoop2.7.tgz

tar xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/PredictionIO-0.12.0-incubating/vendors

 

Download Elasticsearch version 5.5.2 and extract it into the vendors directory.

 

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.5.2.tar.gz

tar xf elasticsearch-5.5.2.tar.gz -C /opt/PredictionIO-0.12.0-incubating/vendors

 

Finally, download HBase version 1.2.6 and extract it into the vendors directory.

 

wget https://archive.apache.org/dist/hbase/stable/hbase-1.2.6-bin.tar.gz

tar xf hbase-1.2.6-bin.tar.gz -C /opt/PredictionIO-0.12.0-incubating/vendors

 

Open the hbase-site.xml configuration file to configure HBase to work in a standalone environment.

 

vi /opt/PredictionIO-0.12.0-incubating/vendors/hbase-1.2.6/conf/hbase-site.xml

Add the following block to the HBase configuration:

 

hbase-site.xml

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///opt/PredictionIO-0.12.0-incubating/vendors/hbase-1.2.6/data</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/PredictionIO-0.12.0-incubating/vendors/hbase-1.2.6/zookeeper</value>
  </property>
</configuration>

 


 

The data directory will be created automatically by HBase. Edit the HBase environment file to set the JAVA_HOME path.

 

vi /opt/PredictionIO-0.12.0-incubating/vendors/hbase-1.2.6/conf/hbase-env.sh

 

Set JAVA_HOME on line 27 (for example, export JAVA_HOME=/usr/lib/jvm/java-1.8.0, adjusted to your JDK path) and comment out lines 46 and 47, as they are not needed for Java 8.

 

 

Configuring Apache PredictionIO

 

The default configuration in the PredictionIO environment file pio-env.sh assumes that we are using PostgreSQL or MySQL. As we have used HBase and Elasticsearch, we will need to modify nearly every configuration in the file. It's best to take a backup of the existing file and create a new PredictionIO environment file.

 

mv /opt/PredictionIO-0.12.0-incubating/conf/pio-env.sh /opt/PredictionIO-0.12.0-incubating/conf/pio-env.sh.bak

 

Create a new file for PredictionIO environment configuration.

 

vi /opt/PredictionIO-0.12.0-incubating/conf/pio-env.sh

The file should look like below

 

pio-env.sh

 

# PredictionIO Main Configuration

#

# This section controls core behavior of PredictionIO. It is very likely that

# you need to change these to fit your site.

 

# SPARK_HOME: Apache Spark is a hard dependency and must be configured.

SPARK_HOME=$PIO_HOME/vendors/spark-2.1.1-bin-hadoop2.7

 

# POSTGRES_JDBC_DRIVER=$PIO_HOME/lib/postgresql-42.0.0.jar

# MYSQL_JDBC_DRIVER=$PIO_HOME/lib/mysql-connector-java-5.1.41.jar

 

# ES_CONF_DIR: You must configure this if you have advanced configuration for

#              your Elasticsearch setup.

ES_CONF_DIR=$PIO_HOME/vendors/elasticsearch-5.5.2/config

 

# HADOOP_CONF_DIR: You must configure this if you intend to run PredictionIO

#                  with Hadoop 2.

HADOOP_CONF_DIR=$PIO_HOME/vendors/spark-2.1.1-bin-hadoop2.7/conf

 

# HBASE_CONF_DIR: You must configure this if you intend to run PredictionIO

#                 with HBase on a remote cluster.

HBASE_CONF_DIR=$PIO_HOME/vendors/hbase-1.2.6/conf

 

# Filesystem paths where PredictionIO uses as block storage.

PIO_FS_BASEDIR=$HOME/.pio_store

PIO_FS_ENGINESDIR=$PIO_FS_BASEDIR/engines

PIO_FS_TMPDIR=$PIO_FS_BASEDIR/tmp

 

# PredictionIO Storage Configuration

#

# This section controls programs that make use of PredictionIO's built-in

# storage facilities. Default values are shown below.

#

# For more information on storage configuration please refer to

# http://predictionio.incubator.apache.org/system/anotherdatastore/

 

# Storage Repositories

 

# Default is to use PostgreSQL

PIO_STORAGE_REPOSITORIES_METADATA_NAME=pio_meta

PIO_STORAGE_REPOSITORIES_METADATA_SOURCE=ELASTICSEARCH

 

PIO_STORAGE_REPOSITORIES_EVENTDATA_NAME=pio_event

PIO_STORAGE_REPOSITORIES_EVENTDATA_SOURCE=HBASE

 

PIO_STORAGE_REPOSITORIES_MODELDATA_NAME=pio_model

PIO_STORAGE_REPOSITORIES_MODELDATA_SOURCE=LOCALFS

 

# Storage Data Sources

 

# PostgreSQL Default Settings

# Please change "pio" to your database name in PIO_STORAGE_SOURCES_PGSQL_URL

# Please change PIO_STORAGE_SOURCES_PGSQL_USERNAME and

# PIO_STORAGE_SOURCES_PGSQL_PASSWORD accordingly

# PIO_STORAGE_SOURCES_PGSQL_TYPE=jdbc

# PIO_STORAGE_SOURCES_PGSQL_URL=jdbc:postgresql://localhost/pio

# PIO_STORAGE_SOURCES_PGSQL_USERNAME=pio

# PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio

 

# MySQL Example

# PIO_STORAGE_SOURCES_MYSQL_TYPE=jdbc

# PIO_STORAGE_SOURCES_MYSQL_URL=jdbc:mysql://localhost/pio

# PIO_STORAGE_SOURCES_MYSQL_USERNAME=pio

# PIO_STORAGE_SOURCES_MYSQL_PASSWORD=pio

 

# Elasticsearch Example

PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch

PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost

PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200

PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http

PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=pio

PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.5.2

 

# Optional basic HTTP auth

# PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name

# PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret

# Elasticsearch 1.x Example

# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch

# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name>

# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost

# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300

# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6

 

# Local File System Example

PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs

PIO_STORAGE_SOURCES_LOCALFS_PATH=$PIO_FS_BASEDIR/models

 

# HBase Example

PIO_STORAGE_SOURCES_HBASE_TYPE=hbase

PIO_STORAGE_SOURCES_HBASE_HOME=$PIO_HOME/vendors/hbase-1.2.6

 

# AWS S3 Example

# PIO_STORAGE_SOURCES_S3_TYPE=s3

# PIO_STORAGE_SOURCES_S3_BUCKET_NAME=pio_bucket

# PIO_STORAGE_SOURCES_S3_BASE_PATH=pio_model

 

Open the Elasticsearch configuration file in an editor:

vi /opt/PredictionIO-0.12.0-incubating/vendors/elasticsearch-5.5.2/config/elasticsearch.yml

 

Uncomment the cluster.name line and set the cluster name to exactly the same value as in the PredictionIO environment file. The cluster name is set to pio in the configuration below.

 

elasticsearch.yml

 

# ======================== Elasticsearch Configuration =========================

#

# NOTE: Elasticsearch comes with reasonable defaults for most settings.

#       Before you set out to tweak and tune the configuration, make sure you

#       understand what are you trying to accomplish and the consequences.

#

# The primary way of configuring a node is via this file. This template lists

# the most important settings you may want to configure for a production cluster.

#

# Please consult the documentation for further information on configuration options:

# https://www.elastic.co/guide/en/elasticsearch/reference/index.html

#

# ---------------------------------- Cluster -----------------------------------

#

# Use a descriptive name for your cluster:

#

cluster.name: pio

#

# ------------------------------------ Node ------------------------------------

#

# Use a descriptive name for the node:

#

#node.name: node-1

#

# Add custom attributes to the node:

#

#node.attr.rack: r1

#

# ----------------------------------- Paths ------------------------------------

#

# Path to directory where to store the data (separate multiple locations by comma):

#

#path.data: /path/to/data

#

# Path to log files:

#

#path.logs: /path/to/logs

#

# ----------------------------------- Memory -----------------------------------

#

# Lock the memory on startup:

#

#bootstrap.memory_lock: true

#

# Make sure that the heap size is set to about half the memory available

# on the system and that the owner of the process is allowed to use this

# limit.

#

# Elasticsearch performs poorly when the system is swapping the memory.

#

# ---------------------------------- Network -----------------------------------

#

# Set the bind address to a specific IP (IPv4 or IPv6):

#

#network.host: 192.168.0.1

#

# Set a custom port for HTTP:

#

#http.port: 9200

#

# For more information, consult the network module documentation.

#

# --------------------------------- Discovery ----------------------------------

#

# Pass an initial list of hosts to perform discovery when new node is started:

# The default list of hosts is ["127.0.0.1", "[::1]"]

#

#discovery.zen.ping.unicast.hosts: ["host1", "host2"]

#

# Prevent the "split brain" by configuring the majority of nodes (total number of master-eligible nodes / 2 + 1):

#

#discovery.zen.minimum_master_nodes: 3

#

# For more information, consult the zen discovery module documentation.

#

# ---------------------------------- Gateway -----------------------------------

#

# Block initial recovery after a full cluster restart until N nodes are started:

#

#gateway.recover_after_nodes: 3

#

# For more information, consult the gateway module documentation.

#

# ---------------------------------- Various -----------------------------------

#

# Require explicit names when deleting indices:

#

#action.destructive_requires_name: true

 

Add the $PIO_HOME/bin directory to the PATH variable so that the PredictionIO executables can be run directly.

 

echo "export PATH=$PATH:$PIO_HOME/bin" >> ~/.bash_profile

source ~/.bash_profile

 

 

At this point, PredictionIO is successfully installed on your server.

 

 

Starting PredictionIO

 

 

You can start all the PredictionIO services, such as Elasticsearch, HBase and the Event Server, with a single command:

pio-start-all

You will see output similar to the following.

Starting Elasticsearch...
Starting HBase...
starting master, logging to /opt/PredictionIO-0.12.0-incubating/vendors/hbase-1.2.6/bin/../logs/hbase-user-master-vultr.guest.out
Waiting 10 seconds for Storage Repositories to fully initialize...
Starting PredictionIO Event Server...

Use the pio status command to check the status of the PredictionIO server.

 

Implementing an Engine Template

 

Several ready-to-use engine templates are available in the PredictionIO Template Gallery and can be easily installed on the PredictionIO server. You can browse the list of engine templates to find one that is close to your requirements, or you can write your own engine.

 

In this tutorial, we will implement the Text Classification engine template to demonstrate the functionality of the PredictionIO server using some sample data.

 

This engine template takes text input such as tweets, emails or newsgroup posts and identifies the topic of the text. You send a query containing the tweet or email data, and the output is the topic name.

 

Install Git, as it will be used to clone the repository.

 

sudo yum -y install git

Clone the text classification engine template on your system.

 

git clone https://github.com/amrgit/textclassification.git

cd textclassification

 

You can choose any name for your application.

 

pio app new docclassification

Type the following command to list the apps created inside PredictionIO.

 

pio app list

 

Install the PredictionIO Python SDK using pip:

 

pip install predictionio

 

 

Run the Python script to add the sample data to the event server. The cloned project already includes sample datasets that can be used to train the model; we will use the 20newsgroups training dataset for this demo. Use the access key shown for your application by pio app list.

python3 data/import_data.py --access_key 8FhrUWaTIZJLLPcuS0bRu64O4TiZoYjgZFWjWm_Mik3QgoxoZAUO-7Ti4xo59ZcX --file datasets/20ng-train-no-stop.txt
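
For reference, the import script essentially loops over the dataset and posts one event per document to the event server through the SDK. The sketch below is a rough equivalent, not the exact code in data/import_data.py; the event name, entity type and property names are assumptions, and the parsing assumes each line starts with the label followed by the document text.

import predictionio

# Connect to the event server (port 7070) with the application's access key
client = predictionio.EventClient(
    access_key="YOUR_APP_ACCESS_KEY",
    url="http://localhost:7070")

# Post one training event per line of the dataset (names below are placeholders)
with open("datasets/20ng-train-no-stop.txt") as f:
    for i, line in enumerate(f):
        label, text = line.strip().split(" ", 1)  # assumed "<label> <text>" format
        client.create_event(
            event="documents",
            entity_type="source",
            entity_id=str(i),
            properties={"label": label, "text": text})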

 

 

If the import is successful, the script reports the number of events imported; the above dataset loads 11294 events. To check whether the events were imported, run the following query.

 

curl -i -X GET "http://localhost:7070/events.json?accessKey=8FhrUWaTIZJLLPcuS0bRu64O4TiZoYjgZFWjWm_Mik3QgoxoZAUO-7Ti4xo59ZcX"

 

 

The output will show you the list of all the imported events in JSON format.

 

Open the engine.json file in an editor. This file contains the configuration of the engine. Make sure the appId matches the Id shown by the “pio app list” command.

 

 

Build the application using the following command. If you do not want verbose output, remove the --verbose parameter.

 

pio build --verbose

 

You should see the message that the build is successful and ready for training.

 

 

Train the engine. During the training, the engine analyzes the data set and trains itself according to the provided algorithm.

pio train

If the train command fails with out-of-memory (OOM) errors, increase the Spark driver and executor memory:

pio train -- --driver-memory 2g --executor-memory 4g

You should see a message that the training is successful.

 

 

Before we deploy the application, we need to open port 8000 so that the status of the application can be viewed in the web GUI. The websites and applications using the engine will also send and receive their queries through this port. You can use a different port in the deploy command if needed.

 

You can deploy the PredictionIO engine using the following command

 

pio deploy

You can increase the driver memory for the deploy command as shown below, and you can use a different port with the --port argument.

 

pio deploy -- --driver-memory 4G &

You will see a message that the engine is deployed and running.
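
Before calling the engine from BDM, you can send a quick test query with the PredictionIO Python SDK from any machine that can reach the engine server; the host name and sample sentence below are placeholders.

import predictionio

# pio deploy listens on port 8000 by default; replace the host with your engine server
engine_client = predictionio.EngineClient(url="http://localhost:8000")

# The query field "sentence" matches what this text classification template expects
result = engine_client.send_query({"sentence": "sample text about space and rockets"})
print(result)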

 

 

Calling PredictionIO engine through BDM

 

BDM sends the text data to the PredictionIO engine server, which scores it against the deployed model and sends the results back to BDM.

 

 

 

You can use the sample datasets provided in the GitHub link for testing the BDM mapping, or use your own dataset. For the demo I'm using the datasets from GitHub, copied onto my Hadoop cluster.

 

 

Create a flat file data object called opennlp_dataset in the Developer client. In the Advanced tab, change the connection type to Hadoop File System, set the connection name to your HDFS connection, and change the source file directory to the HDFS location.

 

 

 

 

Create a new mapping and call it m_Text_Classification_OpenNLP

 

 

 

 

Drag the flat file object opennlp_dataset created in the step above into the mapping workspace and choose the Read operation.

 

 

 

Add a Python transformation and create an input port called “data” and an output port called “class_output”.

 

 

 

In the Python tab of the Python transformation, add the following code.

 

import predictionio

# Connect to the deployed PredictionIO engine server (pio deploy listens on port 8000)
engine_client = predictionio.EngineClient(url="http://infa1021.infaaws.com:8000")

# "data" is the input port of the Python transformation
text_class = engine_client.send_query({"sentence": data})

# Copy the predicted class into the "class_output" output port
for i in text_class:
    class_output = text_class[i]

 

 

 

The mapping should look like below at this point.

 

 

 

Add a pass-through Expression transformation and drag class_output from the Python transformation to the Expression transformation.

 

Right-click the Expression transformation and click Create Target, choose Relational, select Hive from the drop-down, and name the Hive table text_class_output.

 

The final mapping should look like this.

 

 

 

 

In the mapping properties window, choose Spark as the execution engine and select the Hadoop connection.

 

 

Run the mapping and monitor it through the Administrator console.

 

 

 

Once the mapping succeeds, verify the output through Beeline or any Hive client. For the demo I'm using Zeppelin to query the table and view the results as a pie chart.

 

 

 

You can also view the results in tabular format in Zeppelin, showing the count of messages for each topic name.

 

 

PredictionIO has other engine templates which can be deployed in a similar fashion and used in BDM.

Calling a Web Service in BDM using Spark

This blog post shows how to call a web service in BDM using Spark.

We will use the Python transformation introduced in BDM 10.2.1 to call the web service.

 

A Java transformation is another option for calling the web service.

 

Prerequisites

 

Python and the jep package need to be installed on the BDM Data Integration Service (DIS) server; refer to the installation documentation to configure the Python transformation with BDM.

 

After the Python installation, edit the Hadoop connection by going to Window --> Preferences --> Connections and clicking your Hadoop connection.

 

 

 

Edit the Hadoop connection and go to the Spark tab.

 

 

 

Under the Spark tab, go to Advanced properties and click the Edit button. The first three properties in the screenshot are the Python properties that come by default; fill in the values for those three properties according to your Python installation.

 

 

Web Service Details

 

 

We will use the following web service to get the states for any given country:

 

http://services.groupkt.com/state/get

 

For example, if we pass the country “USA” to the above URL, the web service returns all the state information within the USA along with other details such as area, capital and largest city.

 

To test the web service for USA, open the following URL in your browser; it returns JSON output.

 

http://services.groupkt.com/state/get/USA/all
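
You can also test the service with a short Python script before building the mapping; this is a minimal sketch using the requests package, making the same call the mapping will make later.

import requests

# Call the public REST endpoint for one country and print the parsed JSON response
response = requests.get("http://services.groupkt.com/state/get/USA/all")
print(response.json())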

 

 

 

Calling the Web Service in a BDM Mapping

 

 

We will create two mappings in BDM.

 

In the first mapping, we will pass the country names from an input file, use the Python transformation to call the web service, and finally write the output to an HDFS file. The output will be a JSON file.

 

In the second mapping, we will parse the JSON output from the first mapping and write it to Hive.

 

 

Mapping 1:

 

We have an input file on HDFS containing country names. We will pass the country names from this input file and get the states.

 

Create a flat file data object in the Developer client. In the advanced properties, go to the Read section and point it to the HDFS connection and directory.

 

 

 

Create a new mapping, drag the flat file object into the mapping, and choose the Read operation.

 

 

 

Add a Python transformation to the mapping and drag country_name to the input of the Python transformation.

Create an output port for the Python transformation and call it states_data_json. The Python transformation ports should look like below.

 

 

Go to the Python tab of the Python transformation and add the following code.

 

 

import requests
import json

# "country_name" is the input port of the Python transformation
input_string = country_name
input_url = "http://services.groupkt.com/state/get/"

# Call the REST web service for this country and parse the JSON response
states = requests.get(input_url + input_string + "/all")
states_data = states.json()

# "states_data_json" is the output port; write the response back as a JSON string
states_data_json = json.dumps(states_data)

 

 

Connect the output port of the Python transformation to a flat file data object that writes to HDFS.

 

 

 

The target data object properties look like below

 

 

Change the execution mode of the mapping to run on Spark.

 

 

Execute the mapping and verify the status of the mapping in the admin console.

 

 

 

Verify the output of the mapping on HDFS; you will see the output in JSON format.

 

 

 

Mapping 2:

In this mapping we will parse the JSON output file from the previous mapping, turning it into structured data.

 

Create a complex data object by right-clicking Physical Data Objects -> New -> Physical Data Object.

 

 

Choose complex file data object and click Next

 

 

Name the complex file data object “cfr_states”, click the Browse button under Connection and choose your HDFS connection, then under “Selected resources” click the Add button.

 

 

In the Add resource dialog, navigate to the HDFS file location (this is the output file location we specified in the previous mapping), select the JSON file, and click OK.

 

 

 

 

Click finish on the next step

 

 

Now create a Data Processor transformation by right-clicking Transformations -> New -> Transformation.

 

 

Choose data processor transformation from the list of transformations

 

 

Name the Data Processor transformation “dp_ws_state” and choose “Create a data processor using a wizard”.

 

 

 

Since the input to the Data Processor transformation is JSON, choose JSON in the next step and click Next.

 

 

 

Make sure you have sample output from the first mapping on the Developer machine, choose the “sample JSON file” option, browse to the sample JSON file, and click Next.

 

 

 

Choose relational output and click finish

 

 

 

After you click on the finish button the data processor transformation will look like below

 

Create a Hive table using the following DDL in your target database and import the Hive table as a relational data object into the Developer client.

 

CREATE TABLE infa_pushdown.ws_states (
    FKey_states BIGINT,
    id DOUBLE,
    country STRING,
    name STRING,
    abbr STRING,
    area STRING,
    largest_city STRING,
    capital STRING
);

 

 

Now drag the complex file reader, the Data Processor transformation and the Hive target into the mapping. Then connect the data port from the complex file reader to the input of the Data Processor, and the output of the Data Processor to the Hive target.

 

The final mapping should look like below.

 

 

The mapping was tested in BDM 10.2.1; in this version the Data Processor transformation is not supported in Spark mode, so we run the second mapping using Blaze. Once Data Processor support is added for Spark, the second mapping can be eliminated by adding the Data Processor transformation to the first mapping. The screenshot shows Blaze as the execution engine.

 

 

Execute the mapping and verify the output of the target table by running the data viewer on the target data object.

 

Orchestrating BDM Mappings with Apache Airflow

This blog is a short overview of Apache Airflow and shows how to integrate BDM with it. It also includes a sample template to orchestrate BDM mappings. The same concept can be extended to PowerCenter and non-BDM mappings.

 

Apache Airflow overview

 

Airflow is a platform to programmatically author, schedule and monitor workflows.

 

Airflow is not a data streaming solution. Tasks do not move data from one to the other (though tasks can exchange metadata!). Airflow is not in the Spark Streaming or Storm space, it is more comparable to Oozie or Azkaban.

 

Generally, Airflow works in a distributed environment, as you can see in the diagram below. The airflow scheduler schedules jobs according to the dependencies defined in directed acyclic graphs (DAGs), and the airflow workers pick up and run jobs with their loads properly balanced. All job information is stored in the meta DB, which is updated in a timely manner. The users can monitor their jobs via a shiny Airflow web UI and/or the logs.

 

 

Installing Apache Airflow

 

The following installation method is for non-production use. Refer to the Airflow documentation for production deployments.

 

Apache Airflow provides various operators, listed at the link below. An operator describes a single task in a workflow.

 

https://github.com/apache/incubator-airflow/tree/master/airflow/operators

 

To trigger Informatica BDM mappings we will use the BashOperator, i.e. trigger the mappings through the command line. The prerequisites are:

 

  1. If Apache Airflow is running on a machine other than the Informatica node, install the Informatica command line utilities on the Airflow worker nodes
  2. Python

 

 

Create a directory /opt/infa/airflow

 

 

The easiest way to install Airflow is to run the following command. pip is a Python utility for installing Python packages.

 

pip install apache-airflow

Set the AIRFLOW_HOME environment variable to the directory created above (export AIRFLOW_HOME=/opt/infa/airflow).

 

Create a folder called “dags” inside the AIRFLOW_HOME folder.

 

Initialize the Airflow database by typing the command “airflow initdb”. This is where the metadata will be stored; we will use the default SQLAlchemy-backed SQLite database that comes with Airflow. If needed, the configuration can be changed (the sql_alchemy_conn setting in airflow.cfg) to use MySQL or PostgreSQL as the Airflow backend.

 

 

If initdb shows any errors, it is most likely because of missing Airflow packages; a complete list of packages and the commands to install them is at the link below.

 

https://airflow.apache.org/installation.html

 

Start the Airflow web UI using the following command:

airflow webserver -p 8080

 

Start the Airflow scheduler:

airflow scheduler

 

 

Log in to the Airflow UI at http://hostname:8080. If you have installed the examples, you should see the example DAGs listed in the UI.

 

 

Creating a DAG for BDM Mappings

 

 

For the demo we deployed the following three BDM mappings into DIS.

 

Application_m_01_Get_States_Rest_Webservice

Application_m_02_Parse_Webservice_Output

Application_m_Read_Oracle_Customers_Write_Hive_Python

 

 

The 3 applications need to be orchestrated in the following way.

 

  1. Application_m_01_Get_States_Rest_Webservice and Application_m_Read_Oracle_Customers_Write_Hive_Python can run in parallel
  2. Application_m_02_Parse_Webservice_Output will run only if Application_m_01_Get_States_Rest_Webservice is successful

 

 

 

Save the following code as airflow_bdm_sample.py under the /opt/infa/airflow/dags folder.

There are different ways to call the infacmd RunMapping command; for example, the command can be put in a shell script and the script called from the DAG.

 

 

#Start Code

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


# These args will get passed on to each operator.
# You can override them on a per-task basis during operator initialization.
default_args = {
    'owner': 'infa',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime.now() - timedelta(seconds=10),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'Informatica_Bigdata_Demo',
    default_args=default_args,
    description='A simple Informatica BDM DAG')

# Print the date at the start of the DAG run
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

# Mapping that calls the REST web service
t2 = BashOperator(
    task_id='mapping_calling_webservice',
    depends_on_past=False,
    bash_command='infacmd.sh ms RunMapping -dn infa_dom_1021 -sn dis_bdm_cdh -un Administrator -m m_01_Get_States_Rest_Webservice -a Application_m_01_Get_States_Rest_Webservice -pd admin',
    dag=dag)

# Mapping that parses the JSON output of the web service mapping
t3 = BashOperator(
    task_id='mapping_parsing_json',
    depends_on_past=True,
    bash_command='infacmd.sh ms RunMapping -dn infa_dom_1021 -sn dis_bdm_cdh -un Administrator -m m_02_Parse_Webservice_Output -a Application_m_02_Parse_Webservice_Output',
    dag=dag)

# Mapping that reads Oracle and loads Hive; independent of t2/t3
t4 = BashOperator(
    task_id='Read_Oracle_load_Hive',
    depends_on_past=False,
    bash_command='infacmd.sh ms RunMapping -dn infa_dom_1021 -sn dis_bdm_cdh -un Administrator -m m_Read_Oracle_Customers_Write_Hive_Python -a Application_m_Read_Oracle_Customers_Write_Hive_Python',
    dag=dag)


# t2 and t4 both depend only on t1, so they run in parallel after it;
# t3 runs only after t2 completes successfully.
t1.set_downstream(t2)
t2.set_downstream(t3)
t4.set_upstream(t1)

 

 

# End code

 

 

Restart the Airflow webserver and the Informatica_Bigdata_Demo DAG will appear in the list of DAGs.

 

 

 

Click on the DAG and go to Graph View; it gives a better view of the orchestration.

 

 

 

Run the DAG and you will see the status of the DAG runs in the Airflow UI as well as in the Informatica Monitor.

 

 

 

The above DAG code can be extended to retrieve the mapping logs and the status of the runs.