Catching errors posting to InfluxDB from Riemann

At Signal Vine we recently upgraded from InfluxDB v0.8.8 to v0.9. Unfortunately, we ran into a number of issues related to the migration. One of them was a problem that caused InfluxDB to stop accepting data. Since this happened late on a Friday night, no one noticed the lack of data until Monday, which means we lost all of the data from the weekend. Luckily for us, our app is not heavily utilized over the weekend, but I decided that we needed to be able to detect these sorts of issues in the future.

It turns out that an exception was being emitted each time Riemann failed to send to InfluxDB. I decided to go with a simple (try ... (catch ...)), which is probably not the ideal way to handle this. There is *exception-stream*, but Iʼm not sure how it is used and I was unable to find an example demonstrating its use. This is what I came up with:

(ns riemann.config
  (:require [riemann.time :as time]))

; Batch events together before sending them on, to reduce the load on
; Riemann and InfluxDB.
(def influx (batch 100 1/10
                   ; Define the throttled notifiers once so that each
                   ; throttle applies across all of the branches below.
                   (let [throttled-alert (throttle 1 900 tell-brendan)
                         throttled-log (throttle 1 60 (fn log [e] (warn "influxdb-send-exception" (str e))))]
                     (async-queue! :agg {:queue-size 1000
                                         :core-pool-size 1
                                         :max-pool-size 4
                                         :keep-alive-time 60000}
                                   (let [send-influx (influxdb {:host "influxdb.example.com"
                                                                :version :0.9
                                                                :scheme "http"
                                                                :port "8086"
                                                                :db "metrics"
                                                                :username "riemann"
                                                                :password "password"
                                                                :tag-fields #{:host :environment}})]
                                     (fn influx-sending [event]
                                       (try
                                         (send-influx event)
                                         (catch Exception e
                                           (throttled-alert {:host "riemann.example.com"
                                                             :service "influxdb send error"
                                                             :state "fatal"
                                                             :metric 1
                                                             :tags []
                                                             :ttl 60
                                                             :description (str e)
                                                             :time (time/unix-time-real)})
                                           (throttled-log e)))))))))

Obviously, that is a bit more complex than just catching the errors, so Iʼll dig into it. First off, Iʼm batching events together before sending them to InfluxDB. This helps to reduce Riemannʼs CPU load. Then I define the two alert functions that will be used later on. tell-brendan is a function that sends an email to me. I only want to get one of these emails every 15 minutes, as it is likely that I would see the first alert and start working on the problem immediately. However, I do want to know whether sending metrics to InfluxDB is still failing, so I have Riemann log a failure notification every minute. These are both defined here so that each throttle applies to all of the branches later on.

Next up is another performance option. Iʼve set up an async queue so that sending metrics to InfluxDB doesnʼt block Riemannʼs incoming event stream. Iʼve had sending to InfluxDB back Riemann up to the point where events were expiring before Riemann could process them. Sending to InfluxDB asynchronously fixes this: no matter how long it takes to deliver events to InfluxDB, the rest of Riemannʼs processing depends only on Riemann. Since moving to InfluxDB v0.9 and implementing the async queue, the 99.9th percentile stream latency for Riemann has dropped from 50-100ms to 2.5ms.
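
If you want to keep an eye on that latency yourself, Riemann injects events about its own performance into your streams. As a rough sketch (the exact service name here is an assumption; check what your Riemann version actually reports), you could log the 99.9th percentile like this:

(streams
  ; Assumed service name for Riemann's internal stream-latency metric;
  ; inspect your index to confirm what your version emits.
  (where (service "riemann streams latency 0.999")
    (fn [event] (info "stream latency p99.9:" (:metric event)))))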

Next up, I define the InfluxDB client. There isnʼt much to see here. The only mildly interesting thing is the :tag-fields value. At Signal Vine, all of our events are tagged with an environment. The #{:host :environment} sends both the host value from the event and the environment as InfluxDB tags. This makes it easier to query exactly what you want out of InfluxDB.
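
To illustrate (this event is made up), with :tag-fields #{:host :environment} an event like the one below should be written with :host and :environment as InfluxDB tags, the service as the measurement name, and the metric as the value:

; A hypothetical event as it flows through this stream
{:host "app01.example.com"     ; -> InfluxDB tag
 :service "api requests"       ; -> measurement name
 :environment "production"     ; -> InfluxDB tag
 :metric 42
 :time 1442000000}
; You can then filter on the tag with a query along the lines of:
;   SELECT * FROM "api requests" WHERE "environment" = 'production'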

Now for the main attraction, the influx-sending function. While Riemann tends to hide this in its internal functions, Riemannʼs streams are simply made up of functions that take a single argument, the event. Itʼs just that Riemannʼs built-in functions return a function that satisfies this contract and then calls all of the children that you have defined. Since we donʼt have any children, we simply need to construct a function that takes the event; well, in this case it actually takes a vector of events, since batch passes its events along as a vector. As previously mentioned, we use a (try ... (catch ...)) to handle any InfluxDB errors. So we simply try to send the events to InfluxDB. If that throws any exception, we catch it and pass the exception to our notification functions. Iʼve chosen to construct a new event, as the passed-in events have little to do with the exception.
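
To make that concrete, here is a minimal sketch of a hand-written stream in the same shape as Riemannʼs built-ins (the name logging-stream is hypothetical): it takes children, returns a function of one event, and forwards the event on with call-rescue.

; A trivial custom stream: log the event's service, then pass it on.
(defn logging-stream [& children]
  (fn stream [event]
    (info "saw event:" (:service event))
    (call-rescue event children)))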

Iʼm quite fond of this approach, but it does have its limitations. One of the big ones is that it will generate an alert for even momentary issues in InfluxDB. If you happen to restart InfluxDB, you will get an alert. I donʼt really mind this, but it is something to keep in mind. It also discards any events which fail to send. In that same scenario, when we restart InfluxDB, we will lose any events that Riemann tries to send during the restart. It would be much better to pause the sending process for some period of time and then attempt to resend the events. The main reason that Iʼm not currently doing this is that Iʼm not really sure how to make that happen.
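
One crude direction, purely as a sketch (the retry count and delay below are arbitrary, and sleeping like this ties up one of the async queueʼs pool threads), would be to retry the send a few times before giving up:

; A hypothetical wrapper: retry the send a few times, rethrowing the
; final exception so the existing catch still fires as a fallback.
(defn send-with-retry [send-fn events]
  (loop [attempts 3]
    (let [result (try (send-fn events) ::ok
                      (catch Exception e
                        (when (= attempts 1) (throw e))))]
      (when-not (= result ::ok)
        (Thread/sleep 5000)
        (recur (dec attempts))))))

Inside influx-sending you would then call (send-with-retry send-influx event) in place of the bare (send-influx event).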

Writing a Riemann metric to a file

At work, I recently set up Riemann for monitoring. Riemann is a monitoring tool that works on streams of events, and it includes many powerful tools for working with those streams. As an example, it has a ddt function that will differentiate an eventʼs metric over time. This allows you to get a rate of change from a counter. While Riemann includes many powerful tools out of the box, your Riemann config file is a Clojure program, so youʼre able to extend Riemann by simply modifying your config file.
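
For instance, a counter that only ever increases can be turned into a per-second rate and logged with just a few lines (a small sketch; the service name is made up):

; Convert a monotonically increasing counter into a per-second rate.
(streams
  (where (service "http requests total")
    (ddt
      (fn [event] (info "requests/sec:" (:metric event))))))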

I had occasion to do such a thing this week. We send all of our error-level log messages from Logstash to Riemann in order to alert us when we need to check the logs. Doing this is a fairly simple process: we use Slack, and Riemann has a built-in function to send alerts to it. While we could send the whole log message to Slack, this isnʼt ideal for us. Our log messages can be quite long, many thousands of characters, and sending that to Slack makes for a fairly bad experience. What we decided to do instead was write the full event to a file and link to that file in the Slack message. Unfortunately, there isnʼt really a built-in way to do this in Riemann. You could write the messages to Riemannʼs log file, but that isnʼt what we are looking for here, as it results in a single large log file rather than individual files per alert.

What I decided to do was create a function that writes the message out to a file whose name is the messageʼs SHA-256 hash. Generating the hash was the most complicated part of this. The complication arose from my lack of knowledge of the various libraries that can generate a hash. The way that I figured this out was by Googling variations on Clojure/Java SHA-256 hashing and then trying the results at the Clojure REPL on a checkout of the Riemann source. Unfortunately, neither of the Clojure hashing libraries I found is included in Riemann, but the JDKʼs MessageDigest, together with the Apache Commons Codec library that Riemann does include, can do the job. I likely would have known that if I had more experience with the Java ecosystem, but I donʼt. So here is what I came up with.

(ns riemann.config
  (:require [clojure.java.io :as io])
  (:import (org.apache.commons.codec.binary Base64)
           (java.security MessageDigest)))

; A couple of helper functions to shuffle data between strings and byte arrays
(defn base64-encode [^String v] (Base64/encodeBase64 (.getBytes v)))
(defn base64-decode [^bytes b] (Base64/decodeBase64 b))

; The hashing function
(defn digest [^bytes b]
  (.digest (doto (MessageDigest/getInstance "SHA-256")
             (.reset)
             (.update b))))

; Converts a single byte to its two-character hex representation
(defn as-hex [v]
  (-> (Integer/toString (+ 0x100 (bit-and 0xff v)) 16) (.substring 1)))

; Gets the SHA-256 of a passed-in string
(defn get-sha256 [v]
  (apply str (map as-hex (-> (base64-encode v) (base64-decode) (digest)))))
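
; A quick sanity check at the REPL (this is the well-known SHA-256 of "hello"):
;   (get-sha256 "hello")
;   ;=> "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"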

; Returns a function that writes an event to the specified directory as <sha-256>.txt and
; modifies the event to include {:url <url>/<sha-256>.txt}
(defn write [directory url]
  (fn [& children]
    (fn stream [event]
      (let [write-event (fn [individual-event]
                          ; Render the event map as "key: value" lines under a
                          ; "Message Contents" header.
                          (let [contents (reduce (fn [r x] (str r "\n" (x 0) ": " (x 1)))
                                                 (conj (seq individual-event) "Message Contents"))]
                            (let [sha (get-sha256 contents)]
                              ; Write the rendered event to <directory>/<sha>.txt
                              ; and attach the matching URL to the event.
                              (with-open [wrtr (io/writer (str directory "/" sha ".txt"))]
                                (.write wrtr contents)
                                (conj individual-event {:url (str url "/" sha ".txt")})))))]
        (if (vector? event)
          (call-rescue (into [] (map write-event event)) children)
          (call-rescue (write-event event) children))))))

Then all you need to do is define the function that you will use on your streams. Something like (def write-log (write "/var/www/alerts" "https://alerts.example.com")) would work, where /var/www/alerts is the content directory for alerts.example.com. To include the link in your Slack alert, youʼll need to provide a custom formatter that includes the link. Here is what we use:

; Truncates the String (s) at the specified number of characters (n)
(defn trunc [s n] (subs s 0 (min (count s) n)))

; The formatter that Slack will use when there isn't a link
(defn default-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Description*: " (trunc (:description e) 100))})
; The formatter that Slack will use when there is a link
(defn url-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Link*: " (:url e)
              "\n*Description*: " (trunc (:description e) 100))})

; Chooses which formatter Slack will use
(defn slack-formatter [e]
  (if (nil? (:url e))
    (default-formatter e)
    (url-formatter e)))

I know that’s a lot to piece together, so here is a minimal Riemann config that should work and shows how everything fits together.

(ns riemann.config
  (:require [clojure.java.io :as io])
  (:import (org.apache.commons.codec.binary Base64)
           (java.security MessageDigest)))

; A couple of helper functions to shuffle data between strings and byte arrays
(defn base64-encode [^String v] (Base64/encodeBase64 (.getBytes v)))
(defn base64-decode [^bytes b] (Base64/decodeBase64 b))

; The hashing function
(defn digest [^bytes b]
  (.digest (doto (MessageDigest/getInstance "SHA-256")
             (.reset)
             (.update b))))

(defn as-hex [v]
  (-> (Integer/toString (+ 0x100 (bit-and 0xff v)) 16) (.substring 1)))

; Gets the sha-256 of a passed in string
(defn get-sha256 [v]
  (apply str (map as-hex (-> (base64-encode v) (base64-decode) (digest)))))

; Returns a function that writes an event to the specified directory as <sha-256>.txt and modifies the event to include {:url <url>/<sha-256>.txt}
(defn write [directory url]
  (fn [& children]
    (fn stream [event]
      (let [write-event (fn [individual-event]
                          (let [contents (reduce (fn [r x] (str r "\n" (x 0) ": " (x 1)))
                                                 (conj (seq individual-event) "Message Contents"))]
                            (let [sha (get-sha256 contents)]
                              (with-open [wrtr (io/writer (str directory "/" sha ".txt"))]
                                (.write wrtr contents)
                                (conj individual-event {:url (str url "/" sha ".txt")})))))]
        (if (vector? event)
          (call-rescue (into [] (map write-event event)) children)
          (call-rescue (write-event event) children))))))

; The function that you will call to write the event
(def write-alert (write "/var/www/alerts" "http://alerts.example.com"))

; Truncates the String (s) at the specified number of characters (n)
(defn trunc [s n] (subs s 0 (min (count s) n)))

; The formatter that Slack will use when there isn't a link
(defn default-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Description*: " (trunc (:description e) 100))})
; The formatter that Slack will use when there is a link
(defn url-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Link*: " (:url e)
              "\n*Description*: " (trunc (:description e) 100))})

; Chooses which formatter Slack will use
(defn slack-formatter [e]
  (if (nil? (:url e))
    (default-formatter e)
    (url-formatter e)))

(def notify-slack (slack {:account "{% raw %}{{Your Account}}{% endraw %}"
                          :token "{% raw %}{{Your Token}}{% endraw %}"}
                         {:username "Riemann"
                          :channel "#Alerts"
                          :icon ":rotating_light:"
                          :formatter slack-formatter}))

; Expire old events from the index every 5 seconds.
(periodically-expire 5)

(let [index (index)]
  ; Inbound events will be passed to these streams:
  (streams
    index

    (default {:severity 4}
      (where (and (service #".* logs$") (= (:environment event) "production"))
        (by [:service]
          (where (< (:severity event) 4)
            (throttle 2 3600
              (write-alert notify-slack))))))))

What that config will do is send alerts to the #Alerts channel of your Slack team whenever a production event whose service ends in "logs" arrives with a severity below 4. Alerts are limited to no more than 2 messages per hour per service.