From Pulsar in Action by David Kjerrumgaard

This article gives an intro into Pulsar Functions: what they are and what they do.


Take 37% off Pulsar in Action. Just enter fcckjerrumgaard into the discount code box at checkout at manning.com.


From a development perspective we’ve a couple of implementation options for our leaderboard application. The first is to write a standard Java application which uses Pulsar’s Java client library such as the one shown in Listing 1, using the standard Java client libraries to connect to Pulsar and consume the messages.

Because this single application is responsible for processing all of the scores for millions of players it could potentially encounter scalability issues as the data volume increased. Not to mention the fact that it’s a single point of failure in our system.

Listing 1 Leaderboard Calculator

 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
  
 public class LeaderBoardCalculator {
  
        public static void main(String[] args) throws PulsarClientException {
            PulsarClient client = PulsarClient.builder()
                           .serviceUrl("pulsar://127.0.0.1:6650/")
                           .build();
  
            Consumer<String> consumer = client.newConsumer(Schema.STRING)
          .topic("persistent://manning/chapter02/GameScores")    #A
          .subscriptionType(SubscriptionType.Exclusive)     #B 
          .subscribe();  
  
            Message<String> msg = null;
            while ((msg = consumer.receive()) != null) {
          calculate(msg);    #C
          consumer.acknowledge(msg);    #D   
            }
  
        }
  
        private static void calculate(Message<String> msg) {
          // Parse the JSON and perform the calculation.
        }
 }
  

#A Subscribe to the global GameScores topic to receive all the scoring events.

#B Use an exclusive subscription to ensure that we process all of the events in our application.

#C Performs the relevant calculations.

#D Acknowledge the message as processed.

Another option is to use one of the complex event processing engines currently supported by Pulsar, such as Storm, Heron, Flink, or Spark Streaming. Each of these choices not only incur the operational overhead of deploying and maintaining another distributed computing system, but also require our development team to learn a complex programming model and API to accomplish the relatively simple task of keeping a running total of scores.

Our team doesn’t want to invest a significant amount of time and effort to learn how to develop a Storm topology based on spouts and bolts, nor did we want to architect our solution based on the functional programming model using programming primitives such as map, flatmap, reduce, etc. using a language, Scala, that we didn’t know well. In short, those options presented a steep learning curve for the development team that was unwarranted given the task at hand. Fortunately, there was an alternative which allowed us to use our preferred programming language Java, with a programming model we’re already familiar with; Pulsar Functions.

Pulsar Functions Overview

Pulsar Functions are lightweight compute processes which consume messages from one or more Pulsar topics, apply a user-supplied function (processing logic) to each incoming message, and publish the results to one or more Pulsar topics as shown in Figure 1.


Figure 1: Pulsar Functions execute user-defined code on data published to Pulsar topics.


Pulsar Functions are often characterized as Lambda-style functions which are specifically designed to use Pulsar as the underlying message bus. This is because they take several design cues from the popular AWS Lambda framework which allows you to run code without provisioning or managing servers to host the code. Hence the common term for this programming model is “serverless”.

Users can develop simple self-contained pieces of code and then deploy them, and Pulsar takes care of the underlying details required to run and scale your code with high availability. Pulsar Functions provides a ready-made computing infrastructure on your existing Pulsar cluster.

 

Programming Model

The programming model behind Pulsar Functions is straightforward, which allows us to greatly simplify our deployment, reduce development time, and maximize developer productivity. As shown in Figure 2, Pulsar Functions receive messages from one or more input topics, and every time a message is published to the topic, the function code is executed.


Figure 2: Overview of the Pulsar Functions programming model.


Upon being triggered, the function code can perform a variety of actions such as applying some processing logic to the incoming message and writing output to an output topic, logging information to a log topic, or updating some information inside of Apache BookKeeper via a counter.

It’s also possible to have the output topic of one Pulsar Function be the input topic of another, allowing us to effectively create a directly acyclic graph (DAG) of Pulsar Functions. In such a graph, each edge represents a flow of data and each vertex represents a Pulsar Function that applies the user-defined logic to process the data. Figure 3 depicts a logical view of such a processing graph.


Figure 3: Pulsar Functions can be logically structured into a processing network.


The combinations of Pulsar Functions into these DAGs is endless, and although it’s entirely possible to limit your use of Pulsar Functions to a few limited functions, that perform a single task on incoming data. It’s also possible to write an application which is entirely composed of Pulsar Functions structured as a DAG.

 

Processing Guarantees

When executing a stream processing application which is structured as a DAG, users usually want to specify the processing semantics for the data processing across the entirety of the graph. These guarantees are meaningful because you can always assume the possibility of failures via network, machines, etc. that can result in data loss.

In the context of Pulsar Functions, these processing semantics determine how often a message is processed and how it’s handled in the event of failure. Pulsar Functions support three distinct messaging semantics that you can apply to any function which we’ll discuss below. By default, Pulsar Functions provide at-most-once delivery guarantees.

 

At-most-once

At-most-once processing doesn’t provide any guarantee that data is processed, and no additional attempts are made to reprocess the data if it’s lost before the Pulsar Function can process it. Each message which is sent to the function is either processed once, at the most, or not at all.

As you can see in Figure 4, when a Pulsar Function is configured to use at-most-once processing, the message is immediately acknowledged after it’s consumed, regardless of whether or not the message is successfully processed. Therefore, in this scenario, message M2 is processed next by the Function no matter what occurs.


Figure 4: With at-most-once processing, the incoming messages are acknowledged regardless of processing success or failure. This ensures that they’re processed, at most, once.


You only want to use this processing guarantee if your application can handle periodic data loss without impacting the correctness of the data. One such example is a Function that calculates the average value of sensor reading, such as a temperature from a particular topic. Over the lifetime of the Function, it processes 10s of millions of individual temperature readings, and the loss of handful of these readings is inconsequential to the accuracy of the computed average.

 

At-least-once

At-least-once processing guarantees that any data published to one of the Functions input topics is processed, but in the event of a processing failure or redelivery it could be processed multiple times.

With this processing semantic, the Pulsar Function reads the message from the input topic, executes its logic, and acknowledges the message. If the Function fails to acknowledge the message it’s re-processed. This process is repeated until the Function acknowledges the message. Figure 5 depicts the scenario in which the Function consumes the message but encounters a processing error that causes it to fail to send and acknowledgement. In this scenario, the next message processed by the Function is M1 and continues to M1 until the Function succeeds.


Figure 5: With at-least-once processing, if the Function encounters an error and fails to acknowledge the message, then the same message is processed again.


You only want to use this processing guarantee if your application can handle processing the same data multiple times without impacting the correctness of the data. One such scenario is if the incoming messages represented records that updated in a database. In such a scenario, multiple updates with the same values have no impact to the underlying database.

Effectively-once

With effectively-once guarantees, a message can be received and processed more than once, which is common in the presence of failures. The crucial thing, though, is that the outcome of the Function processing on the resulting state is as if the re-processed events were observed only once. This is most often what you want to achieve, the system processes each message once and only once.

Figure 6 depicts the scenario in which an upstream producer to the Function’s input topic has re-sent the same message, M1. When configured to provide effectively-once processing, the Function checks to see if has previously processed the message (based on user-defined properties) and if true, ignore the message and send an acknowledgment not to re-process.


Figure 6: With effectively-once processing, duplicate messages are ignored.


Our leaderboard application requires an effectively-once processing guarantee, as we can neither afford to miss scoring events nor can we process the same event multiple times. Doing either of these affects the accuracy of our results significantly.

State Storage

If you were paying close attention, you may have noticed that we slipped in a storage location for previous messages in Figure 6 which shows the effectively-once processing guarantee. You might be asking yourself where this storage layer came from. The answer is the Apache BookKeeper table service, which provides simple key/value API for storing data.

Pulsar relies on Apache BookKeeper to provide scalable storage for all of the messages publishes to Pulsar. Therefore, each Pulsar Broker also has the ability to connect to and interact with the BookKeeper service and by extension the Apache BookKeeper table service. This capability is available for free in the sense that no additional components are required, such as a relational, or NoSQL database.

Developing Pulsar Functions

Pulsar Functions can currently be written in Java and Python. Therefore, if you’re already familiar with either of these popular languages, you can develop Pulsar Functions relatively quickly.

When writing Pulsar Functions in Java you have two choices, you can either write “native-language” functions by implementing the java.util.Function interface, or you can use the Pulsar Function SDK to access some additional features. In order to write Pulsar Functions in Java, you need to install the proper dependencies and package your function along with all of its dependencies as either a “fat” JAR or a NAR file. This and much, much more is covered in the book!

That’s all for now.

If you want to learn more about the book, check it out on or browser-based liveBook reader here and see this slide deck.