
From Kafka Streams in Action by Bill Bejeck

This article discusses KSQL, a brand-new open source, Apache 2.0 streaming SQL engine that enables stream processing with Kafka. Basically, it makes it easy to read, write, and process streaming data in real-time, at scale, using SQL-like semantics.



Imagine you’re working with business analysts at BSE. The analysts are interested in your ability to quickly write applications in Kafka Streams to perform real-time data analysis. This interest puts you in something of a bind.

You want to work with the analysts and write applications for their requests, but you have a busy workload, and the additional work makes it hard to keep up with everything. The analysts understand the added work they’re creating, but they can’t write code and they’re dependent on you for getting their analytics written.

The analysts are experts at working with relational databases and are quite comfortable with SQL queries. If there were a way to give the analysts a SQL layer over Kafka Streams, then everyone’s productivity would increase. Well, there is now.

In August of 2017, Confluent unveiled a powerful new tool for stream processing, KSQL (https://github.com/confluentinc/ksql#-ksql—streaming-sql-for-apache-kafka). KSQL is a streaming SQL engine for Apache Kafka that provides an interactive SQL interface for writing powerful stream processing queries without the need to write code. KSQL is especially adept at fraud detection and real-time applications.


KSQL Coverage

KSQL is a big topic and could take an article or two to cover, if not an entire book. The coverage here is concise. For more information, you can check out the source documentation at https://github.com/confluentinc/ksql#-ksql—streaming-sql-for-apache-kafka.

KSQL provides scalable, distributed stream processing including aggregations, joins, windowing, and more. Additionally, unlike SQL, which runs against a database or a batch processing system, the results of a KSQL query are continuous. Before we dive into writing streaming queries, let’s take a minute to review some fundamental concepts of KSQL.

KSQL Streams and Tables

An event stream is an unbounded stream of individual independent events, while the update or record stream is a stream of updates to previous records with the same key.

KSQL has a similar concept of querying from a Stream or a Table. A Stream is an infinite series of events or facts that are immutable, whereas the facts in a Table can be updated or even deleted.

Although some of the terminology might be different, the concepts are pretty much the same, and if you’re comfortable with Kafka Streams, you’ll feel right at home with KSQL. For more details on these concepts, you can view the KSQL Terminology section at https://github.com/confluentinc/ksql/blob/master/docs/concepts.md#terminology.
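The difference between a Stream and a Table can be pictured with a few lines of plain Java. This is only a sketch of the semantics, not KSQL or Kafka Streams code: the class name, method names, and sample records are hypothetical. A stream keeps every record it sees, while a table keeps only the latest value for each key, and a record with a null value removes the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of stream vs. table semantics; not KSQL or Kafka Streams code.
class StreamVsTable {

    // Stream view: every record is an independent fact, so all of them are kept in order.
    static List<String> asStream(List<String[]> records) {
        List<String> events = new ArrayList<>();
        for (String[] r : records) {
            events.add(r[0] + "=" + r[1]);
        }
        return events;
    }

    // Table view: a record replaces the previous value for its key; a null value deletes the key.
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] r : records) {
            if (r[1] == null) {
                table.remove(r[0]);
            } else {
                table.put(r[0], r[1]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Made-up sample data: {key, value}
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"AAPL", "100"});
        records.add(new String[] {"GOOG", "200"});
        records.add(new String[] {"AAPL", "150"}); // update for AAPL
        records.add(new String[] {"GOOG", null});  // delete for GOOG
        System.out.println(asStream(records));
        System.out.println(asTable(records));
    }
}
```

The same four records produce four entries when read as a stream, but only one surviving row (AAPL with its latest value) when read as a table.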

KSQL Architecture

KSQL uses Kafka Streams under the covers to build and fetch the results of the query. KSQL is made up of two components, the KSQL CLI and the KSQL server. Users of standard SQL tools such as MySQL, Oracle, or even Hive will feel right at home with the CLI when writing queries in KSQL. Best of all, KSQL is open source (Apache 2.0 licensed).

The CLI is also the client connecting to the KSQL Server. The KSQL server is responsible for processing the queries and retrieving data from Kafka, as well as writing results into Kafka.

KSQL runs in two modes: standalone, which is useful for prototyping and development, and distributed, which is how you’d use KSQL when working in a more realistically sized data environment.


KSQL is currently in Developer Preview

As exciting as KSQL is, and as much as it promises to deliver for SQL over streaming data, at the time of this writing KSQL is considered a developer preview, and running it against production clusters isn’t recommended. A GA release of KSQL is expected soon.

Let’s take a look at how KSQL works in both modes, starting with local mode as demonstrated in figure 1.

Figure 1 KSQL in Local Mode

As you can see from the image above, the KSQL CLI, the REST server, and the KSQL engine are all located in the same JVM, which is ideal when running on your laptop.

Now let’s take a look at how KSQL looks in the distributed mode.

Figure 2 KSQL in Distributed Mode

With KSQL in distributed mode, the KSQL CLI is now by itself, and it connects to one of the remote KSQL servers (we’ll cover starting and connections in the next section). A key point here is that although you only explicitly connect to one of the remote KSQL servers, all servers pointing to the same Kafka cluster share in the workload of the submitted query.

Another point to note is that the KSQL servers use Kafka Streams to execute the queries. This means that if you need more processing power, you can stand up another KSQL server, even during live operations (just as you can spin up another instance of a Kafka Streams application). The opposite case works as well; if you have excess capacity, you can stop any number of KSQL servers, provided you leave at least one server operational. Otherwise, your queries will stop running!
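To get a feel for why adding or removing servers changes each server’s share of the work, here’s a small Java sketch. Kafka Streams divides work along the partitions of the input topics; the round-robin assignment below is only a stand-in for the real assignment logic, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: round-robin stands in for the real partition assignment logic.
class WorkloadSharingSketch {

    // Spread partition numbers 0..partitions-1 across the given number of servers.
    static List<List<Integer>> assign(int partitions, int servers) {
        if (servers < 1) {
            // With no servers left, nothing runs the queries.
            throw new IllegalArgumentException("at least one server must stay up");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int s = 0; s < servers; s++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % servers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 2)); // two servers share six partitions
        System.out.println(assign(6, 3)); // a third server lightens everyone's load
    }
}
```

With six partitions, two servers take three partitions each; stand up a third server and each one handles two.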

Now that you have an idea of how KSQL is set up, let’s cover how you get KSQL installed and running.

Installing and Running KSQL

To install KSQL, clone the KSQL repo with the command git clone git@github.com:confluentinc/ksql.git, then cd into the ksql directory and run mvn clean package, which builds the entire KSQL project. If you don’t have git installed or don’t want to build from source, you can download a release of KSQL from https://github.com/confluentinc/ksql/releases/download/v0.4/ksql-0.4.tgz


Maven Needed for Building KSQL

KSQL is an Apache Maven based project, and you’ll need Maven installed to build KSQL. If you don’t have Maven installed and you’re on a Mac with Homebrew installed, run brew install maven. Otherwise, you can download Maven directly from https://maven.apache.org/download.cgi and find install instructions at https://maven.apache.org/install.html.

Now that you have the KSQL project downloaded and built, our next step is to start KSQL. Make sure you’re in the base directory of the KSQL project before going any further.

Listing 1 Starting KSQL in local mode

 ./bin/ksql-cli local

After running the command above you should see something like this in your console:

Figure 3 KSQL successful launch result

Congratulations! You’ve successfully installed and launched KSQL! Next, let’s start writing some queries.

Creating a KSQL Stream

Getting back to your work at BSE, you’ve been approached by one of the analysts who is interested in one of the applications you’ve written before and would like to make some tweaks to the application. But now, instead of this request resulting in more work, you spin up a KSQL console and turn the analyst loose to reconstruct your application as an SQL statement!

The example you’re going to convert is the last windowed stream from the interactive queries example found in src/main/java/bbejeck/chapter_9/StockPerformanceInteractiveQueryApplication.java from lines 96-103. In that application, you’re tracking the number of shares sold every ten seconds by company ticker symbol.

You already have the topic defined (the topic maps to a database table) and a model object StockTransaction where the fields on the object map to columns in a table. Even though the topic is defined, we need to register this information with KSQL by using a CREATE STREAM statement:

Listing 2 Creating a Stream

 CREATE STREAM stock_txn_stream (symbol VARCHAR, sector VARCHAR, \
    industry VARCHAR, shares BIGINT, sharePrice DOUBLE, \
    customerId VARCHAR, transactionTimestamp STRING, purchase BOOLEAN) \
    WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'stock-transactions');

  The CREATE STREAM statement named stock_txn_stream

  Registering the fields of the StockTransaction object as columns

  Specifying the data format and the Kafka topic serving as the source of the stream (both required parameters)

With this one statement, you’re creating a KSQL Stream instance that you can now issue queries against. In the WITH clause, you’ll notice two required parameters: VALUE_FORMAT, which tells KSQL the format of the data, and KAFKA_TOPIC, which tells KSQL where to pull the data from.

There are two additional parameters you can use in the WITH clause when creating a stream. One is TIMESTAMP, which associates the message timestamp with a column in the KSQL Stream. Operations requiring a timestamp, such as windowing, use this column to process the record.

The other is KEY, which associates the key of the message with a column on the defined stream. In our case, the message key for the stock-transactions topic matches the symbol field in the JSON value, so we didn’t need to specify the key.

Had this not been the case, you’d have needed to map the key to a named column, because you’ll always need a key to perform grouping operations, which you’ll see when we execute the stream SQL in an upcoming section.


Listing Topics To Register

Running the KSQL command list topics; shows a list of the topics on the broker the KSQL CLI is pointing to and whether each topic is “registered” or not.

After you’ve created your new stream you can view all streams and verify KSQL created the new stream as expected with the following commands:

Listing 3 Listing all Streams and describing the stream you just created

 show streams;
 describe stock_txn_stream;

Issuing these commands gives you results like those shown in figure 4:

Figure 4 Listing Streams and describing your newly created stream

You’ll notice two extra columns ROWTIME and ROWKEY that KSQL has inserted. The ROWTIME column is the timestamp placed on the message (either from the producer or by the broker), and the ROWKEY is the key (if any) of the message. Now that you’ve created the stream, let’s run our query on this stream.

Writing a KSQL Query

Listing 4 SQL for performing stock analysis

 SELECT symbol, sum(shares) FROM stock_txn_stream WINDOW TUMBLING (SIZE 10 SECONDS)
         GROUP BY symbol;

Once you run this query, you’ll see results similar to what’s displayed here in figure 5:

Figure 5 Results of tumbling window query


Populating Data for the KSQL examples

You’ll need to run ./gradlew runProducerInteractiveQueries to provide data for the KSQL examples.

The column on the left is the ticker symbol, and the column on the right is the number of shares traded for that symbol over the last ten seconds. With this query, you’ve specified a tumbling window of ten seconds, but KSQL supports session and hopping windows as well. Now you’ve built a streaming application without writing any code at all; quite an achievement. For comparison, let’s take a look at the corresponding application written in the Kafka Streams API:

Listing 5 Stock analysis application written in Kafka Streams

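 // Source stream from the stock-transactions topic (builder is assumed to be a StreamsBuilder)
 KStream<String, StockTransaction> stockTransactionKStream =
         builder.stream("stock-transactions", Consumed.with(stringSerde, stockTransactionSerde));
 // Adds each transaction's shares to the running total
 Aggregator<String, StockTransaction, Integer> sharesAggregator = (k, v, i) -> v.getShares() + i;
 // Total shares per ticker symbol in ten-second tumbling windows
 stockTransactionKStream.groupByKey()
                 .windowedBy(TimeWindows.of(10000))
                 .aggregate(() -> 0, sharesAggregator,
                         Materialized.<String, Integer, WindowStore<Bytes, byte[]>>with(stringSerde, Serdes.Integer()))
                 .toStream().peek((k,v)->LOG.info("key is {} value is {}", k, v));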

Even though the Kafka Streams API is concise, the equivalent you wrote in KSQL is a one-line query.
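If you’re curious what the ten-second tumbling window is doing in both versions, the idea can be sketched in plain Java. This isn’t how KSQL or Kafka Streams implements windowing; it’s a minimal illustration that assumes non-negative millisecond timestamps, and the class name, method names, and sample data are hypothetical. Each record lands in exactly one fixed-size bucket, found by rounding its timestamp down to a multiple of the window size, and shares are summed per symbol and bucket.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of tumbling-window aggregation; not KSQL or Kafka Streams internals.
class TumblingWindowSketch {

    static final long WINDOW_SIZE_MS = 10_000L; // ten seconds, as in the query

    // Running share totals keyed by "symbol@windowStart".
    private final Map<String, Long> totals = new TreeMap<>();

    // Start of the window containing the timestamp: round down to a multiple of the size.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Add one transaction's shares to the total for its symbol and window; returns the new total.
    long add(String symbol, long timestampMs, long shares) {
        String key = symbol + "@" + windowStart(timestampMs);
        return totals.merge(key, shares, Long::sum);
    }

    Map<String, Long> totals() {
        return totals;
    }

    public static void main(String[] args) {
        TumblingWindowSketch sketch = new TumblingWindowSketch();
        sketch.add("AAPL", 1_000L, 100);
        sketch.add("AAPL", 9_999L, 50);  // same window as the first record: [0, 10000)
        sketch.add("AAPL", 10_000L, 25); // next window: [10000, 20000)
        System.out.println(sketch.totals());
    }
}
```

The first two records fall in the same window and are summed together, while the third starts a new total in the next window. Tumbling windows never overlap, so a record is counted exactly once.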

That’s all for this article. For more, download the free first chapter of Kafka Streams in Action and see this Slideshare presentation.