There are several use cases where data extracted from live data streams such as Twitter needs to be persisted into external databases. In this example, you will learn how to filter incoming live Twitter data and write relevant subsets of it into an IBM DB2 database. The sample program works against all flavors of IBM databases, i.e. DB2 for z/OS, DB2 distributed, dashDB and SQLDB.
We will use Spark Streaming to receive live data streams from Twitter and filter the tweets by a keyword. We will then extract the Twitter user names associated with the matching tweets and insert them into DB2. These user names can serve many purposes; for example, by joining them with other tables such as a customer table, a bank could analyze whether these Twitter users are among its account holders.
- For background on Spark Streaming, refer to http://spark.apache.org/docs/latest/streaming-programming-guide.html.
- We will use the TwitterUtils class provided by Spark Streaming. TwitterUtils uses Twitter4J under the covers, which is a Java library for the Twitter API.
- Create a table in DB2 called TWITTERUSERS using: CREATE TABLE TWITTERUSERS (NAME VARCHAR(255))
- Create a new Scala class in Eclipse with the following contents (also available here). Change the database and Twitter credentials to your own.
- Make sure the Project Build Path contains the jars db2jcc.jar (the DB2 JDBC driver), spark-assembly-1.3.1_IBM_1-hadoop2.6.0.jar and spark-examples-1.3.1_IBM_1-hadoop2.6.0.jar, as shown below.
- Lines 12 to 15 load the DB2 driver class, establish a connection to the database, and prepare an INSERT statement that is used to insert Twitter user names into DB2.
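  A minimal sketch of what this setup might look like (the JDBC URL, user and password are placeholders, and conn/insertStmt are illustrative names):

  ```scala
  import java.sql.DriverManager

  // Load the DB2 JDBC driver; db2jcc.jar must be on the classpath
  Class.forName("com.ibm.db2.jcc.DB2Driver")

  // Placeholder connection details; replace with your own
  val conn = DriverManager.getConnection(
    "jdbc:db2://localhost:50000/SAMPLE", "db2user", "db2password")

  // Reusable INSERT statement for the TWITTERUSERS table
  val insertStmt = conn.prepareStatement(
    "INSERT INTO TWITTERUSERS (NAME) VALUES (?)")
  ```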
- Lines 17 to 24 set the system properties consumerKey, consumerSecret, accessToken and accessTokenSecret that the Twitter4J library will use to generate OAuth credentials. You obtain these values by creating a consumer key/secret pair and an access token/secret pair for your account at https://dev.twitter.com/apps. Detailed instructions on how to generate the two pairs are available at http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html.
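  For illustration, the property values below are placeholders for the pairs generated on that page:

  ```scala
  // Twitter4J reads its OAuth credentials from these system properties
  // when no explicit Authorization is passed to TwitterUtils
  System.setProperty("twitter4j.oauth.consumerKey", "YOUR_CONSUMER_KEY")
  System.setProperty("twitter4j.oauth.consumerSecret", "YOUR_CONSUMER_SECRET")
  System.setProperty("twitter4j.oauth.accessToken", "YOUR_ACCESS_TOKEN")
  System.setProperty("twitter4j.oauth.accessTokenSecret", "YOUR_ACCESS_TOKEN_SECRET")
  ```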
- Lines 26 and 27 create a local StreamingContext with 16 threads and a batch interval of 2 seconds. StreamingContext is the entry point for all streaming functionality.
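  A sketch of that setup (the application name is arbitrary):

  ```scala
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // local[16] runs Spark locally with 16 threads; a new batch of
  // streamed data is formed every 2 seconds
  val sparkConf = new SparkConf().setMaster("local[16]").setAppName("TwitterToDB2")
  val ssc = new StreamingContext(sparkConf, Seconds(2))
  ```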
- Using the StreamingContext created above, line 30 creates a DStream called stream. DStream is the basic abstraction in Spark Streaming: a continuous stream of RDDs, in this case containing objects of type twitter4j.Status (http://twitter4j.org/javadoc/twitter4j/Status.html). A filter (“Paris”) is also specified, so that only tweets containing the keyword “Paris” are selected.
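  Roughly, assuming the ssc created above:

  ```scala
  import org.apache.spark.streaming.twitter.TwitterUtils

  // Passing None for the Authorization makes TwitterUtils build OAuth
  // credentials from the twitter4j.oauth.* system properties set earlier;
  // the filter keeps only tweets containing "Paris"
  val stream = TwitterUtils.createStream(ssc, None, Seq("Paris"))
  ```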
- In line 31, a map operation on stream maps each Status object to the corresponding user name, creating a new DStream called users.
- Line 32 returns a new DStream called recentUsers, in which user names are aggregated over a 60-second window. Both transformations are sketched below.
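  A sketch of these two transformations, assuming the screen name is the user name being extracted (getUser.getName would return the display name instead):

  ```scala
  // Map each twitter4j.Status to the posting user's screen name
  val users = stream.map(status => status.getUser.getScreenName)

  // Tumbling 60-second window, so each batch of names is emitted once
  val recentUsers = users.window(Seconds(60), Seconds(60))
  ```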
- Lines 34 to 41 iterate over each RDD in the DStream recentUsers, printing the number of users every 60 seconds and inserting those users into the database table TWITTERUSERS through JDBC.
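  A sketch of that loop, reusing the conn and insertStmt from the setup sketch; collecting to the driver is acceptable here because a 60-second window of user names is small:

  ```scala
  recentUsers.foreachRDD { rdd =>
    // Report how many users were seen in this window
    println("Users in the last 60 seconds: " + rdd.count())

    // Bring the names to the driver and insert them through JDBC
    rdd.collect().foreach { name =>
      insertStmt.setString(1, name)
      insertStmt.executeUpdate()
    }
  }
  ```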
- Line 44 starts the actual processing and awaits termination.
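  That is:

  ```scala
  ssc.start()             // begin receiving and processing tweets
  ssc.awaitTermination()  // block until the streaming job is stopped
  ```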
- The following screenshot shows a snippet of the console output when the program is run. Of course, you can change the filter in line 29 to any keyword.
- You can also run SELECT * FROM TWITTERUSERS against your database to confirm that the Twitter user names are being inserted.
This simple Twitter program can be extended to more sophisticated use cases: using Spark Streaming to analyze social media data more effectively, persist subsets of social media data into databases, and join social media data with relational data to derive additional business insights.