
By Alexander Dean

In this article, excerpted from the book Unified Log Processing, we generate a simple stream of events related to systems monitoring.

Let’s imagine that our company has a server that keeps running out of space – it hosts some particularly chatty application that keeps generating lots of log files. Our systems administrator wants to receive a warning whenever the server’s disk reaches 80% full, so that he can go in and manually archive and remove the excess log files.

Of course, there is a rich, mature ecosystem of systems monitoring tools that could meet this requirement. Simplifying somewhat*, these tools typically use one of two monitoring architectures:

  1. Push-based monitoring, where an agent running on each monitored system periodically sends data (often called “metrics”) into a centralized system. Push-based monitoring systems include Ganglia, Graphite, collectd and StatsD
  2. Pull-based monitoring, where the centralized system periodically “scrapes” metrics from each monitored system. Pull-based monitoring systems include JMX, libvirt and WMI

The push and pull approaches are both visualized in Figure 1. Sometimes a systems monitoring tool will provide both approaches – for example, Zabbix and Prometheus are predominantly pull-based systems with some push support.

* http://www.boxever.com/push-vs-pull-for-monitoring



Figure 1. In push-based systems monitoring, some kind of agent pushes metrics at regular intervals into a centralized system. By contrast, in pull-based architectures, the centralized system regularly scrapes metrics from some form of endpoint available on the servers.
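To make the contrast concrete, here is a minimal sketch of the two approaches in Python – the collector URL and the metrics endpoint are hypothetical, purely for illustration:

import json, socket, urllib2

def push_metric(collector_url, value):
  # Push model: an agent on the monitored server periodically POSTs
  # a metric to the centralized system
  body = json.dumps({"host": socket.gethostname(), "disk_used_pct": value})
  request = urllib2.Request(collector_url, body,
    {"Content-Type": "application/json"})
  urllib2.urlopen(request)

def scrape_metrics(server_host):
  # Pull model: the centralized system periodically GETs a metrics
  # endpoint exposed by each monitored server
  return urllib2.urlopen("http://{}/metrics".format(server_host)).read()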


In the push-based model, we have agents generating events and submitting them to a centralized system; the centralized system then analyzes the event stream obtained from all agents. Does this sound familiar? We can transplant this approach directly into our unified log, as per Figure 2.



Figure 2. We can implement push-based systems monitoring on top of our unified log. The agents running on our servers will emit events that are written to our unified log, ready for further processing


It looks, then, like it should be straightforward to meet our systems administrator’s monitoring requirements using a unified log such as Apache Kafka or Amazon Kinesis. Of course, we will be using Kinesis for this article – so before we get started, we’ll take a brief look at the terminology differences between Kafka and Kinesis.


Terminology differences from Kafka

Amazon Kinesis has extremely similar semantics to Apache Kafka – however, the two platforms diverge a little in the descriptive language that they use. The key differences are set out in Figure 3 – essentially, Kinesis uses “streams” whereas Kafka uses “topics,” and Kinesis streams consist of one or more “shards,” where Kafka topics contain “partitions.” Personally, I prefer the Kinesis terms to Kafka’s: they are a little less ambiguous and have less “message queue” baggage.



Figure 3. The equivalent of a Kafka topic in Kinesis is a stream. A stream consists of one or more shards, which are the equivalent of what Kafka calls partitions.


Differences of language aside, the fact that Kinesis offers the same key building blocks as Kafka is encouraging: it suggests that almost everything we could do in Kafka, we can do in Kinesis. To be sure, we will come across some differences of approach and capability throughout this article – I will make sure to highlight these as they come up. For now, let’s get started with Kinesis.


Setting up our stream

First, we need a Kinesis stream to send our systems monitoring events to. Most commands in the AWS CLI follow this format:

$ aws [service] [command] options...

In our case, all of our commands will start with aws kinesis. You can find a full reference of all available AWS CLI commands for Kinesis here:

http://docs.aws.amazon.com/cli/latest/reference/kinesis/index.html

We can create our new stream using the AWS CLI like so:

$ aws kinesis create-stream --stream-name events \
   --shard-count 2 --profile=ulp

Hit enter, and then switch back to the AWS web interface, and click on Amazon Kinesis. If you are quick enough, you should see the new stream listed with its status set to CREATING, and with a count of 0 shards, as in Figure 4. After the stream is created, Kinesis will report the stream status as ACTIVE and display the correct number of shards. We can only write events to and read events from ACTIVE streams.
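If you would rather not race the web console, you can also check the stream’s status from the command line, using the same CLI and profile we used to create it:

$ aws kinesis describe-stream --stream-name events \
   --query 'StreamDescription.StreamStatus' --profile=ulp

This should report CREATING at first, and then ACTIVE once the stream is ready to use.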



Figure 4. Our first Amazon Kinesis stream is currently being created. After a few more seconds, we will see a status of ACTIVE and the correct shard count.


We created our stream with two shards – meaning that events sent to the stream will be written to one or the other of the two shards; any stream processing apps that we write will have to make sure to read events from both shards. At the time of writing, Amazon enforces a few limits around shards:*

  • You are allowed 10 shards per AWS region by default (although you can have that limit raised as needed)
  • Each shard supports reading up to 2 MB of record data per second, and up to 5 “read transactions” per second
  • Each shard supports writing up to 1 MB of record data per second, and up to 1,000 records per second

Don’t worry – we won’t be hitting any of these limits: our agent will end up emitting just one small event every 10 seconds, a tiny fraction of even a single shard’s write capacity. In fact, we could have happily made do with just one shard in our stream.

So at this point, we have our Kinesis stream ready and waiting to receive some events. In the next section, let’s model those events.

* http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html


Modeling our events

Remember that our systems administrator wants to receive a warning whenever the troublesome server’s disk reaches 80% full. To support this monitoring, the agent running on the server will need to regularly read file-system metrics and send those metrics into our unified log for further analysis. We can model these metrics readings as events using this grammatical structure:

  • Our agent is the subject of the event
  • Read (i.e., “took a reading”) is the verb of the event
  • File-system metrics are the direct object of the event
  • The reading takes place on our server, a prepositional object
  • The reading takes place at a specific time, another prepositional object

Putting these together, we can sketch out the event model that we’ll need to assemble as in Figure 5.



Figure 5. Our systems monitoring events involve an agent reading file-system metrics on a given server at a specific point in time
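To make this concrete, here is roughly what one of these events could look like once serialized as JSON – the values shown are illustrative, and we will build up this exact structure in code below:

{
  "id": "42432ebe-40a5-4407-a066-a1361fc31319",
  "subject": { "agent": { "version": "0.1.0" } },
  "verb": "read",
  "direct_object": { "filesystem_metrics": {
    "size": 42241163264, "free": 39044952064, "available": 37267378176 } },
  "at": "2015-03-05T23:35:11.807298",
  "on": { "server": { "hostname": "ulp" } }
}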


Now that we know what our events should look like, let’s write our agent.


Writing our agent

We are going to write our agent in Python, making use of the excellent boto library, which is the official Python SDK for AWS. In fact all of the official language-specific SDKs for AWS support writing events to Kinesis, which seems fair: for Kinesis to be a truly unified log, we need to be able to send events to it from all of our various client applications, whatever language they are written in.

http://aws.amazon.com/sdk-for-python/

Let’s get started. We are going to build up our systems monitoring agent piece by piece, using the Python interactive interpreter. Start it up by logging into your Vagrant guest and typing:

$ python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>>

First let’s define and test a function that gives us the file-system metrics that we need. Paste the following into your Python interpreter, being careful to keep the whitespace intact:

import os

def get_filesystem_metrics(path):
  stats = os.statvfs(path)
  block_size = stats.f_frsize
  return (block_size * stats.f_blocks, # Filesystem size in bytes
    block_size * stats.f_bfree,        # Free bytes
    block_size * stats.f_bavail)       # Free bytes excl. reserved space

s, f, a = get_filesystem_metrics("/")
print "size: {}, free: {}, available: {}".format(s, f, a)

You should see something like the following output – of course the exact numbers will depend on your own computer:

size: 499046809600, free: 104127823872, available: 103865679872
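Since our systems administrator cares about the disk reaching 80% full, it is worth seeing how these three numbers translate into a used-space percentage. Our agent will simply report the raw metrics and leave that judgement to a downstream consumer, but as a quick sketch (using one reasonable definition of “used,” from the point of view of an unprivileged user):

def percent_used(size, available):
  # Treat everything an unprivileged user cannot write to (including
  # the root-reserved blocks) as used space
  return 100.0 * (size - available) / size

print "disk used: {:.1f}%".format(percent_used(s, a))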

Good – so we now know how to retrieve the information we need about the file-system. Next, let’s create all the metadata that we need for our event. Paste in the following:

import datetime, socket, uuid

def get_agent_version():
  return "0.1.0"

def get_hostname():
  return socket.gethostname()

def get_event_time():
  return datetime.datetime.now().isoformat()

def get_event_id():
  return str(uuid.uuid4())

print "agent: {}, hostname: {}, time: {}, id: {}".format(
  get_agent_version(), get_hostname(), get_event_time(), get_event_id())

You should now see something a little like:

agent: 0.1.0, hostname: Alexanders-MacBook-Pro.local, time: 2015-03-05T23:35:11.807298, id: 42432ebe-40a5-4407-a066-a1361fc31319

Note that we are uniquely identifying each event by attaching a freshly minted version 4 UUID as its event ID.* We will explore event IDs in much more detail in Chapter XX.

Let’s put this all together with a function that creates our event as a Python dictionary. Type in the following at the interpreter:

def create_event():
  size, free, avail = get_filesystem_metrics("/")
  event_id = get_event_id()
  return (event_id, {
    "id": event_id,
    "subject": {
      "agent": {
        "version": get_agent_version()
      }
    },
    "verb": "read",
    "direct_object": {
      "filesystem_metrics": {
        "size": size,
        "free": free,
        "available": avail
      }
    },
    "at": get_event_time(),
    "on": {
      "server": {
        "hostname": get_hostname()
      }
    }
  })

print create_event()

It’s a little verbose, but the intent should be clear from the Python interpreter’s output, which should be something like this:

('60f4ead5-8a1f-41f5-8e6a-805bbdd1d3f2', {'on': {'server': {'hostname': 'ulp'}},
 'direct_object': {'filesystem_metrics': {'available': 37267378176,
 'free': 39044952064, 'size': 42241163264}}, 'verb': 'read',
 'at': '2015-03-05T23:35:34.878537', 'id': '60f4ead5-8a1f-41f5-8e6a-805bbdd1d3f2',
 'subject': {'agent': {'version': '0.1.0'}}})

* http://en.wikipedia.org/wiki/Universally_unique_identifier

We have now constructed our first well-structured systems monitoring event! How do we send it to our Kinesis stream? It should be as simple as this:

import json

def write_event(conn, stream_name):
  event_id, event_payload = create_event()
  event_json = json.dumps(event_payload)
  conn.put_record(stream_name, event_json, event_id)

The key method to understand here is conn.put_record, which takes three required arguments:

  1. The name of the stream to write to
  2. The data (sometimes called body or payload) of the event. We are sending this data as a Python string containing our JSON
  3. The partition key for the event. This determines which shard the event is written to (Kinesis hashes the partition key to choose the shard)

Now we just need to connect to Kinesis and try writing an event – this is as simple as:
from boto import kinesis

conn = kinesis.connect_to_region(region_name="us-east-1",
  profile_name="ulp")

write_event(conn, "events")

Part of the reason that this code is so simple is that the AWS CLI tool that you configured earlier uses boto, the AWS SDK for Python, under the hood. This means that boto can access the AWS credentials that you set up earlier in the AWS CLI without any trouble.

Hit enter on the above code and you should be greeted with… silence! Although in this case no news is good news, it would still be nice to get some visual feedback. This can be arranged: next put our event sending into an infinite loop in the Python interpreter like so:

while True:
  write_event(conn, "events")

Leave this running for a couple of minutes, and then head back into the Kinesis section of the AWS web interface, and click on your events stream to bring up the Stream Details view. At the bottom of this view, in the Monitoring tab, you should be able to see the beginnings of lines on some of the charts, as in Figure 6.



Figure 6. The Monitoring tab in the Stream Details view lets you review the current and historical performance of a given Kinesis stream.


Unfortunately, this is the only visual confirmation we can get that we are successfully writing to our Kinesis stream – at least until we write some kind of stream consumer. We will do that soon, but first let’s wrap up our systems monitoring agent. We won’t need the Python interpreter any more, so you can kill the infinite loop with Control-C, and then exit the interpreter with Control-D.
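Outside of the web console, the AWS CLI can also pull raw records back out of a shard, which makes for a handy sanity check before we have a real consumer. A rough sketch (the shard ID below is the ID typically assigned to a stream’s first shard; you can confirm yours with describe-stream):

$ aws kinesis get-shard-iterator --stream-name events \
   --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
   --profile=ulp
$ aws kinesis get-records --shard-iterator <iterator from above> \
   --profile=ulp

The Data field of each returned record is base64-encoded, so you will need to decode it to see the event JSON.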

Let’s consolidate all of our work at the interpreter into a single file to run our agent’s core monitoring loop. Create a file called agent.py and populate it with the contents of Listing 1.


Listing 1. agent.py

#!/usr/bin/env python

import os, datetime, socket, json, uuid, time
from boto import kinesis

def get_filesystem_metrics(path):
  stats = os.statvfs(path)
  block_size = stats.f_frsize
  return (block_size * stats.f_blocks, # Filesystem size in bytes
    block_size * stats.f_bfree,        # Free bytes
    block_size * stats.f_bavail)       # Free bytes excluding reserved space

def get_agent_version():
  return "0.1.0"

def get_hostname():
  return socket.gethostname()

def get_event_time():
  return datetime.datetime.now().isoformat()

def get_event_id():
  return str(uuid.uuid4())

def create_event():
  size, free, avail = get_filesystem_metrics("/")
  event_id = get_event_id()
  return (event_id, {
    "id": event_id,
    "subject": {
      "agent": {
        "version": get_agent_version()
      }
    },
    "verb": "read",
    "direct_object": {
      "filesystem_metrics": {
        "size": size,
        "free": free,
        "available": avail
      }
    },
    "at": get_event_time(),
    "on": {
      "server": {
        "hostname": get_hostname()
      }
    }
  })

def write_event(conn, stream_name):
  event_id, event_payload = create_event()
  event_json = json.dumps(event_payload)
  conn.put_record(stream_name, event_json, event_id)
  return event_id

if __name__ == '__main__':                                       # a
  conn = kinesis.connect_to_region(region_name="us-east-1",
    profile_name="ulp")
  while True:                                                    # b
    event_id = write_event(conn, "events")
    print "Wrote event {}".format(event_id)
    time.sleep(10)                                               # c

#a The entrypoint for our app

#b Loop forever

#c Emit one event every 10 seconds


Make the agent.py file executable and run it:

$ chmod +x agent.py
$ ./agent.py
Wrote event 481d142d-60f1-4d68-9bd6-d69eec5ff6c0
Wrote event 3486558d-163d-4e42-8d6f-c0fb91a9e7ec
Wrote event c3cd28b8-9ddc-4505-a1ce-193514c28b57
Wrote event f055a8bb-290c-4258-90f0-9ad3a817b26b
...

Our agent is running! Check out Figure 7 for a visualization of what we have created.



Figure 7. Our systems monitoring agent is now emitting an event containing file-system statistics every 10 seconds.