Wednesday, 12 August 2015

Spark blog 4 - Persisting Spark DataFrames into DB2


There are several use cases where data in Spark needs to be persisted in a backend database. Enterprise-wide analytics may require loading data into Spark from different data sources, applying transformations, performing in-memory analytics, and writing the transformed data back to an enterprise RDBMS such as DB2.

This post shows a simple technique, using the latest Spark release, to load data from a JSON file into Spark and write it back into DB2 using the DB2-supplied JDBC driver.

Step 1)
Download the latest pre-built Spark library (1.4.1) from http://spark.apache.org/downloads.html. Spark is evolving rapidly and many methods from 1.3 have been deprecated, so it is best to experiment with the latest release.

Step 2)
In your Eclipse Scala IDE build path, add the Spark library and the DB2 JDBC driver as shown below -

Step 3)
Create a JSON file with the following contents, one JSON object per line -

    { "EMPNO":10, "EDLEVEL":18, "SALARY":152750, "BONUS":1000 }
    { "EMPNO":20, "EDLEVEL":18, "SALARY":94250, "BONUS":800 }

Step 4)
Create a Scala application with the following logic -
   1:  val DB2_CONNECTION_URL = "jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXXXXX;traceFile=C:/Eclipse_Scala/trace_scala.txt;";
   2:    
   3:    val conf = new SparkConf()
   4:      .setMaster("local[1]")
   5:      .setAppName("GetEmployee")
   6:      .set("spark.executor.memory", "1g")
   7:   
   8:    val sc = new SparkContext(conf)
   9:    val sqlcontext = new SQLContext(sc)
  10:    val path = "C:/Eclipse_Scala/empint.json"
  11:   
  12:    val empdf = sqlcontext.read.json(path)
  13:    empdf.printSchema()
  14:    empdf.show()
  15:    
  16:    Class.forName("com.ibm.db2.jcc.DB2Driver");
  17:    
  18:    val prop = new Properties()
  19:    prop.put("spark.sql.dialect" , "sql"); 
  20:      
  21:    empdf.write.jdbc(DB2_CONNECTION_URL, "PALLAVIPR.EMPLOYEESALARY", prop)
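
The connection URL on Line 1 follows the DB2 JDBC driver's convention of appending driver properties after the database name: a colon, followed by name=value pairs that each end in a semicolon. As a minimal sketch in plain Java (callable from Scala), the helper below assembles such a URL from its parts. The class Db2Url and its build method are hypothetical names used for illustration; they are not part of Spark or the DB2 driver.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class Db2Url {

    // Builds jdbc:db2://host:port/database, followed by ":name=value;..." when
    // properties are given. Properties are emitted in the map's iteration order.
    static String build(String host, int port, String database, Map<String, String> props) {
        StringBuilder url = new StringBuilder("jdbc:db2://")
                .append(host).append(':').append(port).append('/').append(database);
        if (!props.isEmpty()) {
            url.append(':');
            for (Map.Entry<String, String> p : props.entrySet()) {
                url.append(p.getKey()).append('=').append(p.getValue()).append(';');
            }
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the URL is deterministic.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("currentSchema", "pallavipr");
        props.put("user", "pallavipr");
        props.put("password", "XXXXXX");
        System.out.println(build("localhost", 50000, "sample", props));
        // prints jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXXXXX;
    }
}
```

Keeping the properties in a map makes it easy to add or drop an optional setting such as traceFile without editing one long string literal.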


Step 5)
The JSON file is loaded into Spark in Line 12 using the new DataFrameReader interface introduced in Spark 1.4.0.
 
Step 6)
The DB2 JDBC driver is loaded in Line 16 to carry out the write operation to DB2.

Step 7)
On running this Scala program, you will see the following schema output from the printSchema method on the DataFrame created from the JSON file -

 
 
Step 8)
Printing the DataFrame with its show method produces the following output -




Step 9)
The final write to DB2 is done using the DataFrameWriter jdbc API introduced in 1.4.0 (as shown in Line 21), which under the covers generates the CREATE TABLE and INSERT SQL statements for table EMPLOYEESALARY.
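
To make the statements mentioned in Step 9 concrete, here is a rough sketch in plain Java of the kind of SQL a JDBC writer has to produce for this DataFrame. It is not Spark's implementation: the class JdbcWriteSketch and its methods are hypothetical, and both the BIGINT column type and the alphabetical column order are assumptions about how the four numeric JSON fields are inferred and mapped. The statements Spark actually emits may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class JdbcWriteSketch {

    // Builds a CREATE TABLE statement from column name -> SQL type pairs.
    static String createTable(String table, Map<String, String> columns) {
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, String> c : columns.entrySet()) {
            cols.add(c.getKey() + " " + c.getValue());
        }
        return "CREATE TABLE " + table + " " + cols;
    }

    // Builds a parameterized INSERT with one "?" placeholder per column.
    static String insert(String table, int columnCount) {
        StringJoiner marks = new StringJoiner(", ", "(", ")");
        for (int i = 0; i < columnCount; i++) {
            marks.add("?");
        }
        return "INSERT INTO " + table + " VALUES " + marks;
    }

    public static void main(String[] args) {
        // Assumed schema: four 64-bit integer columns, listed alphabetically.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("BONUS", "BIGINT");
        columns.put("EDLEVEL", "BIGINT");
        columns.put("EMPNO", "BIGINT");
        columns.put("SALARY", "BIGINT");

        System.out.println(createTable("PALLAVIPR.EMPLOYEESALARY", columns));
        // prints CREATE TABLE PALLAVIPR.EMPLOYEESALARY (BONUS BIGINT, EDLEVEL BIGINT, EMPNO BIGINT, SALARY BIGINT)
        System.out.println(insert("PALLAVIPR.EMPLOYEESALARY", columns.size()));
        // prints INSERT INTO PALLAVIPR.EMPLOYEESALARY VALUES (?, ?, ?, ?)
    }
}
```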
 
Step 10)
You can verify that the table is created and the JSON data is inserted into 
DB2 using any tool of your choice -
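
One such tool is a few lines of plain JDBC. The sketch below is not part of the original program and rests on several assumptions: the DB2 JDBC driver jar is on the classpath, the connection URL has the same form as in Step 4, and the table has the four columns named after the JSON keys. The class VerifyEmployeeSalary and its methods are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class VerifyEmployeeSalary {

    // Builds the verification query. The table name is concatenated as-is,
    // so pass only a trusted, schema-qualified name.
    static String selectAll(String qualifiedTable) {
        return "SELECT EMPNO, EDLEVEL, SALARY, BONUS FROM " + qualifiedTable
                + " ORDER BY EMPNO";
    }

    // Connects with the given URL and prints every row of the table.
    static void printTable(String url, String qualifiedTable) throws SQLException {
        try (Connection con = DriverManager.getConnection(url);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery(selectAll(qualifiedTable))) {
            while (rs.next()) {
                System.out.println(rs.getLong("EMPNO") + " " + rs.getLong("EDLEVEL")
                        + " " + rs.getLong("SALARY") + " " + rs.getLong("BONUS"));
            }
        }
    }
}
```

Calling printTable with your own connection URL and "PALLAVIPR.EMPLOYEESALARY" should list the two employees from the JSON file if the write succeeded.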


Note that we are working through a couple of issues with writing String data types back to DB2 from Spark; however, that should not stop you from experimenting with numeric types. Watch this space for more blogs and updates on DB2-Spark.

Tuesday, 4 August 2015

Spark Blog 3 – Simplify joining DB2 data and JSON data with Spark

Spark SQL provides a powerful API for working with data across different data sources using Python, Scala, and Java. In this post, we demonstrate how easily DB2 data (both z/OS and LUW) can be loaded into Spark and joined with JSON data using DataFrames.

We will use a standalone Java program in this example. To set up Eclipse to work with Spark, please refer to the second blog in this Spark series.

Step-by-step instructions for our sample program are given below.

  1. Create a table in DB2 (“EMPLOYEESUB”) using the Command Line Processor (CLP) that contains a subset of EMPLOYEE information. Change the schema to your own schema and make sure that you are connected to the database in which you want to create the table. The DDL for EMPLOYEESUB is -

        CREATE TABLE "PALLAVIPR"."EMPLOYEESUB" (
          "EMPNO"    CHAR(6)     NOT NULL ,
          "FIRSTNME" VARCHAR(12) NOT NULL ,
          "MIDINIT"  CHAR(1) ,
          "LASTNAME" VARCHAR(15) NOT NULL ,
          "WORKDEPT" CHAR(3) ,
          "COMM"     DECIMAL(9,2)
        )



  1. Load the EMPLOYEESUB table with the 5 rows of data below, which are stored in a CSV file (C:\1.csv).

"000010","CHRISTINE","I","HAAS","A00",+0004220.00
"000020","MICHAEL","L","THOMPSON","B01",+0003300.00
"000030","SALLY","A","KWAN","C01",+0003060.00
"000050","JOHN","B","GEYER","E01",+0003214.00
"000060","IRVING","F","STERN","D11",+0002580.00



  1. Copy the following contents into a JSON file (employeesub.json).

    { "EMPNO":"000010", "EDLEVEL":"18", "SALARY":"152750.00", "BONUS":"1000.00" }
    { "EMPNO":"000020", "EDLEVEL":"18", "SALARY":"94250.00", "BONUS":"800.00" }
    { "EMPNO":"000030", "EDLEVEL":"20", "SALARY":"98250.00", "BONUS":"800.00" }
    { "EMPNO":"000050", "EDLEVEL":"16", "SALARY":"80175.00", "BONUS":"800.00" }
    { "EMPNO":"000060", "EDLEVEL":"16", "SALARY":"72250.00", "BONUS":"500.00" }

  1. As you can see, the DB2 table EMPLOYEESUB contains 6 columns (EMPNO, FIRSTNME, MIDINIT, LASTNAME, WORKDEPT, COMM), while the JSON file has the following 4 keys: EMPNO, EDLEVEL, SALARY, BONUS. Our goal is to join both sets of data using EMPNO as the join key so that a combined data set can be created with all employee information in one place.

  2. Copy the following program into an Eclipse Java class.

       1:   
       2:  import java.util.HashMap;
       3:  import java.util.Map;
       4:   
       5:  import org.apache.log4j.Level;
       6:  import org.apache.log4j.Logger;
       7:  import org.apache.spark.SparkConf;
       8:  import org.apache.spark.api.java.JavaSparkContext;
       9:  import org.apache.spark.sql.DataFrame;
      10:  import org.apache.spark.sql.Row;
      11:  import org.apache.spark.sql.SQLContext;
      12:   
      13:  public class DB2JsonJoinSpark {
      14:    public static void main(String[] args) {
      15:      
      16:        SparkConf conf = new SparkConf();
      17:      
      18:        conf.setMaster("local[1]");
      19:        conf.setAppName("GetUnionEmployeeTable");
      20:        conf.set("spark.executor.memory", "1g");
      21:      
      22:        String path = "C:\\Eclipse_Scala\\employeesub.json";
      23:      
      24:        JavaSparkContext sc = new JavaSparkContext(conf);
      25:        SQLContext sqlContext = new SQLContext(sc);
      26:   
      27:        Map<String, String> options = new HashMap<String, String>();
      28:        options.put("url", "jdbc:db2://localhost:50000/sample:currentSchema=pallavipr;user=pallavipr;password=XXXXXX;");
      29:        options.put("driver", "com.ibm.db2.jcc.DB2Driver");
      30:        options.put("dbtable", "pallavipr.employeesub");
      31:   
      32:        DataFrame df1 = sqlContext.load("jdbc", options);
      33:        df1.show();
      34:   
      35:        DataFrame df2 = sqlContext.jsonFile(path);
      36:        df2.show();
      37:   
      38:        DataFrame finaldf = df1.join(df2, df2.col("EMPNO").equalTo(df1.col("EMPNO")) );
      39:        System.out.println("Printing Joined Data");
      40:        finaldf.show();
      41:    }
      42:  }
     
  3. As shown above, we create a SQLContext in Line 25 to work with the RDBMS, and load the DB2 data into Spark as a DataFrame using the DB2 JDBC driver in Line 32. We load the JSON data as a Spark DataFrame in Line 35.

  4. Make sure that you have included the Spark libraries and the DB2 JDBC driver libraries in the build path.

  5. Right-click the Java class and choose Run As > Java Application. You should see the output below on the console.

As you can see above, the output contains data from both the DB2 table and the JSON file, joined by EMPNO. This example highlights the ease with which Spark enables joins across disparate data sources with its powerful DataFrames API.
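
Conceptually, the join on EMPNO pairs each DB2 row with the JSON record that carries the same key and drops rows that have no match. The plain-Java sketch below illustrates that inner-join behaviour on a few of the sample values. It is not how Spark executes the join, the class EmpnoJoinSketch is hypothetical, and employee 000030 is deliberately left out of the JSON side to show an unmatched row being dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EmpnoJoinSketch {

    // Inner join of two EMPNO-keyed maps: keeps only keys present on both sides.
    static List<String> innerJoin(Map<String, String> db2Rows, Map<String, String> jsonRows) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> row : db2Rows.entrySet()) {
            String match = jsonRows.get(row.getKey()); // lookup by join key
            if (match != null) {
                joined.add(row.getKey() + " | " + row.getValue() + " | " + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // EMPNO -> FIRSTNME LASTNAME, taken from the CSV sample data.
        Map<String, String> db2Rows = new LinkedHashMap<>();
        db2Rows.put("000010", "CHRISTINE HAAS");
        db2Rows.put("000020", "MICHAEL THOMPSON");
        db2Rows.put("000030", "SALLY KWAN");

        // EMPNO -> SALARY, taken from the JSON sample data (000030 omitted on purpose).
        Map<String, String> jsonRows = new LinkedHashMap<>();
        jsonRows.put("000010", "152750.00");
        jsonRows.put("000020", "94250.00");

        for (String line : innerJoin(db2Rows, jsonRows)) {
            System.out.println(line);
        }
        // prints:
        // 000010 | CHRISTINE HAAS | 152750.00
        // 000020 | MICHAEL THOMPSON | 94250.00
    }
}
```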

Enjoy your Spark testdrive with DB2!
- Pallavi (pallavipr@in.ibm.com) and Param (param.bng@in.bm.com)