An excerpt from Kafka Streams in Action, Second Edition by Bill Bejeck

This excerpt discusses the schema, a language-agnostic description of an object, including its name, the fields on the object, and the type of each field, and explains why you should use one.


When you mention the word schema to developers, there’s a good chance their first thought is of database schemas. A database schema describes the structure of the database, including the names and types of the columns in database tables and the relationships between tables. But the schema I’m referring to here, while similar in purpose, is not quite the same thing.

For our purposes, what I’m referring to is a language-agnostic description of an object, including its name, its fields, and the type of each field. Here’s an example of a potential schema in JSON format.

Listing 1. Basic example of a schema in JSON format

   {
     "name": "Person",                      (1)
     "fields": [                            (2)
       {"name": "name", "type": "string"},  (3)
       {"name": "age", "type": "int"},
       {"name": "email", "type": "string"}
     ]
   }

1 The name of the object

2 Defining the fields on the object

3 The names of the fields and their types

Here our fictional schema describes an object named Person with fields we’d expect to find on such an object. Now we have a structured description of an object that producers and consumers can use as an agreement or contract on what the object should look like before and after serialization. I’ll cover details on how you use schemas in message construction and (de)serialization in an upcoming section.

But for now, I’d like to review some key points we’ve established so far:

  • The Kafka broker only works with messages in binary format (byte arrays)
  • Kafka producers and consumers are responsible for the (de)serialization of messages. Additionally, since these two are unaware of each other, the record format forms a contract between them.

And we also learned that we can make the contract between producers and consumers explicit by using a schema. So we have our why for using a schema, but what we’ve defined so far is a bit abstract, and we still need to answer these questions for the how:

  • How do you put schemas to use in your application development lifecycle?
  • Given that serialization and deserialization are decoupled from the Kafka producers and consumers, how can they use serialization that ensures messages are in the correct format?
  • How do you enforce using the correct version of a schema? After all, changes are inevitable.

The answer to these how questions is Schema Registry.

What is Schema Registry?

Schema Registry provides a centralized application for storing schemas, schema validation, and sane schema evolution (message structure changes) procedures. Perhaps more importantly, it serves as the source of truth for schemas, one that producer and consumer clients can easily discover. Schema Registry also provides serializers and deserializers that you can configure Kafka producers and consumers with, easing development for applications working with Kafka.
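
To make this concrete, here’s a minimal sketch of wiring the Confluent Avro serializer into a producer. This isn’t code from the book: the bootstrap address and Schema Registry URL are placeholder assumptions, and the class is only for illustration.

   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.common.serialization.StringSerializer;

   public class ProducerConfigSketch {
       public static KafkaProducer<String, Object> buildProducer() {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           // Schema Registry aware serializer from Confluent's kafka-avro-serializer artifact
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                     "io.confluent.kafka.serializers.KafkaAvroSerializer");
           // Where the serializer registers and fetches schemas
           props.put("schema.registry.url", "http://localhost:8081");
           return new KafkaProducer<>(props);
       }
   }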

The Schema Registry serializing code supports schemas from the serialization frameworks Avro (https://avro.apache.org/docs/current/) and Protocol Buffers (https://developers.google.com/protocol-buffers). Note that I’ll refer to Protocol Buffers as “Protobuf” going forward. Additionally, Schema Registry supports schemas written using JSON Schema (https://json-schema.org/), though this is more of a specification than a framework. I’ll get into working with Avro, Protobuf, and JSON Schema as we progress through the chapter, but for now, let’s take a high-level view of how Schema Registry works:

Figure 1. Schema Registry ensures consistent data format between producers and consumers

Let’s quickly walk through how Schema Registry works based on this illustration (a consumer-side configuration sketch follows the list):

  1. When a producer calls the serialize method, a Schema Registry aware serializer retrieves the schema (via HTTP) and stores it in its local cache.
  2. The serializer embedded in the producer serializes the record.
  3. The producer sends the serialized message (bytes) to Kafka.
  4. A consumer reads in the bytes.
  5. The Schema Registry aware deserializer in the consumer retrieves the schema and stores it in its local cache.
  6. The consumer deserializes the bytes based on the schema.
  7. The Schema Registry server produces a message with the schema so that it’s stored in the _schemas topic.
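
To ground steps 4 through 6, here’s a matching consumer-side sketch using the Confluent Avro deserializer; again, the addresses and group id are placeholder assumptions rather than values from the book.

   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;

   public class ConsumerConfigSketch {
       public static KafkaConsumer<String, Object> buildConsumer() {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-avengers-group");     // assumed group id
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           // Schema Registry aware deserializer: fetches the schema referenced by the
           // incoming bytes (once) and caches it locally for later records
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                     "io.confluent.kafka.serializers.KafkaAvroDeserializer");
           props.put("schema.registry.url", "http://localhost:8081");
           return new KafkaConsumer<>(props);
       }
   }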

While I’m presenting Schema Registry as an important part of the Kafka event streaming platform, it’s not required. Remember, Kafka producers and consumers are decoupled from the serializers and deserializers they use. As long as you provide a class that implements the appropriate interface, it will work fine with the producer or consumer. But you will lose the validation checks that come from using Schema Registry. I’ll cover serializing without Schema Registry at the end of this chapter.
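
To illustrate that point, here’s a hedged sketch of a custom serializer implementing Kafka’s Serializer interface with no Schema Registry involved. The Jackson-based JSON approach is my own example, not the approach the book covers later.

   import com.fasterxml.jackson.databind.ObjectMapper;
   import org.apache.kafka.common.serialization.Serializer;

   // A hypothetical Schema Registry-free serializer that writes records as JSON bytes
   public class PlainJsonSerializer<T> implements Serializer<T> {
       private final ObjectMapper mapper = new ObjectMapper();

       @Override
       public byte[] serialize(String topic, T data) {
           try {
               // No schema lookup or validation happens here -- that's the trade-off
               return data == null ? null : mapper.writeValueAsBytes(data);
           } catch (Exception e) {
               throw new RuntimeException("Failed to serialize record for topic " + topic, e);
           }
       }
   }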

While the previous illustration gave you a good idea of how Schema Registry works, there’s an important detail I’d like to point out here. While it’s true that the serializer or deserializer will reach out to Schema Registry to retrieve a schema for a given record type, it only does so once: the first time it encounters a record type it doesn’t have the schema for. After that, the schema needed for (de)serialization operations is retrieved from the local cache.

Getting Schema Registry

Our first step is to get Schema Registry up and running. Again, you’ll use docker-compose to speed up your learning and development process. We’ll cover installing Schema Registry from a binary download and other options in an appendix, but for now, just grab the docker-compose.yml file from the chapter_3 directory in the source code for the book.

This file is very similar to the docker-compose.yml file you used in chapter two, but in addition to the Zookeeper and Kafka images, there is an entry for a Schema Registry image as well. Go ahead and run docker-compose up -d. To refresh your memory about the docker commands: the -d flag is for “detached” mode, meaning the docker containers run in the background, freeing up the terminal window in which you executed the command.
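
For orientation, here’s a sketch of what the Schema Registry entry in such a file often looks like with the Confluent images. The image tag, service names, and ports are assumptions on my part, so use the actual file from the book’s source code.

   # Sketch only -- grab the real docker-compose.yml from the chapter_3 directory
   schema-registry:
     image: confluentinc/cp-schema-registry:7.5.0    # tag is an assumption
     depends_on:
       - kafka
     ports:
       - "8081:8081"
     environment:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:9092"
       SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"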

Architecture

Before we go into the details of how you work with Schema Registry, it would be good to get a high-level view of how it’s designed. Schema Registry is a distributed application that lives outside the Kafka brokers. Clients communicate with Schema Registry via a REST API. A client could be a serializer (producer), a deserializer (consumer), a build tool plugin, or a command-line request using curl.

Schema Registry uses Kafka as the storage (a write-ahead log) for all of its schemas in _schemas, a single-partition, compacted topic. It has a primary-based architecture, meaning there is one primary node in the deployment and the other nodes are secondary.

What this means is that only the primary node in the deployment writes to the _schemas topic. Any node in the deployment will accept a request to store or update a schema, but secondary nodes forward the request to the primary node. Let’s look at an illustration to demonstrate:

Figure 2. Schema Registry is a distributed application where only the primary node communicates with Kafka

Anytime a client registers or updates a schema, the primary node produces a record to the _schemas topic. Schema Registry uses a Kafka producer for writing, and all the nodes use a consumer for reading updates. So you can see that Schema Registry’s local state is backed up in a Kafka topic, making schemas very durable.

But all Schema Registry nodes serve read requests from clients. If a secondary node receives a registration or update request, it forwards the request to the primary node and then returns the primary node’s response to the client. Let’s take a look at an illustration of this architecture to solidify your mental model of how this works:

Figure 3. All Schema Registry nodes can serve read requests

Now that we’ve given an overview of the architecture, let’s get to work by issuing a few basic commands using the Schema Registry REST API.

Communication – Using Schema Registry’s REST API

So far we’ve covered how Schema Registry works, but now it’s time to see it in action by uploading a schema and then running some additional commands to get more information about your uploaded schema. For the initial commands, you’ll use curl and jq in a terminal window.

curl (https://curl.se/) is a command-line utility for transferring data to and from URLs. jq (https://stedolan.github.io/jq/) is a command-line JSON processor. To install jq for your platform, visit the jq download site at https://stedolan.github.io/jq/download/. curl comes preinstalled on Windows 10+ and macOS; on Linux, you can install it via a package manager. If you’re using macOS, you can install both using Homebrew (https://brew.sh/).

Typically, you’ll use the build tool plugins for performing Schema Registry actions. First, they make the development process much faster than running the API calls from the command line, and second, they automatically generate source code from schemas. We’ll cover using build tool plugins in an upcoming section.

Register a schema

Before we get started, make sure you’ve run docker-compose up -d so that you have a Schema Registry instance running. At this point nothing is registered, so your first step is to register a schema. Let’s have a little fun and create a schema for Marvel Comics superheroes, the Avengers. You’ll use Avro for your first schema, so let’s take a second now to discuss the format:

Listing 2. Avro schema for Avengers

   {
     "namespace": "bbejeck.chapter_3",   (1)
     "type": "record",                   (2)
     "name": "Avenger",                  (3)
     "fields": [                         (4)
       {"name": "name", "type": "string"},
       {"name": "real_name", "type": "string"},        (5)
       {"name": "movies",
        "type": {"type": "array", "items": "string"},
        "default": []                                  (6)
       }
     ]
   }

1 The namespace uniquely identifies the schema. For generated Java code the namespace is the package name.

2 The type is record which is a complex type. Other complex types are enums, arrays, maps, unions and fixed. We’ll go into more detail about Avro types later in this chapter.

3 The name of the record

4 Declaring the fields of the record

5 Describing the individual fields. Fields in Avro are either simple or complex.

6 Providing a default value. If the serialized bytes don’t contain this field, Avro uses the default value when deserializing.

You define Avro schemas in JSON format. You’ll use this same schema file in an upcoming section when we discuss the Gradle plugin for code generation and interactions with Schema Registry. Since Schema Registry supports the Protobuf and JSON Schema formats as well, let’s take a look at the same type in those schema formats:

Listing 3. Protobuf schema for Avengers

   syntax = "proto3";  (1)

   package bbejeck.chapter_3.proto;  (2)

   option java_outer_classname = "AvengerProto";  (3)

   message Avenger {  (4)
       string name = 1;  (5)
       string real_name = 2;
       repeated string movies = 3;  (6)
   }

1 Defining the version of Protobuf; we’re using version three in this book

2 Declaring the package name

3 Specifying the name of the outer class, otherwise the name of the proto file is used

4 Defining the message

5 Unique field number

6 A repeated field; corresponds to a list

The Protobuf schema looks closer to regular code, as the format is not JSON. Protobuf uses the numbers you see assigned to the fields to identify those fields in the message binary format. While the Avro specification allows for setting default values, in Protobuf (version 3) every field is considered optional, and you don’t provide a default value. Instead, Protobuf uses the type of the field to determine the default: for example, the default for a numerical field is 0, for strings it’s the empty string, and for repeated fields it’s an empty list.
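
To see those defaults in action, here’s a small sketch using the Java class that protoc would generate from the schema above; the accessor names follow standard Protobuf codegen conventions, though I haven’t shown the build setup that produces the class.

   import bbejeck.chapter_3.proto.AvengerProto;

   public class ProtobufDefaultsDemo {
       public static void main(String[] args) {
           // Build an Avenger without setting any fields
           AvengerProto.Avenger avenger = AvengerProto.Avenger.newBuilder().build();

           // Unset string fields default to the empty string,
           // and repeated fields default to an empty list
           System.out.println(avenger.getName().isEmpty());       // true
           System.out.println(avenger.getMoviesList().isEmpty()); // true
       }
   }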

Now let’s take a look at the JSON Schema version:

Listing 4. JSON Schema schema for Avengers

   {
  "$schema": "http://json-schema.org/draft-07/schema#",  (1)
  "title": "Avenger",
  "description": "A JSON schema of Avenger object",
  "type": "object",             (2)
  "javaType": "bbejeck.chapter_3.json.SimpleAvengerJson", (3)
  "properties": {     (4)
    "name": {
      "type": "string"
    },
    "realName": {
      "type": "string"
    },
    "movies": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "default": []  (5)
    }
  },
  "required": [
    "name",
    "realName"
  ]
}

1 Referencing the specific schema spec

2 Specifying the type is an object

3 The javaType used when deserializing

4 Listing the fields of the object

5 Specifying a default value

The JSON Schema schema resembles the Avro version, as both use JSON for the schema file. The biggest differences between the two are that in JSON Schema you list the object fields under a properties element instead of a fields array, and in the fields themselves you use the field name as the key rather than providing a name element.

I’ve shown the different schema formats here for comparison, but in the rest of the chapter I’ll usually show only one version of a schema per example to save space. The source code, however, contains examples for all three supported formats.

Now that we’ve reviewed the schemas, let’s go ahead and register one. The command to register a schema with the REST API on the command line looks like this:

Listing 5. Register a schema on the command line

   jq '. | {schema: tojson}' src/main/avro/avenger.avsc | \                       (1)
   curl -s -X POST http://localhost:8081/subjects/avro-avengers-value/versions \  (2)
        -H "Content-Type: application/vnd.schemaregistry.v1+json" \               (3)
        -d @- \                                                                   (4)
        | jq                                                                      (5)

1 Using the jq tojson function to convert the avenger.avsc file into a JSON string (raw newlines aren’t valid in JSON) for uploading, then piping the result to the curl command

2 The POST URL for adding the schema; the -s flag suppresses curl’s progress output

3 The content header

4 The -d flag specifies the data, and @- means read from STDIN, i.e., the data piped in from the preceding jq command

5 Piping the JSON response through jq to get a nicely formatted response

The result you see from running this command should look like this:

Listing 6. Expected response from uploading a schema

 
   {
  "id": 1
}

The response from the POST request is the id that Schema Registry assigned to the new schema. Schema Registry assigns a unique id (a monotonically increasing number) to each newly added schema. Clients use this id for storing schemas in their local cache.
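
As an aside, you can also look up a schema directly by that id using the REST API’s /schemas/ids endpoint; the id of 1 below assumes this was the first schema you registered.

   curl -s "http://localhost:8081/schemas/ids/1" | jq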

Before we move on to another command, I want to call your attention to annotation 2, specifically this part: subjects/avro-avengers-value/. It specifies the subject name for the schema. Schema Registry uses the subject name to manage the scope of any changes made to a schema. In this case, the scope is confined to avro-avengers-value, which means that values (in the key-value pairs) going into the avro-avengers topic need to be in the format of the registered schema. We’ll cover subject names and the role they play in making changes in an upcoming section.
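
For reference, the -value suffix comes from the serializers’ default subject name strategy, TopicNameStrategy, which appends -key or -value to the topic name. Here’s a hedged sketch of swapping in one of the alternative strategies Confluent’s serializers ship with; the wrapper class itself is hypothetical.

   import java.util.Properties;

   public class SubjectNameStrategyConfig {
       // Hypothetical helper: by default the subject is "<topic>-value" (or "<topic>-key");
       // RecordNameStrategy derives the subject from the record's fully qualified name instead
       public static Properties withRecordNameStrategy(Properties props) {
           props.put("value.subject.name.strategy",
                     "io.confluent.kafka.serializers.subject.RecordNameStrategy");
           return props;
       }
   }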

Next, let’s take a look at some of the available commands you can use to retrieve information from Schema Registry.

Imagine you’re working on building a new application to work with Kafka. You’ve heard about Schema Registry, and you’d like to take a look at a particular schema one of your co-workers developed, but you can’t remember its name, and it’s the weekend and you don’t want to bother anyone. What you can do is list all the subjects of registered schemas with the following command:

Listing 7. Listing the subjects of registered schemas

 
   curl -s "http://localhost:8081/subjects" | jq

The response from this command is a JSON array of all the subjects. Since we’ve registered only one schema so far, the results should look like this:

 
   [
  "avro-avengers-value"
]

Great, you’ve found what you were looking for: the schema registered for the avro-avengers topic.

Now let’s say there have been some changes to the latest schema, and you’d like to see what the previous version was. The problem is you don’t know the version history. The next command shows you all of the versions for a given schema.

Listing 8. Getting all versions for a given schema

 
   curl -s "http://localhost:8081/subjects/avro-avengers-value/versions" | jq

This command returns a JSON array of the versions of the given schema. In our case, the results should look like this:

   [ 1 ]

Now that you have the version number you need, you can run another command to retrieve the schema at a specific version:

Listing 9. Retrieving a specific version of a schema

 
   curl -s "http://localhost:8081/subjects/avro-avengers-value/versions/1"\
 | jq '.'

After running this command you should see something resembling this:

 
   {
  "subject": "avro-avengers-value",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"AvengerAvro\",
      \"namespace\":\"bbejeck.chapter_3.avro\",\"fields\"
      :[{\"name\":\"name\",\"type\":\"string\"},{\"name\"
        :\"real_name\",\"type\":\"string\"},{\"name\"
          :\"movies\",\"type\":{\"type\":\"array\"
            ,\"items\":\"string\"},\"default\":[]}]}"
}

The value of the schema field is formatted as a string, so the quotes are escaped and all newline characters are removed.
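
If you’d like to read that embedded schema as regular JSON, jq can unescape it for you; a quick sketch using jq’s fromjson filter:

   curl -s "http://localhost:8081/subjects/avro-avengers-value/versions/1" \
     | jq '.schema | fromjson'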

With a couple of quick commands from a console window, you’ve been able to find a schema, determine the version history and view the schema of a particular version.

As a side note, if you don’t care about previous versions of a schema and you only want the latest one, you don’t need to know the actual latest version number. You can use the following REST API call to retrieve the latest schema:

Listing 10. Getting the latest version of a schema

   curl -s "http://localhost:8081/subjects/avro-avengers-value/versions/latest" \
     | jq '.'

I won’t show the results of this command here, as they are identical to those of the previous command.

That has been a quick tour of some of the commands available in the Schema Registry REST API. This is just a small subset of the available commands; for a full reference, go to https://docs.confluent.io/platform/current/schema-registry/develop/api.html#sr-api-reference.

That’s all. Thanks for reading.