The Splice Machine Native Spark DataSource

Splice Machine has introduced our new native Spark DataSource, which interacts with your Splice Machine cluster and provides the methods that make it simple to interact directly with your database through Spark.

In this article, we’ll demonstrate our native DataSource using an Apache Zeppelin notebook on Splice Machine’s Cloud Service – which means you can instantly see results without having to deal with any infrastructure issues: just log into your own Splice Machine cluster. Sign up today at cloud.splicemachine.io and you will have your own personal sandbox to run our included sample notebooks.

Overview

This article describes the Splice Machine Native Spark DataSource in these sections:

  • Why Is This Important briefly describes why the developer familiarity, convenience, and performance boost of the DataSource is so compelling.
  • The SplicemachineContext Class introduces the object class that you use to interact with the Splice Machine Native Spark DataSource.
  • A Simple Notebook Example introduces using basic methods of the DataSource.
  • Additional DML Operations demonstrates using DataFrames to insert, update, and delete records in a Splice Machine database.
  • Real World Examples includes code snippets and discusses using the Native Spark DataSource for common computing tasks, including real-time streaming of data into a database, ETL pipelines, and machine learning.
  • Additional DDL Operations demonstrates how to truncate and drop a table in your database from Spark.

Why Is This Important?

Before delving into how you can use the Splice Machine Native Spark DataSource, let’s explore why this is so important, and why so many of our customers are enjoying the ease of implementation and performance boost they get from using this tool with Spark. Here we’ll briefly explore the two main factors behind this, Dramatic Performance Improvements and Leveraging Developer Agility.

Later in this article, we’ll delve more deeply into several real-world examples that demonstrate how easy it is to build streaming, ETL, and predictive applications using our Native Spark DataSource.

Dramatic Performance Improvements

The Splice Machine Native Spark DataSource provides dramatic performance improvements for large scale data operations. Why? Because it works directly on native DataFrames and RDDs, eliminating the need to serialize data over the wire using the JDBC protocol.

Spark is optimized to work on DataFrames, which are the main structure used by Spark. A DataFrame is a distributed collection of data (an RDD) organized into named columns, with a schema that specifies data types, that is designed to support efficiently operating on scalable, massive datasets.

The Splice Machine DataSource is native to Spark, which means that it operates directly on these DataFrames and in the same Spark executors that your programs are using to analyze or transform the data. Instead of accessing, inserting, or manipulating data one record at a time over a serialized connection, you can now use the Splice Machine Native Spark DataSource to pull the contents of an entire DataFrame into your database, and to push database query results into a DataFrame.

We’ve seen 100x performance increases compared to using JDBC for operations such as inserting millions of records in a database!

For example, a typical web application might use a React frontend with a Node backend that accesses information in a database. When a customer refreshes the app dashboard, the app uses a JDBC connection to query the database, pulling information out one record at a time to populate the screen. The results of each query are serialized (turned into a string of data), then sent over a network connection to the app, and then displayed on the customer’s screen.

When you use the Splice Machine Native Spark DataSource, the contents of the database table are typically sitting in a DataFrame in memory that resides on the same Spark executor that’s performing the query. The query takes place in memory, and there’s no need to serialize or stream the results over a wire. Similarly, when the app sends updates to the database, the data is inserted into the database directly from the DataFrame. As a result, a great deal of overhead is eliminated, and performance gains can be remarkable.

Leveraging Developer Agility

Splice Machine has always supported the standard tools and patterns for the Application Developer; for example, using Hibernate and Spring. With the addition of the Native Spark DataSource, Splice Machine can say the same for supporting the development tools of the Data Scientists and Data Engineers alike.

Data Scientists and Data Engineers typically access Spark contexts to operate on DataFrames. Spark provides a powerful set of transformations and actions to the developer to manipulate large datasets efficiently plus additional libraries for machine learning and streaming. With Splice Machine’s native Spark DataSource, developers can now perform transactional database operations directly on DataFrames and receive DataFrames as result sets of arbitrary ANSI-SQL queries.

This means that Splice Machine’s full transactional capabilities are available to developers without requiring them to change the way they do data engineering and data science. We’ll see examples of this below.

The SplicemachineContext Class

SplicemachineContext is the primary serializable class that you can broadcast in your Spark applications. This class interacts with your Splice Machine cluster in your Spark executors, and provides the methods that you can use to perform operations including:

  • interfacing with the Splice Machine RDD
  • running inserts, updates, and deletes on your data
  • converting data types between Splice Machine and Spark

We’ll explore these methods, and more, in the remainder of this article.

Creating Your Context

In Zeppelin, you can create your SplicemachineContext object with this simple %spark interpreter code, replacing the with the JDBC URL for your cluster:

%spark
import com.splicemachine.spark.splicemachine._
import com.splicemachine.derby.utils._

val JDBC_URL = "jdbc:splice://:1527/splicedb;user=;password="
val splicemachineContext = new SplicemachineContext(JDBC_URL)

import com.splicemachine.spark.splicemachine._
import com.splicemachine.derby.utils._
JDBC_URL: String = jdbc:splice://:1527/splicedb;user=;password=
splicemachineContext: com.splicemachine.spark.splicemachine.splicemachineContext = com.splicemachine.spark.splicemachine.SplicemachineContext@5f88ac02

Once you’ve got your context established, it’s quite simple to operate directly between Splice Machine and Spark, as you’ll see in the example in the next section.

A Simple Notebook Example

Here’s a very simple notebook that demonstrates how easy it is to directly pull the data in a Spark DataFrame into your Splice Machine database. We create a DataFrame in Spark, then insert that data into our database with a single insert operation.

In this paragraph, we use the Zeppelin %spark interpreter to create a DataFrame, using Scala:

%spark
val carsDF = Seq(
   (1, "Toyota", "Camry"),
   (2, "Honda", "Accord"),
   (3, "Subaru", "Impreza"),
   (4, "Chevy", "Volt")
).toDF("NUMBER", "MAKE", "MODEL")

carsDF: org.apache.spark.sql.DataFrame = [NUMBER: int, MAKE: string … 1 more field]

As you probably already know, Spark DataFrames are distributed collections of data that’s organized into named columns; as such, it’s straightforward to use them with database tables.

Let’s create a new table in our Splice Machine database that we’ll use to exchange data with the DataFrame we just created. We’ll use the %splice interpreter in Zeppelin:

%splicemachine
drop table if exists carsTbl;
create table carsTbl (number int primary key, make varchar(20), model varchar(20));

Query executed successfully. Affected rows : 0
Query executed successfully. Affected rows : 0

Now we’re ready to populate our table from the DataFrame, using the insert method of our SplicemachineContext class.

%spark
splicemachineContext.insert(carsDF, "SPLICE.CARSTBL")

Note: You’ll need to plug in your JDBC URL when running this on your own cluster.

Let’s use the Spark Adapter to query our table and verify its content; we’ll use the df method, which stores the results in a DataFrame::

%spark
 splicemachineContext.df( "SELECT * FROM SPLICE.CARSTBL" )
+------+------+-------+
|NUMBER|  MAKE|  MODEL|
+------+------+-------+
|     1|Toyota|  Camry|
|     2| Honda| Accord|
|     3|Subaru|Impreza|
|     4| Chevy|   Volt|
+------+------+-------+

Alternatively, we can use the %splicemachine Zeppelin interpreter to query our database:

%splicemachine
 SELECT * FROM CARSTBL;

Additional DML Operations

In this section, we’ll add a few more Data Manipulation Language (DML) operations to our Zeppelin notebook.

The Splice Machine native Spark DataSource supports the following DML operations, each of which is demonstrated in this section:

  • INSERT rows of a DataFrame into a Splice Machine table.
  • UPDATE rows in the Splice Machine table from rows in the DataFrame.
  • DELETE rows found in the DataFrame from a Splice Machine table.
INSERT Records

You’ve already seen a simple example of inserting data from a DataFrame into a Splice Machine database. Here we’ll add a couple more records, using this DataFrame:

%spark
val newCarsDF = Seq(
   (5, "Ford", "Focus"),
   (6, "Nissan", "Altima")
).toDF("NUMBER", "MAKE", "MODEL")

newCarsDF: org.apache.spark.sql.DataFrame = [NUMBER: int, MAKE: string … 1 more field]
%spark
splicemachineContext.insert(newCarsDF, "SPLICE.CARSTBL")
splicemachineContext.df( "SELECT * FROM SPLICE.CARSTBL" )
+------+------+-------+
|NUMBER|  MAKE|  MODEL|
+------+------+-------+
|     1|Toyota|  Camry|
|     2| Honda| Accord|
|     3|Subaru|Impreza|
|     4| Chevy|   Volt|
|     5|  Ford|  Focus|
|     6|Nissan| Altima|
+------+------+-------+
UPDATE Records

The UPDATE operation updates an existing record in the database. Let’s update a couple records to represent different car models, using this DataFrame:

%spark
val updateCarsDF = Seq(
   (1, "Toyota", "Rav 4 XLE"),
   (4, "Honda", "Accord Hybrid")
).toDF("NUMBER", "MAKE", "MODEL")

updateCarsDF: org.apache.spark.sql.DataFrame = [NUMBER: int, MAKE: string … 1 more field]
%spark
 splicemachineContext.update(updateCarsDF, "SPLICE.CARSTBL")
 splicemachineContext.df( "SELECT * FROM SPLICE.CARSTBL" )
+------+------+-------------+
|NUMBER|  MAKE|        MODEL|
+------+------+-------------+
|     1|Toyota|    Rav 4 XLE|
|     2| Honda|       Accord|
|     3|Subaru|      Impreza|
|     4| Honda|Accord Hybrid|
|     5|  Ford|        Focus|
|     6|Nissan|       Altima|
+------+------+-------------+
DELETE Records

For this example, we’ll use a DataFrame to specify a few records we’re eliminating as possibilities, as we narrow our search for the right car:

%spark
val deleteCarsDF = Seq(
   (3),
   (6)
).toDF("NUMBER")

deleteDF: org.apache.spark.sql.DataFrame = [NUMBER: int]

%spark
splicemachineContext.delete(deleteCarsDF, "SPLICE.CARSTBL")
splicemachineContext.df( "SELECT * FROM SPLICE.CARSTBL" )
+------+------+-------------+
|NUMBER|  MAKE|        MODEL|
+------+------+-------------+
|     1|Toyota|    Rav 4 XLE|
|     2| Honda|       Accord|
|     4| Honda|Accord Hybrid|
|     5|  Ford|        Focus|
+------+------+-------------+

Real World Examples

Let’s explore how easy it is to perform complex computing tasks with the Splice Machine Native Spark DataSource, using simple examples of three common activities:

Example 1: Streaming Data

One common scenario is to stream data from IOT devices in real-time into your database, to track various kinds of activity. For this example, we’ll use weather forecast data from openweathermap.org, which provides APIs that allow us to query the forecast weather by city for every 3 hours for the next 5 days.

This example shows you a portion of a Splice Machine Zeppelin notebook named WeatherStream. This notebook makes use of a Java program, KafkaWeatherStream, that we created that publishes forecast data to a Kafka queue; you can download this program from our Zeppelin notebook page.

Note: The WeatherStream notebook contains all of the code for configuring the Kafka queue, streaming weather data from various locales into the database, and then using that data to forecast precipitation. It also displays and provides a link to the code for our KafkaWeatherStream Java program.

This examples presents snippets from that notebook.

The Weather Data

This example uses the KafkaWeatherStream Java program to publish weather forecast data from openweathermap to a Kafka queue, and then reads that streaming data into a Splice Machine database table named WEATHER.FORECAST.

Our JSON forecast data looks like this:

{"CITY_ID":5506956, "AS_OF_TS":"2018-02-17 16:02:10",
 "FORECAST_TS":"2018-02-21 00:00:00", "WEATHER_CODE_ID":800}
{"CITY_ID":5506956, "AS_OF_TS":"2018-02-17 16:02:10",
 "FORECAST_TS":"2018-02-21 03:00:00", "WEATHER_CODE_ID":800}
{"CITY_ID":5506956, "AS_OF_TS":"2018-02-17 16:02:10",
 "FORECAST_TS":"2018-02-21 06:00:00", "WEATHER_CODE_ID":800}
{"CITY_ID":5506956, "AS_OF_TS":"2018-02-17 16:02:10", 
 "FORECAST_TS":"2018-02-21 09:00:00", "WEATHER_CODE_ID":800}

Streaming the Data into Splice Machine

This code segment reads streaming weather data from the Kafka queue and inserts the data into our database table:

%spark
// Our Kafka topic
val topicName = "WeatherForecast"

// Our Kafka broker
val brokerList = "ec2-xxxxxxxxxxx.compute-1.amazonaws.com:9092"

// Splice Machine table we are populating
val SPLICE_TABLE_NAME = "WEATHER.FORECAST"

// Forecast schema
val schema = new StructType()
      .add("CITY_ID", LongType)
      .add("AS_OF_TS", TimestampType)
      .add("FORECAST_TS", TimestampType)
      .add("WEATHER_CODE_ID", LongType)

// Set Kafka Queue parameters
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokerList,
    "group.id"-> "cg",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer]
    )

val splicemachineContext = new SplicemachineContext(JDBC_URL)

// Create Streaming Context
val ssc = new StreamingContext(sc, Seconds(5))

// Stop gracefully when driver is stopped
 sys.ShutdownHookThread {
      ssc.stop(true, true)
  }

// Create Direct Stream
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array(topicName), kafkaParams)
)

//Parse the queue messages
val toPair = stream.map(record => (record.key, record.value))
val msgs = toPair.map(_._2)

msgs.foreachRDD { rdd =>
   val df = spark.read.schema(schema).json(rdd)
   if(df.count > 0)
        splicemachineContext.insert(df, SPLICE_TABLE_NAME)
    }

ssc.start()
ssc.awaitTerminationOrTimeout(collectionTime)
ssc.stop(stopSparkContext = false, stopGracefully = true)

Querying the Database to Forecast Precipitation

Once we have data in our table, we can query it to forecast precipitation in selected cities:

%splicemachine
 SELECT description, weather_code_id, forecast_ts, COUNT(*)
 FROM weather.forecast, weather.weather_code
 WHERE id = weather_code_id
 AND as_of_ts = (select max(as_of_ts) FROM weather.forecast)
 AND (weather_code_id > 899 OR weather_code_id < 800)
 GROUP BY weather_code_id, description, forecast_ts 
 ORDER BY weather_code_id;

Example 2: ETL Pipelines

Here's how the Splice Machine Native Spark DataSource can make your ETL pipelines faster, easier, and more robust:

  • You can use our Native Spark DataSource to Extract data from Splice Machine into a Spark DataFrame quickly and efficiently.
  • Spark includes connectors for reading data from a wide variety of other data sources that may be included in your pipeline.
  • Spark connectors, including our Native DataSource, include built-in handling of corrupt records, so you can configure your pipeline to continue execution even when bad input data is present.
  • You can use Spark's many built-in transformations and actions to Transform quickly transform data in a DataFrame via its.
  • You can quickly and easily Load transformed data into a Splice Machine database directly from a Spark DataFrame.
  • Your pipeline can scale without modification: Spark and the Splice Machine Native Spark DataSource take care of the details for you.
Example 3: Machine Learning

Our Deep Dive Notebook series includes an example that demonstrates how easy it is to create machine learning applications with Splice Machine and Spark. This example leverages Spark's machine learning library, MLlib, and the Splice Machine Native Spark DataSource to realize in-process machine learning.

Specifically, the example in this notebook uses data that tracks international shipments to learn, and then predicts, in real time, how late a specific shipment will be, based on various factors. Over time, as more data is processed by the model, the predictions become more accurate.

About our Sample Data

We’ve obtained some actual shipping data that tracks international shipments between ports, and have imported that data into a Splice Machine database that we’ve named ASD. The tables of interest are named SHIPMENT_IN_TRANSIT and SHIPMENT_HISTORY; you’ll see these table used in the sample code below.

About our Learning Model

We use a Logistic Regression estimator as the final stage in our pipeline to produce a Logistic Regression Model of lateness from our data, and then deploy that model on a dataset to predict lateness.
The estimator operates on data that is formatted into vectors of integers. Since most of the fields in our input dataset contain string values, we need to convert any data that will be used by the estimator into this format, as you’ll see below.

About our Features Table

We also create a database table named Features that forms the basis of the DataFrame we use for our learning model. This table categorizes shipping quantities into four lateness bins to improve learning accuracy:

BinLateness measure

00 days late
11-5 days late
25-10 days late
310 days or more late

 

Running our Model
We use our Native Spark DataSource to pull all of the data from our features table into a Spark dataframe:

%spark
val df = splicemachineContext.df("select * from ASD.Features")

We then create our pipeline stages:

1. We first convert our input data into integer vectors, so that the Logistic Regression estimator can operate on the data. We use the Spark StringIndexer function to transform each column in our data into a new, integer column. For example:

%spark
val shipperIndexer = new 
StringIndexer().setInputCol("shipper").setOutputCol("shipperIndex").setHandleInvalid("skip")

2. We then assemble our vectors using a Spark VectorAssembler object to transform a set of input columns into a features vector that we store in a new column in our DataFrame:

val assembler = new VectorAssembler()
                .setInputCols(Array(
                    "shipmodeIndex",
                    "shipperIndex",
                    ...
                    ))
                .setOutputCol("features")

3. Next we create our estimator, specifying which column contains the feature set:

val lr = new LogisticRegression()
    .setMaxIter(30)
    .setLabelCol("label")
    .setFeaturesCol("features")
    .set
    RegParam(0.3)

4. Finally, we chain our stages together into a pipeline sequence:

val lrPipeline = new Pipeline().setStages(
        Array(consigneeIndexer,
              shipperIndexer,
              shipmodeIndexer,
              gross_weight_lbIndexer,
              foreign_portIndexer,
              us_portIndexer,
              vessel_nameIndexer,
              country_of_originIndexer,
              container_numberIndexer,
              container_typeIndexer,
              ship_registered_inIndexer,
              carrier_codeIndexer,
              carrier_cityIndexer,
              notify_partyIndexer,
              place_of_receiptIndexer,
              zipcodeIndexer,
              assembler,
              lr
              ))

Training and Materializing our Model

Now that we've set up our pipeline, we can train our model by feeding our dataframe into the pipeline's fit method:

val lrModel = lrPipeline.fit(df)
And then materialize the model by applying it to real data and displaying the results:
lrModel.transform(df).select("prediction", "probability", "features").show(10)

+----------+--------------------+--------------------+
|prediction|         probability|            features|
+----------+--------------------+--------------------+
|       1.0|[0.06917849601230...|[1.0,0.0,8.0,2.0,...|
|       1.0|[0.06361534179433...|[2.0,3.0,9.0,4.0,...|
|       2.0|[0.07062191291178...|(16,[1,2,3,7,8,9,...|
|       2.0|[0.07231320891797...|[1.0,0.0,2.0,1.0,...|
|       2.0|[0.07325627115826...|[1.0,0.0,3.0,1.0,...|
|       2.0|[0.07356384459057...|[1.0,0.0,5.0,1.0,...|
|       2.0|[0.07086251837299...|(16,[1,5,7,9],[1....|
|       2.0|[0.06783325424617...|(16,[1,2,5,7,8,9]...|
|       2.0|[0.07102819891870...|(16,[1,5,7,9,10],...|
|       2.0|[0.06699322238524...|(16,[1,2,5,7,8,9]...|
+----------+--------------------+--------------------+

Now we can load our lateness predictions into the predictions table in our database and display them:

%spark
val predictions = lrPredictions.select("SHIPMENTID", "PREDICTION")
splicemachineContext.insert(predictions, "ASN.predictions")
%splicemachine
select * from ASN.predictions;

Additional DDL Operations

We previously demonstrated one Data Definition Language (DDL) method available in the SplicemachineContext class, createTable. In this section, we'll add an additional DDL operation to our Zeppelin notebook.

Removing (Dropping) a Table From Your Database

After we finally make a decision about which car we want, we can drop our table from the database:

 %spark
 splicemachineContext.dropTable("SPLICE.CARSTBL")
 %splicemachine
 SELECT * FROM CARSTBL;

 java.sql.SQLSyntaxErrorException: Table/View 'SPLICE.CARSTBL' does not exist.

Conclusion

The Splice Machine Native Spark DataSource makes it easy for you to access and manipulate data in your database directly from Spark, yielding tremendous performance boosts. Your data scientists and developers can use the full power of both Splice Machine and Spark with familiar development tools and without the added overhead of serializing and transmitting data to and from your database.

This article has shown you how to move data between Spark and Splice Machine in a Zeppelin notebook. We've provided snippets from several real-world examples, which we can make available to you on an Splice Machine cluster on our cloud service.

Schedule a Demo

See a demo of machine learning in action on Splice Machine using the native Spark DataSource.

Get Started

Leave a Reply

Your email address will not be published. Required fields are marked *