
From Software Telemetry by Jamie Riedesel

This article covers emitting and shipping telemetry to somewhere off the system, such as into a queue.


Take 40% off Software Telemetry by entering fccriedesel into the discount code box at checkout at manning.com.


Emitter/Shipper functions, telemetry from production code

A trade-off is required when deciding whether to emit to a file and have something else move telemetry off the system or to have your production code do the moving.

  • By emitting to a file or the system log, a dedicated shipper program handles the complexity of moving telemetry so your production systems don't have to, keeping your production code simpler. Also, production code-paths aren't blocked when the off-system queue or stream is temporarily down, because the telemetry is already written locally (see the sketch after this list).
  • By emitting directly from production code to somewhere off the system, your telemetry spends as little time on the production system as possible. Also, you don't need to configure an additional shipper, and your overall telemetry system is simpler.
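To make the first option concrete, here is a minimal sketch of an emitter that appends JSON lines to a local file for a separate shipper (such as Fluentd or Filebeat) to pick up. The file path and field names are assumptions for illustration, not part of any production code shown later.

 # metrics_file.py: a sketch of emit-to-file. A shipper running on the
 # same host tails this file and moves the telemetry off the system,
 # so the production code never blocks on a remote queue or database.
 import json
 import time
 
 METRICS_PATH = "/var/log/app/metrics.log"    # assumed path
 
 def counter(msg, count=1):
   """Appends a counter metric as one JSON line."""
   metric = {
     "timestamp": time.time(),
     "metric_name": msg,
     "metric_value": count,
     "metric_type": "counter"
   }
   with open(METRICS_PATH, "a") as handle:
     handle.write(json.dumps(metric) + "\n")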

Shipping directly into storage

This section covers the simplest of Shipping Stages, where your Emitting Stage emitter sends telemetry directly into storage. The simplest possible way to do this is to send your telemetry to a file, but sending to a file only counts as centralized logging if your production code runs in a single place. This article is about shipping your telemetry off the system, so we look at sending telemetry to a database of some kind.

Our one-hundred-person startup is facing a problem. They’ve grown enough as a company that keeping their logs inside their Kubernetes cluster isn’t scaling, and they want to send their logs into an Elasticsearch cluster maintained by their cloud-provider instead. Figure 1 describes their desired architecture.


Figure 1. The desired flow of telemetry for this startup is Docker-based production code into a cloud-provider-managed Elasticsearch. Emitter functions in the production code are rewritten to send telemetry directly into Elasticsearch, and telemetry no longer takes up space in the Kubernetes cluster.


To get to their desired architecture, we need to rewrite their metrics function to send to Elasticsearch. Figure 2 shows how the metric data is transformed and written into Elasticsearch.


Figure 2. metrics.py rewritten to insert into an Elasticsearch cluster. After being called, the function creates a hash with the supplied values and passes it to the esclient object to index it into the cluster. The index method then makes an HTTP PUT to an Elasticsearch-specific indexing URL, passing the hash as data.


Listing 1 metrics.py: emitting into Elasticsearch

 
 # A class for wrapping the elasticsearch class, intended to provide
 # a metrics-specific facility.
  
 from elasticsearch import Elasticsearch
  
 esclient = Elasticsearch(    #A
   hosts=[{"host": "escluster.prod.internal", "port" : 9200}],    #A
   sniff_on_start=False,    #A
   sniffer_timeout=60    #A
   )    #A
  
 def counter(msg, count=1):    #B
   """Emits a metric intended to be counted or summarized.
  
   Example: counter("pages", "15")
   """
   metric = {    #C
      "metric_name": msg,
      "metric_value": count,
      "metric_type": "counter"
   }
   esclient.index(    #D
     index="metrics",
     body=metric
   )
  
 def timer(msg, time=0.0):    #E
   """Emits a metric for tracking run-times.
  
   Example: timer("convert_worker_runtime", "2.7")
   """
   metric = {    #F
     "metric_name": msg,
     "metric_value": time,
     "metric_type": "timer”
   }
   esclient.index(     #G
     index="metrics",
     body=metric
   )
  

#A Creates a client object for Elasticsearch.

#B Defines a counter function.

#C Builds a hash with the metric document we will insert for the counter.

#D Inserts the metric hash into the ‘metrics’ index

#E Defines a timer function.

#F Builds a hash with the metric document we will insert for the timer.

#G Inserts the metric hash into the ‘metrics’ index

The functions in listing 1 are fairly short. Here are examples of how they're called:

  • metrics.counter("profile_image_uploaded") to increment the count of uploaded profile images by one.
  • metrics.counter("pdf_pages", 19) to indicate the number of pdf_pages encountered was nineteen.
  • metrics.timer("profile_image_convert_time", 0.9) to indicate that converting the uploaded profile image to an appropriate image-dimension took 0.9 seconds.

Instead of emitting lines into a file or into Syslog, this injects JSON-formatted documents into an Elasticsearch index. Elasticsearch, the storage for this particular Shipping Stage, indexes each document for searchability. A Presentation Stage system using Elasticsearch as storage could retrieve this telemetry through a search like:

 
 metric_name:"pdf_pages"
  

This returns all of the metrics telemetry with that value in metric_name. When combined with a date-range search (the timestamp on our JSON documents is created automatically by Elasticsearch when each document is indexed), the searcher can narrow to a small window of time; a query sketch follows the database list below. The presentation system handles converting and aggregating the metric_value field. Elasticsearch is used in this example, but there are many different databases you can use for metrics. Here are a few:

  • Prometheus: a dedicated time-series database which is open source.
  • InfluxDB: another dedicated time-series database, this one is open-core (open source for basic features, pay for extended features like clustering).
  • Any relational database, such as MySQL, Postgres, or MS-SQL.
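As a sketch of the date-range narrowing described above, a Presentation Stage system might issue a query like the following through the Python Elasticsearch client. The @timestamp field name and the time window are assumptions for illustration; the real field depends on how the index was set up.

 # Fetch pdf_pages counters from a one-hour window of the metrics index.
 from elasticsearch import Elasticsearch
 
 esclient = Elasticsearch(
   hosts=[{"host": "escluster.prod.internal", "port": 9200}]
   )
 
 results = esclient.search(
   index="metrics",
   body={
     "query": {
       "bool": {
         "must": [
           {"match": {"metric_name": "pdf_pages"}},
           {"range": {"@timestamp": {
             "gte": "2021-01-01T00:00:00",
             "lte": "2021-01-01T01:00:00"}}}
         ]
       }
     }
   }
 )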

Although the examples here use metrics, the same direct-insert method is useful for centralized logging as well.

Shipping through queues and streams

Not all emitter/shipper functions ship directly to storage; often telemetry data needs more transformation before it's ultimately stored. Also, having the vast multitude of your production systems all individually writing to a database can be woefully inefficient (not to mention hard on the database). This section is about centralizing the flow of telemetry to allow downstream Shipping Stage components to bulk-insert into your storage systems. The most popular ways to accomplish this are queues and a related technology called streams.

  • A queue is a basic structure. Data is pushed onto the bottom of the queue, and data at the top is serviced by the next system requesting data. First in, first out (FIFO).
  • A stream does this too, but adds the concept of a consumer group. A consumer group is a group of systems that all act together and see the same FIFO behavior. Different consumer groups see the same data, but the FIFO behavior is kept inside each consumer group. To accomplish this, the stream system needs to keep each item of data in a stream until all consumer groups have seen it. This is a powerful system, and we'll get into more of this later; a short sketch of the difference follows this list.
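To make the difference concrete, here is a minimal sketch using Redis for both patterns, assuming a locally reachable Redis server. Full, working listings follow later in this article, so treat this only as an illustration of the semantics.

 # Queue: each item is delivered to exactly one consumer, then removed.
 import redis
 
 r = redis.Redis(host='localhost')
 r.rpush('work_queue', 'item-1')
 r.blpop('work_queue', 5)    # the first consumer to pop gets item-1; it is now gone
 
 # Stream: items stay in the stream, and every consumer group sees them all.
 r.xadd('work_stream', {'line': 'item-1'})
 r.xgroup_create('work_stream', 'team_a', '0')
 r.xgroup_create('work_stream', 'team_b', '0')
 r.xreadgroup('team_a', 'consumer-1', {'work_stream': '>'})    # team_a sees item-1
 r.xreadgroup('team_b', 'consumer-1', {'work_stream': '>'})    # team_b also sees item-1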

In the previous section, our one-hundred-person startup had a Kubernetes cluster and wanted their production code to send telemetry directly into Elasticsearch. What if it wasn't a single Kubernetes cluster, but one hundred of them? It isn't hard to imagine Elasticsearch having to deal with ten thousand Docker containers, each submitting telemetry. Figure 3 provides two views of this problem: one with all ten thousand Docker containers writing to Elasticsearch, and a second with those Docker containers instead writing to a queue or stream.


Figure 3. Two views of the one hundred Kubernetes / ten thousand Docker container problem. To the right, all ten thousand Docker containers are writing directly to Elasticsearch. Below, they write to a queue or stream. A group of bulk writer systems service the queue or stream and use bulk-insert into Elasticsearch. Write transactions are reduced ninety-five percent compared to the direct insert model.


Although it’s entirely possible to engineer Elasticsearch to accept ten thousand small writes a second, Elasticsearch requires fewer resources (and has an easier time with consistency) when dealing with fewer, larger writes instead. Most databases generally behave this way, in fact. By extending our telemetry pipeline to add more stages we take load off of the database. To start, let’s focus on modifying the emitter demonstrated in listing 1 to emit to a Redis list instead of an Elasticsearch database. We’ll examine the bulk writers later. Redis is an in-memory data-structure store which allows storing lists, hashes, sets, and other types of data. For our purposes we use a list, because that can be made to function like a queue. Our data transformation flow (seen in figure 2) is revised into what we see in figure 4.


Figure 4. The metrics logger revised to send to a Redis list instead. Flow through the logger program is similar to figure 2, with fewer details encoded in the hash. The rpush method is called on the redis_client to submit the metadata. The Redis RPUSH command is called, inserting our metadata at the end of the list named metrics_counters.


We see the same number of steps, but we're writing to a new destination. Listing 2 provides a view into the changes to our metrics.py module.

Listing 2 metrics.py: emitting to a Redis queue/list

 
 # A class for wrapping the redis class, intended to provide
 # a metrics-specific facility.
  
 import redis
 import json
  
 redis_client = redis.Redis( host='log-queue.prod.internal')    #A
  
 def counter(msg, count=1):    #B
   """Emits a metric intended to be counted or summarized.
  
   Example: counter("pages", "15")
   """
   metric = {
     "metric_name" : msg,
     "metric_value" : count
   }
   redis_client.rpush('metrics_counters', json.dumps(metric))    #C
  
 def timer(msg, time=0.0):    #D
   """Emits a metric for tracking run-times.
  
   Example: timer("convert_worker_runtime", "2.7")
   """
   metric = {
     "metric_name": msg,
     "metric_value": time,
   }
   redis_client.rpush('metrics_timers', json.dumps(metric))    #E
  

#A Creates an interface to a Redis server on log-queue.prod.internal.

#B The counter function, same as previous examples.

#C Pushes the generated hash into the list named metrics_counters

#D The timer function, same as previous examples.

#E Pushes the generated hash into the list named metrics_timers

 

The revised metrics logger in listing 2 is called the same way as in previous examples; that part hasn't changed. What changes is how telemetry is moved into the Shipping Stage. When metrics.timer or metrics.counter is called, the function generates a structured hash. The hash is then injected into Redis using the RPUSH command, which adds it to the bottom of a list structure in Redis and makes the list work like a queue.

Our bulk writer systems use the BLPOP command (Blocking List POP) to fetch the first element of the named list; the pop is atomic, so no other system fetches the same value, and the command blocks until there is something to fetch. The bulk writer then takes the hash and injects it into the storage system for the Shipping Stage, for display by the Presentation Stage. Making sure that telemetry doesn't linger in the pipeline too long is a concern, so the bulk-writer-counters.py program needs to flush writes early in certain cases. Figure 5 shows the flow.


Figure 5. Execution flow of bulk-writer-counters.py. This makes sure that inserts into Elasticsearch have enough items to be worthwhile, unless it has been too long since the last insert.


One possible version of the bulk writer script is shown in listing 3:

Listing 3 bulk-writer-counters.py: Watching Redis, shipping to Elasticsearch

 
 import json
 import redis
 import time
 from elasticsearch import Elasticsearch
  
 redis_client = redis.Redis( host='log-queue.prod.internal' )
 esclient = Elasticsearch (
   hosts=[{"host": "escluster.prod.internal", "port" : 9200}],
   sniff_on_start=False,
   sniffer_timeout=60
   )
 wait_limit = 5    #A
  
 bulk_size  = 200    #B
  
 last_time     = time.time()    #C
 current_size  = 0    #C
 current_items = []    #C
 do_insert     = False    #C
  
 while True:
   counter_raw = redis_client.blpop('metrics_counters', wait_limit)    #D
   if counter_raw is None:    #E
     do_insert = True    #E
   else:
     counter = json.loads(counter_raw[1])    #F
     counter['metric_type'] = 'counter'    #G
     bulk_header = { 'index' : {    #H
       '_index': 'metrics' } }    #H
     bulk_item = [    #H
       json.dumps(bulk_header),    #H
       json.dumps(counter)    #H
       ]    #H
     current_items.append("\n".join(bulk_item))    #I
     if len(current_items) >= bulk_size:    #J
       do_insert = True    #J
     elif (time.time() - last_time) >= wait_limit:    #J
       do_insert = True    #J
 
   if do_insert and current_items:
     esclient.bulk( current_items, index='metrics' )    #K
     last_time = time.time()
     do_insert = False
     current_items = []
  

#A Set the maximum time in seconds we will wait to insert

#B Set the maximum number of items we will insert

#C Set defaults for the loop before we enter it

#D Fetches a value from the named list, waiting up to wait_limit

#E If the timeout expired, force an insert, otherwise, process the value

#F Decodes the JSON into a native hash

#G Updates the hash to add the metric_type field

#H Formats a hash for Elasticsearch bulk-insert

#I Joins the bulk_item pair with a newline and appends it to current_items

#J Checks if we’re ready to insert yet

#K Performs the bulk-insert to Elasticsearch

The bulk-writer-counters.py script needs only small tweaks to turn it into one for the timers; otherwise it's a drop-in for timer metrics (a sketch of those tweaks follows the flow description below). Written this way, we ensure that up to two hundred metrics are inserted into Elasticsearch each time we insert. We also use timeouts to make sure that our metrics data is recent enough, meaning we can insert fewer than two hundred metrics if we've waited too long between inserts. Where we were individually writing ten thousand metrics per second, we are now writing the same ten thousand per second in batches of up to two hundred. The end-to-end flow is shown in figure 6.


Figure 6. The end-to-end flow of counter metrics, starting in the production code and ending in Elasticsearch. Along the way, writes are collated in the bulk-writer-counters.py script and bulk-inserted into Elasticsearch. This greatly reduces the number of write transactions Elasticsearch has to handle per second.


Figure 6 describes where this script sits in a shipping pipeline for metrics. The flow of telemetry is:

  1. counter("pdf_pages", 19) is called by the production code.
  2. The metrics function injects a JSON document to the metrics_counters list in the Redis server, using the RPUSH command.
  3. The bulk-writer-counters.py script issues a BLPOP command for the metrics_counters list, which receives the JSON document once it’s injected.
  4. The bulk-writer-counters.py script decodes the JSON into a hash, adds a field, and waits until enough metrics are ready before attempting to write.
  5. The bulk-writer-counters.py script bulk inserts the collated metrics into the metrics index of the Elasticsearch cluster.
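As a sketch of the small tweaks mentioned above, a bulk-writer-timers.py variant would differ from listing 3 in only two lines: the list it pops from and the metric_type it stamps on each document. Everything else stays the same.

 # In bulk-writer-timers.py, the BLPOP call pops the timers list instead:
 counter_raw = redis_client.blpop('metrics_timers', wait_limit)
 # ...and the documents are stamped as timers instead of counters:
 counter['metric_type'] = 'timer'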

Performance aside, there are several reasons why you may prefer to have a later part of the shipping pipeline send telemetry to storage, rather than going there directly from the emitter.

  • Perhaps a different team owns the storage, and won’t let systems write directly to it without lots of negotiation.
  • Perhaps submit-to-queue is part of the standard metrics library used by all code, including systems that don’t have the ability to write directly to storage.
  • Perhaps the storage lacks appropriate access-control-lists, and granting write to emitting systems also grants delete, allowing attackers who gain control of emitting systems to modify storage. Using a queue breaks this chain and preserves the integrity of the storage system.
  • Perhaps handling storage system outages is too complex for emitting systems, and it’s best to use the queue as a buffer.

Generally speaking, as telemetry ecosystems increase in size, the need to queue or otherwise batch updates to storage grows. The rate of telemetry generation may force certain architectural compromises to maintain performance, and queues help with this. Queues also provide a buffer between emitting and storage, increasing the ability of the telemetry system to survive storage outages; if the emitting system can't buffer updates internally, an outage can harm the performance of the production system as emitting functions block until service is restored. A sketch of one such internal buffer follows.
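Here is a minimal sketch of an emitter that buffers internally when the queue is unreachable, so the production code-path never blocks on the outage. The buffer size, timeouts, and host name are assumptions for illustration; a production version would need a more careful drain strategy.

 # A counter emitter that falls back to an in-memory buffer when
 # the Redis queue is down, instead of blocking the production code.
 import collections
 import json
 import redis
 
 redis_client = redis.Redis(
   host='log-queue.prod.internal',
   socket_connect_timeout=0.1,
   socket_timeout=0.1)
 local_buffer = collections.deque(maxlen=10000)    # drop the oldest metrics if we overflow
 
 def counter(msg, count=1):
   metric = json.dumps({"metric_name": msg, "metric_value": count})
   try:
     # Drain anything buffered during an earlier outage first.
     while local_buffer:
       redis_client.rpush('metrics_counters', local_buffer[0])
       local_buffer.popleft()
     redis_client.rpush('metrics_counters', metric)
   except redis.exceptions.RedisError:
     # Queue unreachable: hold the metric locally instead of blocking.
     local_buffer.append(metric)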

Using streams in a shipping pipeline

Streams are a modification of the queue idea, but the items in the stream aren’t removed once they’re serviced. Multiple consumers of a stream can be servicing different parts of the stream. Exact stream implementations vary, but the concept of multiple consumers following their own place in the list is common to all stream implementations. How long telemetry remains in the stream also depends on the implementation of the stream.

Note: Every streaming service supports multiple defined streams. Redis calls its streaming objects a ‘stream’. Apache Kafka, which originated the concept of streaming, calls theirs ‘topics’.

To demonstrate how streams are used, let's go back to the global logistics company. Their Cisco firewalls, for example, emit telemetry into a Syslog system. Because they have a global footprint and operate several datacenters, they have many systems emitting telemetry. Being a global company, they also have many different teams that consume telemetry, some of which is needed by multiple teams. The firewall data is useful for the Security and Compliance team as well as the Network Operations team. Both teams need the same telemetry, but have their own infrastructure for handling it. Streams are how we make this happen.

Figure 7 demonstrates the architecture in use.


Figure 7. Shipping pipeline using a stream to split telemetry flow to two consumers. The stream allows two later Shipping Stage systems to get an identical feed of telemetry, effectively forking the flow. Useful for multi-tenant systems.


  1. The Cisco firewall emits a Syslog line, sending it to the Syslog server at syslog.prod.internal.
  2. The Syslog server sends that line to a stream service, in a topic named syslog_stream. The new line is appended to the stream.
  3. The Security and Compliance team's shipper listens to the stream, pulls lines, and stores them in a Security Incident Event Management system.
  4. The Network Operations team’s shipper does this as well, and stores them in Elasticsearch.

We've already shown the commands needed to send telemetry to a Syslog server; now let's take a look at how to configure the Syslog server to send telemetry to the stream. This requires two bits of code: a configuration change on the Syslog server, and a shipping script running on the Syslog server. First, we configure the Syslog server to send telemetry to a named pipe, which is a one-line addition to the rsyslog config file.

 
 # Config file for rsyslog
  
 [...]
 local7.*   |/dev/syslog_stream
 [...]
  

This tells rsyslog to send telemetry arriving on the local7 Syslog facility to a named pipe at /dev/syslog_stream. The named pipe is created by a service script, written by their Operations team, that runs as part of booting the Syslog server. The script is short; it creates the named pipe (FIFO) and then sits in a while loop waiting for lines.

Listing 4 stream_shipper.py: On-boot service to make a FIFO and ship to Redis

 
 import os
 import redis
  
 os.mkfifo('/dev/syslog_stream')    #A
  
 redis_client = redis.Redis( host='log-stream.prod.internal')
  
 rsyslog_stream = open('/dev/syslog_stream', "r")    #B
 while True:
   for line in rsyslog_stream:    #C
     redis_client.xadd(    #D
       'syslog_stream',    #D
       {'line': line} )    #D

#A Creates a fifo device, a named pipe, at /dev/syslog_stream

#B Opens the named pipe.

#C Wait for each line to come in on the pipe and act on it.

#D For each line, add it as a field of a new entry on the ‘syslog_stream’ stream in Redis, through the XADD command.

For each line sent to the pipe by the rsyslog server, the script submits that line directly to a Redis-based stream. Figure 8 shows this flow.


Figure 8. The flow of execution for stream_shipper.py. This script first creates an operating system structure called a FIFO at /dev/syslog_stream. It then opens the newly created pipe and starts listening for lines. For each received line of text, it inserts the text into a Redis stream hosted at log-stream.prod.internal on the syslog_stream topic. Use of a stream allows more than one downstream system to get a full copy of events.


Using the Redis stream feature, listing 4 uses a stream named syslog_stream; telemetry is appended to the stream by this shipper script. We're using an OS-level queue called a FIFO, or named pipe, to pass data between rsyslog and the shipper script. The shipper doesn't care about the consumers; all it cares about is appending data to the stream.

For this global logistics company, two teams consume the firewall feed: the Security and Compliance team and the Network Operations team. Consuming items from the stream requires a few steps with Redis. Here's the beginning of a script used by the Network Operations team to consume telemetry from the stream. It creates a consumer group on the stream, then sits in a while loop waiting for telemetry:

Listing 5 consume_syslog.py: Consumes the Redis stream from listing 4

 
 import redis
  
 redis_client = redis.Redis( host='log-stream.prod.internal' )
 redis_client.xgroup_create('syslog_stream', 'noc_team', '$')    #A
  
 while True:
   line = redis_client.xreadgroup(    #B
     'noc_team',
     'noc_ingest',    #C
     {'syslog_stream':    #D
      '>'})    #E
   do_something(line)
  

#A Creates the consumer-group.

#B Reads from the consumer-group.

#C Name of the individual consumer.

#D Name of the stream to consume.

#E Indicates ‘give me events that no one else has seen before’.

The important line of code is the one that reads from the stream. It has a lot going on, and figure 9 breaks out what each part does.


Figure 9. Diagram of the key command in consume_syslog.py, the Redis command that pulls information from the stream topic. Five configuration items are needed to set up a subscriber for the syslog_stream topic. Once done, this loop gets only new items that no other member of the consumer group (named noc_team) has seen.


The script used by the Security and Compliance team looks similar, but uses sec_team instead of noc_team for the name of its consumer group. This way both teams get the full stream of telemetry emitted by the Syslog server and the shipper script, and if the NOC team's script happens to be much faster than the Security and Compliance team's shipper, it doesn't cause the sec_team shipper to miss telemetry.
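For concreteness, here is a sketch of how the Security and Compliance team's consumer might look, reusing the same redis_client as listing 5. Only the group and consumer names change; those names and the send_to_siem helper are assumptions for illustration.

 redis_client.xgroup_create('syslog_stream', 'sec_team', '$')    # their own consumer group
 
 while True:
   line = redis_client.xreadgroup(
     'sec_team',
     'sec_ingest',    # their own consumer name
     {'syslog_stream': '>'})
   send_to_siem(line)    # hypothetical helper that writes into their SIEM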

The Syslog server here is configured to send Syslog local7 facility telemetry to the stream. The same Syslog server can handle multiple facilities; it could send local6 facility telemetry to a different stream, and local5 facility telemetry to a queuing system. At the same time, a fleet of database servers could emit telemetry from their local Syslog servers into a database_syslog stream.

Shipping to Software-as-a-Service systems

So far, we've talked about organizations keeping their entire telemetry systems in-house, but not all organizations do this. Small startups, for instance, have too much going on to spend time building a telemetry system, and paying someone else to do it makes all kinds of sense. For certain telemetry styles like Observability, paying someone else is by far the best option until your production systems are truly large.

Let's take another look at the one-hundred-person startup. They've had a metrics service, but they're looking to get a deeper view into how their system evolves; it's no longer only three containers in the Heroku system! They've decided to get into Observability and have picked Honeycomb.io as their SaaS provider. To make use of it, they need to add the Honeycomb SDK to their codebase and start instrumenting their functions.

They've taken a microservices approach to their system, with lots of small Docker images doing the work. As we've seen so far, they have a function called pdf_pages which we've seen metrics from. This is a small function that does two things:

  • Counts the number of pages in a supplied PDF file.
  • Calls out to another function, pdf_to_png, once for each page, to request that a PNG image be created from each page in the PDF.

Figure 10 shows the flow of telemetry, from when the SDK is initialized through doing the work of pdf_pages, ending in telemetry being sent to Honeycomb.io.


Figure 10. Flow of execution during a single instance of pdf_pages. Dotted boxes are function work, solid boxes are Observability actions leading up to a POST to the Honeycomb Events API. Note that context-related telemetry is gathered at multiple places during this flow. The .send() event is non-blocking, sending to a queuing system (not pictured) to reduce the impact of telemetry operations on the production code.


Listing 6 is our pdf_pages function, highlighting the telemetry operations.

Listing 6 pdf_pages.py: instrumented with Observability

 
 import libhoney    #A
 import [lots of other things]
  
 libhoney.init(    #B
   writekey=os.getenv('HC_WRITEKEY'),    #B
   dataset='example.profile.pages',    #B
   sample_rate=4)    #B
  
 def get_file_details(options):
   [...]
  
 def get_page_details(file_details):
   [...]
  
 def enqueue_png_create(page_details):
   [...]
  
 def wait_png_pages(png_pages):
   [...]
  
 # Main event hook.
 def do_work(options):
   hc_event     = libhoney.new_event()    #C
   file_details = get_file_details(options)    #D
   hc_event.add_field('file_size', file_details['file_size'])    #E
   hc_event.add_field('file_extension', file_details['extension'])    #E
  
   page_details = get_page_details(file_details)    #F
   hc_event.add_field('page_count', page_details['count'])    #G
  
   png_pages = enqueue_png_create(page_details)    #H
   wait_png_pages(png_pages)    #H
  
   hc_event.send()    #I
  

#A Loads honeycomb SDK

#B Initializes the honeycomb connection

#C Creates new event and begins tracking

#D Fetch file-specific details

#E Add file-specific context details to the event

#F Fetch PDF-specific details

#G Add pdf-specific details to the event

#H Callout and wait for pdf_to_png processes to return

#I Finalize the event and send to Honeycomb

The pdf_pages listing leaves out all of the code doing the actual work of pdf_pages and instead shows how the Honeycomb SDK is used to track context-related details. Note that we capture a page_count metric as part of this work, which is associated with this event in the Honeycomb dashboard. We also see two more pieces of explicit context being gathered:

  • file_size: capturing the size of the source file.
  • file_extension: capturing the extension of the file, which isn’t always .pdf!

You may be wondering why we capture the extension when presumably we already know this file is a PDF. The answer is that there have been bugs in the past where certain extensions fooled our PDF page-counting code, and the software engineers want to track the file extension in case a bug like that happens again.

If you look at the final HTTP POST in figure 10, you should notice an extra bit of telemetry we didn't specify:

 
 dur_ms: 6155
  

This value is captured by the Honeycomb SDK directly and represents the time between the libhoney new_event() call and the hc_event.send() call. Because these calls bracket all of the functions doing the work of pdf_pages, dur_ms captures how long the do_work() function took to run. In the case of figure 10, the function took 6.155 seconds to count pages in the supplied PDF and wait for individual PNG images of each page to be created.
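As a rough illustration of what dur_ms represents (not how the SDK implements it), the bracketing is equivalent to timing do_work() by hand:

 import time
 
 start = time.monotonic()    # roughly when libhoney.new_event() happens
 do_work(options)            # all of the pdf_pages work happens in here
 dur_ms = (time.monotonic() - start) * 1000    # roughly what hc_event.send() records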

One final thing to note is the sample_rate=4 parameter we gave when we initialized the libhoney library. This value ultimately ends up as a header on the HTTP POST going to the Honeycomb API, and indicates that only one out of every four events of this type is kept: a twenty-five percent sample. Because all Telemetry SaaS vendors charge on volume in some way, using a sample rate is among the best tools at your disposal for keeping costs down for SaaS telemetry systems.

Twenty-five percent is a large sample, and Software Engineering teams should be able to learn how the pdf_pages function operates in the context of the overall system without the costs associated with keeping every event. Once Software Engineering has a good understanding of how this function works, the sample-rate can be reduced even further, bringing costs down as well.
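As a back-of-the-envelope sketch of what sampling does to SaaS volume (the event counts below are assumptions, not figures from the startup), the arithmetic looks like this:

 # Rough cost arithmetic for 1-in-N sampling.
 events_per_day = 1_000_000          # events the production code generates (assumed)
 sample_rate    = 4                  # keep 1 out of every 4 events (twenty-five percent)
 kept_per_day   = events_per_day // sample_rate    # 250,000 events actually sent and billed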

That’s all for this article.

If you want to learn more about the book, you can check it out on our browser-based liveBook platform here.