
Writing a Riemann metric to a file

At work, I recently set up Riemann for monitoring. Riemann is a monitoring tool that operates on streams of events and ships with many powerful stream functions. For example, its ddt function differentiates an eventʼs metric over time, which lets you turn a counter into a rate of change. Beyond the built-in tools, your Riemann config file is a Clojure program, so you can extend Riemann simply by modifying your config file.
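
To make the rate-of-change idea concrete, here is a minimal sketch in plain Clojure of the arithmetic behind differentiating a counter between two events. The function name rate-between and the sample events are hypothetical, and this only illustrates the calculation; it is not Riemannʼs ddt implementation.

```clojure
;; Illustration only: rate of change between two successive events.
;; `rate-between` is a hypothetical helper, not part of Riemann.
(defn rate-between
  "Returns (m2 - m1) / (t2 - t1) for two events carrying :time (seconds)
  and :metric, or nil when the timestamps are equal."
  [{t1 :time m1 :metric} {t2 :time m2 :metric}]
  (let [dt (- t2 t1)]
    (when-not (zero? dt)
      (/ (- m2 m1) dt))))

;; A counter that grew from 100 to 150 over 10 seconds:
(rate-between {:time 10 :metric 100} {:time 20 :metric 150})
;; => 5
```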

I had occasion to do exactly that this week. We send all of our error-level log messages from Logstash to Riemann so that we are alerted when we need to check the logs. The alerting itself is fairly simple: we use Slack, and Riemann has a built-in function to send alerts to Slack. While we could send the whole log message to Slack, this isnʼt ideal for us. Our log messages can be quite long, often many thousands of characters, and sending that to Slack makes for a fairly bad experience. What we decided to do instead was write the full event to a file and link to that file in the Slack message. Unfortunately, there isnʼt really a built-in way to do this in Riemann. You could write the messages to Riemannʼs log file, but that results in a single large log file rather than the individual files we are looking for.

What I decided to do was create a function that writes the message to a file named after the messageʼs SHA-256 hash. Generating the hash was the most complicated part, mostly because I didnʼt know the various libraries that can generate one. I figured it out by Googling variations on Clojure/Java SHA-256 hashes and then trying them at the Clojure REPL on a checkout of the Riemann source. Unfortunately, neither of the Clojure hashing libraries is included in Riemann, but I was able to find a Java package that Riemann does include and that is able to generate hashes: Apache Commons. I likely would have known that if I had more experience with the Java ecosystem, but I donʼt. So here is what I came up with.

(ns riemann.config
  (:require [clojure.java.io :as io])
  (:import (org.apache.commons.codec.binary Base64)
           (java.security MessageDigest)))

; A couple of helper functions to shuffle data between strings and byte arrays
(defn base64-encode [^String v] (Base64/encodeBase64 (.getBytes v)))
(defn base64-decode [^bytes b] (Base64/decodeBase64 b))

; The hashing function
(defn digest [^bytes b]
  (.digest (doto (MessageDigest/getInstance "SHA-256")
             (.reset)
             (.update b))))

(defn as-hex [v]
  (-> (Integer/toString (+ 0x100 (bit-and 0xff v)) 16) (.substring 1)))

; Gets the sha-256 of a passed in string
(defn get-sha256 [v]
  (apply str (map as-hex (-> (base64-encode v) (base64-decode) (digest)))))

; Returns a function that writes an event to the specified directory as <sha-256>.txt and
; modifies the event to include {:url <url>/<sha-256>.txt}
(defn write [directory url]
  (fn [& children]
    (fn stream [event]
      (let [write-event (fn [individual-event]
                          (let [contents (reduce (fn [r x] (str r "\n" (x 0) ": " (x 1)))
                                                 (conj (seq individual-event) "Message Contents"))]
                            (let [sha (get-sha256 contents)]
                              (with-open [wrtr (io/writer (str directory "/" sha ".txt"))]
                                (.write wrtr contents)
                                (conj individual-event {:url (str url "/" sha ".txt")})))))]
        (if (vector? event)
          (call-rescue (into [] (map write-event event)) children)
          (call-rescue (write-event event) children))))))
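
The two pure steps inside write, rendering an event as text and hashing that text, can be tried at a REPL without Riemann. The sketch below is an alternative under stated assumptions rather than the code above: event->text and sha256-hex are hypothetical names, and it hashes the stringʼs UTF-8 bytes directly with java.security.MessageDigest instead of round-tripping through Base64. Because that round trip returns the original bytes, it should yield the same hash as get-sha256 whenever the JVMʼs default charset is UTF-8.

```clojure
(import 'java.security.MessageDigest)

;; Hypothetical helper: render an event map as "key: value" lines under a header.
(defn event->text [event]
  (reduce (fn [acc [k v]] (str acc "\n" k ": " v))
          "Message Contents"
          event))

;; Hypothetical helper: lowercase hex SHA-256 of a string's UTF-8 bytes.
(defn sha256-hex [^String s]
  (let [hashed (.digest (MessageDigest/getInstance "SHA-256")
                        (.getBytes s "UTF-8"))]
    ;; %02x renders each byte as two hex digits, treating it as unsigned
    (apply str (map #(format "%02x" %) hashed))))

(event->text {:host "web1" :service "app logs"})
;; => "Message Contents\n:host: web1\n:service: app logs"

(sha256-hex "abc")
;; => "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
```

Note that keyword keys are rendered with their leading colon, which is also what the reduce in write produces.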

Then all you need to do is define the function that you will use in your streams. Something like (def write-log (write "/var/www/alerts" "https://alerts.example.com")) would work, where /var/www/alerts is the content directory for alerts.example.com. To include the link in your Slack alert, youʼll need to provide a custom formatter that includes it. Here is what we use:

; Truncates the String (s) at the specified number of characters (n)
(defn trunc [s n] (subs s 0 (min (count s) n)))

; The formatter that Slack will use when there isn't a link
(defn default-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Description*: " (trunc (:description e) 100))})

; The formatter that Slack will use when there is a link
(defn url-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Link*: " (:url e)
              "\n*Description*: " (trunc (:description e) 100))})

; Chooses which formatter Slack will use
(defn slack-formatter [e]
  (if (nil? (:url e))
    (default-formatter e)
    (url-formatter e)))
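
One thing to watch for: trunc calls subs on its argument, so an event without a :description would raise a NullPointerException inside the formatter. A minimal sketch of a nil-safe variant follows. The name safe-trunc is hypothetical and not part of the config above.

```clojure
;; Hypothetical nil-safe variant of trunc: returns at most n characters of s,
;; or an empty string when s is nil.
(defn safe-trunc [s n]
  (let [s (str s)]                 ; (str nil) is ""
    (subs s 0 (min (count s) n))))

(safe-trunc "a very long description" 6)
;; => "a very"

(safe-trunc nil 100)
;; => ""
```

Swapping it in for trunc inside the formatters would be one way to keep alerts flowing for events that carry no description.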

I know thatʼs a lot to piece together, so here is a minimal Riemann config that should work and shows how everything fits together.

(ns riemann.config
  (:require [clojure.java.io :as io])
  (:import (org.apache.commons.codec.binary Base64)
           (java.security MessageDigest)))

; A couple of helper functions to shuffle data between strings and byte arrays
(defn base64-encode [^String v] (Base64/encodeBase64 (.getBytes v)))
(defn base64-decode [^bytes b] (Base64/decodeBase64 b))

; The hashing function
(defn digest [^bytes b]
  (.digest (doto (MessageDigest/getInstance "SHA-256")
             (.reset)
             (.update b))))

(defn as-hex [v]
  (-> (Integer/toString (+ 0x100 (bit-and 0xff v)) 16) (.substring 1)))

; Gets the sha-256 of a passed in string
(defn get-sha256 [v]
  (apply str (map as-hex (-> (base64-encode v) (base64-decode) (digest)))))

; Returns a function that writes an event to the specified directory as <sha-256>.txt and modifies the event to include {:url <url>/<sha-256>.txt}
(defn write [directory url]
  (fn [& children]
    (fn stream [event]
      (let [write-event (fn [individual-event]
                          (let [contents (reduce (fn [r x] (str r "\n" (x 0) ": " (x 1)))
                                                 (conj (seq individual-event) "Message Contents"))]
                            (let [sha (get-sha256 contents)]
                              (with-open [wrtr (io/writer (str directory "/" sha ".txt"))]
                                (.write wrtr contents)
                                (conj individual-event {:url (str url "/" sha ".txt")})))))]
        (if (vector? event)
          (call-rescue (into [] (map write-event event)) children)
          (call-rescue (write-event event) children))))))

; The function that you will call to write the event
(def write-log (write "/var/www/alerts" "http://alerts.example.com"))

; Truncates the String (s) at the specified number of characters (n)
(defn trunc [s n] (subs s 0 (min (count s) n)))

; The formatter that Slack will use when there isn't a link
(defn default-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Description*: " (trunc (:description e) 100))})

; The formatter that Slack will use when there is a link
(defn url-formatter [e]
  {:text (str "*Host*: " (:host e)
              "\n*Service*: " (:service e)
              "\n*Metric*: " (:metric e)
              "\n*Link*: " (:url e)
              "\n*Description*: " (trunc (:description e) 100))})

; Chooses which formatter Slack will use
(defn slack-formatter [e]
  (if (nil? (:url e))
    (default-formatter e)
    (url-formatter e)))

(def notify-slack (slack {:account "{{Your Account}}"
                          :token "{{Your Token}}"}
                         {:username "Riemann"
                          :channel "#Alerts"
                          :icon ":rotating_light:"
                          :formatter slack-formatter}))

; Expire old events from the index every 5 seconds.
(periodically-expire 5)

(let [index (index)]
  ; Inbound events will be passed to these streams:
  (streams
    index

    (default {:severity 4}
      (where (and (service #".* logs$") (= (:environment event) "production"))
        (by [:service]
          (where (< (:severity event) 4)
            (throttle 2 3600
              (write-log notify-slack))))))))

That config sends an alert to the #Alerts channel of your Slack whenever a production event arrives whose service name ends with " logs" and whose severity is below 4. Events without a severity default to 4 and are therefore ignored. Alerts are limited to no more than 2 messages per hour per service.
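
To make the filtering explicit, the nested default and where streams can be read as a single predicate over an event. The sketch below is plain Clojure for experimenting at a REPL. The name alertable? is hypothetical, and it assumes that Riemannʼs (service #"...") clause behaves like re-find against the :service field.

```clojure
;; Hypothetical predicate mirroring the stream filters in the config.
(defn alertable? [event]
  (let [severity (or (:severity event) 4)   ; like (default {:severity 4} ...)
        service  (str (:service event))]    ; tolerate a missing :service
    (boolean
      (and (re-find #".* logs$" service)            ; service ends with " logs"
           (= (:environment event) "production")
           (< severity 4)))))

(alertable? {:service "app logs" :environment "production" :severity 3})
;; => true

(alertable? {:service "app logs" :environment "production"})
;; => false, because the severity defaults to 4
```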