Tuesday, 24 November 2015

Spark Blog 5 - Using Spark Streaming to write live Twitter data into IBM Databases

There are several use cases where data extracted from live data streams such as Twitter may need to be persisted into external databases. In this example, you will learn how to filter incoming live Twitter data and write relevant subsets of Twitter data into IBM database DB2. Sample program will work against all flavors of IBM databases i.e. DB2 for z/OS, DB2 distributed, dashDB and SQLDB.

We will use Spark Streaming to receive live data streams from Twitter and filter the tweets by a keyword . We will then extract the twitter user names associated with the matching tweets and insert them into DB2. These user names extracted from Twitter can have many applications – such as a more comprehensive analysis on whether these Twitter users are account holders of the bank by performing joins with other tables such as customer table.

  1. For a background on Spark Streaming, refer to http://spark.apache.org/docs/latest/streaming-programming-guide.html.

  2. We will use TwitterUtils class provided by Spark Streaming. TwitterUtils uses Twitte4J under the covers, which is a Java library for Twitter API.

  3. Create a table in DB2 called TWITTERUSERS using -
    CREATE TABLE TWITTERUSERS (NAME VARCHAR(255))

  4. Create a new Scala class in Eclipse with following contents (also available here). Change database and Twitter credentials to yours.


  5. Make sure the Project Build Path contains the jars db2jcc.jar (DB2 JDBC driver), spark-assembly-1.3.1_IBM_1-hadoop2.6.0.jar and spark-examples-1.3.1_IBM_1-hadoop2.6.0.jar, as shown below - 


  6. Lines 12 to 15 loads the DB2 driver class, establishes a connection to the database and prepares an INSERT statement that is used to insert Twitter user names into DB2.

  7. Lines 17 to 24 sets the system properties for consumerKey, consumerSecret, accessToken and accessTokenSecret that will be used by Twitter4j library to generate Oauth credentials. You do this by configuring consumer key/secret pair and access token/secret pair in your account at this link – https://dev.twitter.com/apps. Detailed instructions on how to generate the two pairs are contained at http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html.

  8. Lines 26 and 27 create a local StreamingContext with 16 threads and batch interval of 2 seconds. StreamingContext is the entry point for all streaming functionality. 

  9. Using the StreamingContext created above, Line 30 creates an object DStream called stream. DStream is the basic abstraction in Spark Streaming and is a continuous stream of RDDs containing object of Type twitter4j.Status (http://twitter4j.org/javadoc/twitter4j/Status.html). A filter is also specified (“Paris”) which will select only those tweets that have keyword “Paris” in them.

  10. In Line 31, map operation on stream maps each status object to its user name to create a new DStream called users.

  11. Line 32 returns a new DStream called recentUsers where user names are aggregated over 60 seconds. 

  12. Lines 34 to 41 iterate over each RDD in the DStream recentUsers to return number of users every 60 seconds, and inserting those users into the database table TWITTERUSERS through JDBC. 

  13. Lines 44 starts real processing and awaits termination.

  14. Following screenshot shows a snippet of console output when the program is run. Of course, you can change the filter to any keyword in line 29.


  15. You can also run SELECT * from TWITTERUSERS on your database to confirm that the Twitter users get inserted.
Above simple Twitter program can be extended to more complicated use cases using Spark Streaming to do analysis of social media data more effectively, persist subset of social media data into databases and join social media data with relational data to derive additional business insights.

Friday, 9 October 2015

Quick Start in accessing DB2 data from Node.js applications

Quick Start in accessing DB2 data from Node.js applications

We are seeing a trend of DB2 data being accessed by modern distributed applications written in new APIs and frameworks. JavaScript has become extremely popular for Web application development. JavaScript adoption was revolutionized by Node.js which makes it possible to run JavaScript on the server-side. There is an increasing interest amongst developers to write analytics applications in Node.js that need to access DB2 data (both z/OS and distributed). Modern DB2 provides a Node.js driver that makes Node.js connectivity straight forward. Below are step-by-step instructions for a basic end-to-end Node.js application on Windows for accessing data from DB2 for z/OS and DB2 distributed -

1) Install Node and its companion NPM. NPM is a tool to manage Node modules. Download the installer from https://nodejs.org/dist/v0.12.7/x64/node-v0.12.7-x64.msi.

2) Note that DB2 Node.js driver does not support Node 4 on Windows yet. Node 4 support is already available for Mac and Linux. We will have Node 4 support for Windows out very soon.

3) Install a 64-bit version of Node since DB2 Node.js driver does not support 32-bit.

4) Run the installer (in my case node-v0.12.7-x64.msi). You should see a screen like Screenshot 1.

Screenshot 1

5) Follow the instructions on license and folder choice until you reach the screen for the features you want installed. Default selection is recommended and click Next to start intsall (Screenshot 2).

Screenshot 2

6) Verify that the installation is complete by opening the command prompt and executing node -v and npm -v as shown in Screenshot 3.

Screenshot 3

7) You can write a simple JavaScript program to test the installation. Create a file called Confirmation.js with contents console.log('You have successfully installed Node and NPM.');

8) Navigate to the folder you have created the file in and run the application using node Confirmation.js. Output looks like Screenshot 4.

Screenshot 4

9) Now install the DB2 Node.js driver using the following command from Windows command line: npm install ibm_db (For NodeJS 4+, installation command would be different as follows
npm install git+https://git@github.com/ibmdb/node-ibm_db.git#v4_support).

10) Under the covers, the npm command downloads node-ibm_db package from github and includes the DB2 ODBC CLI driver to provide connectivity to the DB2 backend. You should see following output (Screenshot 5).

Screenshot 5


11) Copy the following simple DB2 access program in a file called DB2Test.js and change the database credentials to yours -


var ibmdb = require('ibm_db');
ibmdb.open("DRIVER={DB2};DATABASE=<dbname>;HOSTNAME=<myhost>;UID=db2user;PWD=password;PORT=<dbport>;PROTOCOL=TCPIP", function (err,conn) {
if (err) return console.log(err);
conn.query('select 1 from sysibm.sysdummy1', function (err, data) {
if (err) console.log(err);
else console.log(data);
conn.close(function () {
console.log('done');
});
});
});


12) Run the following command from Windows command line to execute the program: node DB2Test.js. You should see Screenshot 6, containing the output of SQL SELECT 1 from SYSIBM.SYSDUMMY1. Your simple Node application can now access DB2.

Screenshot 6

13) For connecting to DB2 for z/OS, modify the Connection URL, DB name, port, user name and password to DB2 for z/OS credentials.

14) DB2 for z/OS access needs DB2 Connect license entitlement. In most production DB2 for z/OS systems with DB2 Connect Unlimited Edition licensing, server side license activation would have already been done, in which case you don't need to do anything about licensing. If you get any license error on executing the program, server side activation may not have been done. In that case, copy the DB2 Connect ODBC client side license file into ibm_db/installer/clidriver/license folder.

15) Also make sure that the DB2 for z/OS server you are testing against has CLI packages already bound (this would have been already done as part of DB2 Connect setup on the DB2 z/OS server).

16) Run the program with DB2 for z/OS credentials and you will observe similar output as Step 12.

Enjoy your Node.js test drive with DB2!

Wednesday, 12 August 2015

Spark blog 4 - Persisting Spark DataFrames into DB2


There are several use cases where data in Spark needs to be persisted in a backend database. Enterprise wide analytics may require load of data into Spark from different data sources, apply transformations, perform in-memory analytics and write the transformed data back to a enterprise RDMS such as DB2.

In this blog, simple techniques are shown using latest Spark release to load data from a JSON file into Spark and write that back into DB2 using DB2 supplied JDBC driver.

Step 1)
Download latest pre-built Spark library (1.4.1) from http://spark.apache.org/downloads.html. With the rapid evolution in Spark, many methods in 1.3 have been deprecated, and it is best to experiment with the latest.

Step 2)
In your Eclipse Scala IDE build path, add Spark library and DB2 JDBC driver as shown below -

Step 3)
Create a json file with following contents -

        { "EMPNO":10, "EDLEVEL":18, "SALARY":152750, "BONUS":1000 }
    { "EMPNO":20, "EDLEVEL":18, "SALARY":94250, "BONUS":800 }

Step 4)
Create a Scala application with following logic -
   1:  val DB2_CONNECTION_URL = "jdbc:db2://localhost:50000/sample:
currentSchema=pallavipr;user=pallavipr;password=XXXXXX;traceFile=
C:/Eclipse_Scala/trace_scala.txt;";
   2:    
   3:    val conf = new SparkConf()
   4:      .setMaster("local[1]")
   5:      .setAppName("GetEmployee")
   6:      .set("spark.executor.memory", "1g")
   7:   
   8:    val sc = new SparkContext(conf)
   9:    val sqlcontext = new SQLContext(sc)
  10:    val path = "C:/Eclipse_Scala/empint.json"
  11:   
  12:    val empdf = sqlcontext.read.json(path)
  13:    empdf.printSchema()
  14:    empdf.show()
  15:    
  16:    Class.forName("com.ibm.db2.jcc.DB2Driver");
  17:    
  18:    val prop = new Properties()
  19:    prop.put("spark.sql.dialect" , "sql"); 
  20:      
  21:    empdf.write.jdbc(DB2_CONNECTION_URL, "PALLAVIPR.EMPLOYEESALARY", prop)


Step 5) 
JSON file is loaded into Spark in Line 12 using new DataFrameReader introduced 
in Spark 1.4.0.
 
Step 6)
DB2 JDBC driver is loaded in Line 16 to carry out the write operation to DB2.
Step 7) 
On running this Scala program, you will see following schema output from 
printSchema method on DataFrame created from JSON file -

 
 
Step 8)
Print of DataFrame using Dataframes show method produces following output-




Step 9)
Final write to DB2 is done using DataFrameWriter jdbc API introduced in 1.4.0 
(as shown in Line 21) which under the covers generate the CREATE TABLE and 
INSERT sql's for table EMPLOYEESALARY.
 
Step 10)
You can verify that the table is created and the JSON data is inserted into 
DB2 using any tool of your choice -


Note that we are working through couple of issues on write back of String data types in DB2 from Spark, however, that should not stop you from experimenting with numeric types. Watch this space for more blogs and updates on DB2-Spark.

Tuesday, 4 August 2015

Spark Blog 3 – Simplify joining DB2 data and JSON data with Spark

Spark SQL gives powerful API to work with data across different data sources using Python, Scala and Java. In this post, we will demonstrate how easily DB2 data (both z/OS and LUW) can be loaded into Spark and joined with JSON data using DataFrames.

We will use a standalone Java program in this example. For setup of Eclipse to work with Spark, please refer to second blog in Spark series.

Step by step instructions for our sample program is given below.

  1. Create a table in DB2 (“EMPLOYEESUB”) using Command Line Processor (CLP) that contains a subset of EMPLOYEE information. DDL for EMPSUB is - CREATE TABLE "PALLAVIPR"."EMPLOYEESUB" ( "EMPNO" CHAR(6) NOT NULL , "FIRSTNME" VARCHAR(12) NOT NULL , "MIDINIT" CHAR(1) , "LASTNAME" VARCHAR(15) NOT NULL , "WORKDEPT" CHAR(3) , "COMM" DECIMAL(9,2) ). Change the schema to your schema and make sure that you are connected to the correct database you want to create your table in.



  1. Load EMPLOYEESUB table with 5 rows of data below, which is stored in a CSV file (C:\1.csv).

"000010","CHRISTINE","I","HAAS","A00",+0004220.00
"000020","MICHAEL","L","THOMPSON","B01",+0003300.00
"000030","SALLY","A","KWAN","C01",+0003060.00
"000050","JOHN","B","GEYER","E01",+0003214.00
"000060","IRVING","F","STERN","D11",+0002580.00



  1. Copy the following contents into a JSON file (employeesub.json)
    { "EMPNO":"000010", "EDLEVEL":"18", "SALARY":"152750.00", "BONUS":"1000.00" }
{ "EMPNO":"000020", "EDLEVEL":"18", "SALARY":"94250.00", "BONUS":"800.00" }
{ "EMPNO":"000030", "EDLEVEL":"20", "SALARY":"98250.00", "BONUS":"800.00" }
{ "EMPNO":"000050", "EDLEVEL":"16", "SALARY":"80175.00", "BONUS":"800.00" }
{ "EMPNO":"000060", "EDLEVEL":"16", "SALARY":"72250.00", "BONUS":"500.00" }

  1. As you can see, DB2 table EMPLOYEESUB contains 6 columns, EMPNO, FIRSTNME, MIDINIT, LASTNAME, WORKDEPT, COMM, while JSON file has following 4 keys – EMPNO, EDLEVEL, SALARY, BONUS. Our goal is to join both sets of data using EMPNO as the join key so that a combined data set can be created with all employee information in one place.

  2. Copy the following program in an Eclipse java class.

       1:   
       2:  import java.util.HashMap;
       3:  import java.util.Map;
       4:   
       5:  import org.apache.log4j.Level;
       6:  import org.apache.log4j.Logger;
       7:  import org.apache.spark.SparkConf;
       8:  import org.apache.spark.api.java.JavaSparkContext;
       9:  import org.apache.spark.sql.DataFrame;
      10:  import org.apache.spark.sql.Row;
      11:  import org.apache.spark.sql.SQLContext;
      12:   
      13:  public class DB2JsonJoinSpark {
      14:    public static void main(String[] args) {
      15:      
      16:        SparkConf conf = new SparkConf();
      17:      
      18:        conf.setMaster("local[1]");
      19:        conf.setAppName("GetUnionEmployeeTable");
      20:        conf.set("spark.executor.memory", "1g");
      21:      
      22:        String path = "C:\\Eclipse_Scala\\employeesub.json";
      23:      
      24:        JavaSparkContext sc = new JavaSparkContext(conf);
      25:        SQLContext sqlContext = new SQLContext(sc);
      26:   
      27:        Map<String, String> options = new HashMap<String, String>();
      28:        options.put("url", "jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXXXXX;");
      29:        options.put("driver", "com.ibm.db2.jcc.DB2Driver");
      30:        options.put("dbtable", "pallavipr.employeesub");
      31:   
      32:        DataFrame df1 = sqlContext.load("jdbc", options);
      33:        df1.show();
      34:   
      35:        DataFrame df2 = sqlContext.jsonFile(path);
      36:        df2.show();
      37:   
      38:        DataFrame finaldf = df1.join(df2, df2.col("EMPNO").equalTo(df1.col("EMPNO")) );
      39:        System.out.println("Printing Joined Data");
      40:        finaldf.show();
      41:    }
      42:  }
     
  3. As shown above, we create SQLContext in Line 25 to work with RDMS and load DB2 data into Spark using DB2 JDBC driver as a DataFrame in Line 32. We load JSON data as Spark DataFrame in Line 35.

  4. Make sure that you have included Spark libraries and DB2 JDBC driver libraries in the Build path.   

  5. Right click the Java class and Run As Java application. You should see below output on the console.  
As you can see above, the output contains data from both DB2 table and JSON file, joined by EMPNO. This example clearly highlights the ease with which Spark enables join across disparate data sources with its powerful DataFrames API.

Enjoy your Spark testdrive with DB2!
- Pallavi (pallavipr@in.ibm.com) and Param (param.bng@in.bm.com)

Tuesday, 16 June 2015

Spark blog 2 - Accessing DB2 data in Spark via stadalone Scala and Java programs in Eclipse.


My colleague Param (param.bng@in.ibm.com) and I (pallavipr@in.ibm.com) are exploring various aspects of Spark integration with DB2 and DB2 Connect drivers. We have decided to write a series of articles capturing our experimentation for the benefit of others as we did not find any article that focuses on different aspects of DB2 access via Spark.

Our first article in the series covered DB2 access via Spark Scala shell. This second article focuses on accessing DB2 data from via standalone Scala and Java program in Eclipse using DB2 JDBC driver and DataFrames API. Below are the detailed step by step instructions. Note that same instructions will apply to DB2 on all platforms (z/OS, LUW, I) as well as Informix.

  1. Confirm that you have Java installed by running java -version from Windows command line. JDK version 1.7 or 1.8 is recommended.
  2. Install Spark on local machine by downloading spark from https://spark.apache.org/downloads.html.
  3. We chose pre-built binaries as shown in Screenshot 1 (instead of source code download) to avoid building spark in early experimentation phase.
    Screenshot 1



  4. Unzip the installation file to a local directory (say C:/spark).
  5. Download Scala Eclipse IDE from http://scala-ide.org/download/sdk.html
  6. Unzip scala-SDK-4.1.0-vfinal-2.11-win32.win32.x86_64.zip into a folder (say c:\Eclipse_Scala)
  7. Find eclipse.exe from eclipse folder and run. Make sure you have 64-bit Java installed by running java -version from cmd prompt. Incompatibility between 64 bit Eclipse package and 32-bit Java will give an error and Eclipse would not start.
  8. Choose a workspace for your Scala project as shown in Screenshot 2.

    Screenshot 2


  9. Create a new Scala project using File->New Scala Project.
  10. Add Spark libraries downloaded in Step 6 to the newly created Scala project as shown in Screenshot 3.
    Screenshot 3
  11. You may see an error about more than 1 scala libraries as shown in Screenshot 4 since Spark has its own copy of Scala library.
Screenshot 4

  1. Remove Scala reference from the Java build path as shown in Screenshot 5 to remove the error. 
    Screenshot 5
  2. You may see another error “The version of scala library found in the build path of DB2SparkAccess (2.10.4) is prior to the one provided by scala IDE (2.11.6). Setting a Scala Installation Choice to match”. Right click Project->Properties->Scala Compiler and change project setting to 2.10 as shown in Screenshot 6.
    Screenshot 6

  3. After clicking OK, project gets rebuilt and you will only see a warning about different Scala versions that you can ignore.
  4. Now you can right click DB2SparkAccess project and choose New Scala App as shown in Screenshot 7. Enter application name and click Finish.

    Screenshot 7

  1. Copy the following source code into the new Scala application you have created (.scala file) and modify the database credentials to yours.

    import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object DB2SparkScala extends App {
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("GetEmployee")
.set("spark.executor.memory", "1g")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val employeeDF = sqlContext.load("jdbc", Map(
"url" -> "jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXXX;",
"driver" -> "com.ibm.db2.jcc.DB2Driver",
"dbtable" -> "pallavipr.employee"))

employeeDF.show();
}


  1. Right click the application and select Run As-> Scala application as shown in Screenshot 8-


    Screenshot 8
  2. You may see the following exception - Exception in thread "main" java.lang.ClassNotFoundException: com.ibm.db2.jcc.DB2Driver. To get rid of the above exception, select Project->Properties and configure Java Build Path to include the IBM DB2 JDBC driver (db2jcc.jar or db2jcc4.jar) as shown in Screenshot 9. JDBC driver can be downloaded from http://www-01.ibm.com/support/docview.wss?uid=swg21385217
    Screenshot 9


  3. Now click on your Scala application and select Run As->Scala Application again and you should see the employee data retrieved from DB2 table as shown in Screenshot 10.
    Screenshot 10
  4. To perform similar access via a standalone Java program, Click on Project->New->Other as shown in Screenshot 11.  
    Screenshot 11
  5. Select Java->Class and click Next that takes you to Screenshot 12.
    Screenshot 12
  6. Enter a name for your Java class and click Finish as shown in Screenshot 13 -
    Screenshot 13
  7. Paste the following code into your newly created class (.java file) with database credentials changed to yours.
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DB2SparkJava {
public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("Simple Application");

conf.setMaster("local[1]");
conf.set("spark.executor.memory", "1g");

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);

Map<String, String> options = new HashMap<String, String>();
options.put(
"url",
"jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXXX;");
options.put("driver", "com.ibm.db2.jcc.DB2Driver");
options.put("dbtable", "pallavipr.employee");

DataFrame jdbcDF = sqlContext.load("jdbc", options);

jdbcDF.show();

}
}


  1. Right click your newly created Java application. Select Run As → Java application. You should see similar results as Step 19 .

Wednesday, 10 June 2015

Spark blog 1 - Using Spark's interactive Scala shell for accessing DB2 data using DB2 JDBC driver and Spark's new DataFrames API


My colleague Param (param.bng@in.ibm.com) and I (pallavipr@in.ibm.com) are exploring various aspects of Spark integration with DB2 and DB2 Connect drivers. We have decided to write a series of blogs capturing our experimentation for benefit of others as we did not find any article that focuses on different aspects of DB2 access via Spark.

Currently Spark shell is available in Scala and Python. This article covers accessing and filtering DB2 data via Scala shell using DB2 supplied JDBC driver (IBM Data Server Driver for JDBC and SQLJ). Below are the step by step instructions -

1) Confirm that you have Java installed by running java -version from Windows command line. JDK version 1.7 or 1.8 is recommended.

2) Install Spark on local machine by downloading spark from https://spark.apache.org/downloads.html.

3) We chose pre-built binaries as shown in Screenshot 1 (instead of source code download) to avoid building spark in early experimentation phase.

Screenshot 1

4) Unzip the installation file to a local directory (say C:/spark).

5) Start Windows command prompt.

6) Navigate to the directory that has bin folder of spark installation (c:/spark/bin).

7) Download the DB2 JDBC driver jar (db2jcc.jar or db2jcc4.jar) from http://www-01.ibm.com/support/docview.wss?uid=swg21385217 into C:\ or any other location you desire.

8) Set spark_classpath to the location of the DB2 driver by running SET SPARK_CLASSPATH=c:\db2jcc.jar

9) Run spark-shell.cmd script found in bin folder to start Spark shell using Scala.

10) If installation was successful, you should see output like Screenshot 2, followed by a Scala prompt as in Screenshot 3.

Screenshot 2

Screenshot 3

11) In Screenshot 3, you see 2 important objects are already created for you –
11.1) SparkContext – Any Spark application needs a SparkContext which tells Spark how to access a cluster. In the shell mode, a SparkContext is already created for you in a variable called sc.
11.2) SqlContext – This is needed to construct DataFrames (equivalent to relational tables) from database data and serves as the entry point for working with structured data.

12) Once you have Spark up and running, you can issue queries to DB2 on z/OS as well as DB2 LUW through the DB2 JDBC driver. Tables from DB2 database can be loaded as a DataFrame using the following options on load -
12.1) url The JDBC URL to connect to
12.2) dbtable The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of a SQL query can be used.
12.3) driver The class name of the JDBC driver needed to connect to this URL.

13) From Scala command line, issue
val employeeDF = sqlContext.load("jdbc", Map("url" -> "jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXXXXX;","driver" -> "com.ibm.db2.jcc.DB2Driver","dbtable" -> "pallavipr.employee"))

14) You should see output containing the table metadata as shown in Screenshot 4 -

Screenshot 4

15) To see the contents of the EMPLOYEE table, issue employeeDF.show() from Scala command line, which shows the contents of the DataFrame as captured in Screenshot 5. Show() returns first 20 records from the table by default (out of ~40 rows that exist).

Screenshot 5

16) You can further narrow the search results above by using filter criteria. For eg. If you want to see only columns employee id, firstname, lastname and job title out of all existing columns, you will issue – employeeDF.select("empno","firstnme","lastname",”job”).show(). This gives results shown in Screenshot 6.


Screenshot 6

17) Now if you want to filter out only those rows that have job title DESIGNER, issue the following from scala shell - employeeDF.filter(employeeDF("job").equalTo("DESIGNER")).show(). You will see results shown in Screenshot 7.

Screenshot 7