A network event stream processing system, in Clojure. Riemann - A network monitoring system

Count riemann events in given time window

In my Riemann config, for a specific service, I'm trying to set metric=1 on all of its events, sum them over a 5-second window, and send the result to InfluxDB.

I ended up with the following:

  (where (service "offers")
    (fixed-time-window 5
      (smap folds/sum (with :metric 1 index))))

It doesn't really work: the events stored in InfluxDB do not match this rule. Any hints?
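One possible reading of the snippet above is that `with :metric 1` is applied after `folds/sum`, so it overwrites the sum instead of feeding it. A minimal sketch of the reordered stream, assuming the goal is one summed event per 5-second window; `index` is bound locally here only to keep the fragment complete, and the InfluxDB output from the question would take its place:

```clojure
(let [index (index)]
  (streams
    (where (service "offers")
      ; Set the metric to 1 on each incoming event first...
      (with :metric 1
        ; ...then collect events into non-overlapping 5-second windows...
        (fixed-time-window 5
          ; ...and sum the metrics of each window into a single event.
          (smap folds/sum
            index))))))
```

Since every metric is 1, `folds/count` in place of `folds/sum` (without the `with` step) should give the same number.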

Source: (StackOverflow)

riemann.io add jar to classpath

I have written custom Clojure functions that I want to use in my Riemann configuration. I am using Leiningen to build a jar file (with dependencies) containing my functions. What is the right way to include this jar file in the classpath when starting Riemann?

Source: (StackOverflow)

Riemann Context for Hadoop to send metrics to Riemann using metrics2 interface

Is there a library which can be integrated with the different Hadoop components (Namenode, datanode, jobtracker, tasktracker) as well as the Hadoop 2 components (Resource Manager) to send metrics to Riemann?

Source: (StackOverflow)

How to integrate riemann into the dropwizard to capture metrics?

I have a Dropwizard application that emits Yammer metrics, which can be viewed at a URL like http://localhost:8081/admin/metrics that returns the results as JSON.

I want to monitor these metrics in Riemann, but I have no idea how to start. I went through riemann-java-client, which has a RiemannReporter class for Yammer metrics, but I do not know how to use it in my application.

How can I integrate this client into my application, or capture the JSON from the URL and send it as events to the Riemann server?

Source: (StackOverflow)

ERROR: Failed to build gem native extension?

I am installing riemann monitoring tool on my PC. I run the following command from terminal:


sudo gem install riemann-tools

I get the following error:

ERROR:  Error installing riemann-tools:
        ERROR: Failed to build gem native extension.

            /usr/bin/ruby1.9.1 extconf.rb
    /usr/lib/ruby/1.9.1/rubygems/custom_require.rb:36:in `require': cannot load such file -- mkmf (LoadError)
        from /usr/lib/ruby/1.9.1/rubygems/custom_require.rb:36:in `require'
        from extconf.rb:1:in `<main>'

A similar error occurs when installing riemann-dash, but riemann-client installs successfully.

I am using Ubuntu 12.10. I read online that such errors have something to do with the Ruby version, but I am not able to figure out the cause.

Any help is appreciated.

Source: (StackOverflow)

How to refactor this clojure / riemann code

I'm discovering/learning Clojure with Riemann and I have written the following code to aggregate my CPU metrics per host :

 (by [:host]
     smap (aggregate-cpu-metrics "user" folds/mean)
     smap (aggregate-cpu-metrics "nice" folds/mean)
     smap (aggregate-cpu-metrics "system" folds/mean)
     smap (aggregate-cpu-metrics "idle" folds/mean)
     smap (aggregate-cpu-metrics "wait" folds/mean)
     smap (aggregate-cpu-metrics "interrupt" folds/mean)
     smap (aggregate-cpu-metrics "softirq" folds/mean)
     smap (aggregate-cpu-metrics "steal" folds/mean)))

(defn aggregate-cpu-metrics
  [name, aggr]
  (where (service (re-pattern (str "cpu-[0-9]+ " name)))
      (coalesce 10
          (smap aggr
              (with :service (str "cpu-average " name) reinject)))))

To explain the code a little bit, I'm receiving events like these :

  • :service "cpu-0 idle" :metric 58.23
  • :service "cpu-1 idle" :metric 98.11
  • :service "cpu-2 idle" :metric 12.23

And my goal is to calculate the average and to reinject this event in riemann :

  • :service "cpu-average" :metric 56.19

It's working, that's not the problem. But as you can see, the repeated smap lines in the first block are a lot of duplicated code. I'm looking for a way to refactor this code, but I'm stuck.

I would like to define a vector with my metrics names :

(def cpu-metrics ["user", "nice", "system", "idle", "interrupt", "softirq", "steal"])

...and to use it to call smap(aggregate-cpu-metrics...

But I don't know how to do that. I've tried map or doseq, but without any success.

How would you do it ?
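One way the vector could drive the stream, as a sketch rather than a definitive answer: `by` is a macro, so the children cannot be spliced in with `apply` directly, but `sdo` is an ordinary function that combines several streams into one. This fragment assumes `aggregate-cpu-metrics` and `cpu-metrics` from the question are defined earlier in the same config.

```clojure
(streams
  (by [:host]
    ; Build one child stream per metric name and bundle them with sdo,
    ; which forwards each incoming event to all of them.
    (apply sdo
           (map #(aggregate-cpu-metrics % folds/mean) cpu-metrics))))
```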

(Update / Solution) :

Here is my refactored version, after reading Arthur's answer.

  (streams
    (where (service #"^cpu-[0-9]+ ")
      (adjust [:service #(clojure.string/replace % #"^cpu-[0-9]+" "cpu-average")]
        (by [:host :service]
          (fixed-time-window 10 (smap folds/mean reinject))))))

Source: (StackOverflow)

clojure Riemann project collectd

I am trying to set up an apparently simple custom configuration using Riemann and collectd. Basically, I'd like to calculate the ratio between two streams. To do that, I tried something like the following (as suggested in the Riemann API documentation for project):

(project [(service "cache-miss")
      (service "cache-all")]
  (smap folds/quotient
    (with :service "ratio"

This apparently works, but after a while I noticed that some of the results were miscalculated. After some debugging with logs, I ended up with the following configuration to see what's happening and print the values:

(project [(service "cache-miss")
          (service "cache-all")]
  (fn [[miss all]]
    (if (or (nil? miss) (nil? all)) 
      (do nil) 
      (do (where (= (:time miss) (:time all))
        ;to print time marks
        (println (:time all)) 
        (println (:time miss))
        ; to distinguish easily each event
        (println "NEW LINE") 

To my surprise, each time I get new data from collectd (every 10 seconds) the function I created is executed twice, as if reusing previous unused data, and moreover it doesn't seem to care at all about my time equality constraint in the (where (= :time....) clause. The problem is that I am dividing metrics with different timestamps. Below is some output of the previous code:

NEW LINE -- First time I get data
NEW LINE -- Second time I get data
NEW LINE -- Third time I get data

Can anyone give me a hint on how to get the data formatted as I expect? I assume there is something I am not understanding about the "project" function, or about how incoming data is processed in Riemann.

Thanks in advance!


I managed to solve my problem, although I still don't have a clear idea of how it works. Right now I am receiving two different streams from the collectd tail plugin (from nginx logs), and I compute the quotient between them as follows:

(where (or (service "nginx/counter-cacheHit") (service "nginx/counter-cacheAll"))
    (coalesce
        (smap folds/quotient (with :service "cacheHit" (scale (* 1 100) index)))))

I have tested it extensively and so far it produces the right results. However, I still don't understand several things. First, how is it that coalesce only returns data after both events are processed? Collectd sends the events of both streams every two seconds with the same timestamp. Using "project" instead of "coalesce" resulted in two separate executions of smap every two seconds (one for each event), whereas coalesce results in only one execution of smap with the two events with the same timestamp, which is exactly what I wanted.

Finally, I don't know what the criterion is for choosing the numerator and the denominator. Is it because of the "or" clauses in the "where" clause?

Anyway, there is some black magic behind it, but I managed to solve my problem ;^)

Thank you all!
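On the numerator/denominator question: as far as I know, `folds/quotient` divides the metric of the first event in the sequence by the metrics of the remaining ones, so the result depends on the order of the events in the vector it receives, not on the `or` in the `where` clause. A minimal sketch that avoids relying on that order by picking the two events by service name; `hit-ratio` is a hypothetical helper (not part of Riemann), and the service names are taken from the config above:

```clojure
(defn hit-ratio
  "Given a sequence of events, returns a new event whose metric is the
  cache hit percentage, or nil when either event is missing or the
  denominator is zero."
  [events]
  (let [by-service (into {} (map (juxt :service identity) events))
        hit (get by-service "nginx/counter-cacheHit")
        all (get by-service "nginx/counter-cacheAll")]
    (when (and (number? (:metric hit))
               (number? (:metric all))
               (not (zero? (:metric all))))
      (assoc hit
             :service "cacheHit"
             :metric (* 100.0 (/ (:metric hit) (:metric all)))))))
```

In the stream it could take the place of `folds/quotient` and `scale`, for example `(coalesce (smap hit-ratio index))`; `smap` does not forward nil results to its children.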

Source: (StackOverflow)

riemann email setup issue

I am trying to set up riemann (for monitoring) with email alerts. I have used the following section in my riemann.config file but after reloading the config, I get the error copied below. Any thoughts on troubleshooting this will be greatly appreciated.


 (where (and (service #"^riemann netty execution-handler")
             (state "critical"))
        (email "user@somewhere.com")))


java.lang.RuntimeException: Unable to resolve symbol: email in this context, compiling:(/home/user/test/riemann-0.2.4/etc/riemann.config:40:9)

I forgot to add the mailer section in the config - update to follow.

The attempt to send the email is successful - I am getting a different error now which I will post as a separate question (riemann email exception with SMTP).

Source: (StackOverflow)

Number of prime number between 1 and n [closed]

I have been surfing the internet and found an interesting video which mentions that you can find the number of primes between 1 and any number n using the Riemann hypothesis and the Riemann zeta function. My math knowledge is not that advanced, and I don't understand how one can find the number of primes using the zeta function.

I wanted to write a program that takes one number as input and outputs the number of primes up to that number, calculated using the aforementioned zeta function, but I have no idea where to start learning. Please know that I'm 17 years old and have always loved math and programming, but this is something totally new to me. Any help is appreciated.
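As a starting point only, here is a sketch that does not use the zeta function at all: it counts primes exactly by trial division and compares the count with the simpler estimate n / ln n from the prime number theorem. The function names are made up for this illustration; the zeta-based explicit formula is considerably more involved and is not attempted here.

```clojure
(defn prime?
  "True when n is prime, checked by trial division up to sqrt(n)."
  [n]
  (and (> n 1)
       (not-any? #(zero? (rem n %))
                 (take-while #(<= (* % %) n) (range 2 n)))))

(defn prime-count
  "Exact number of primes less than or equal to n."
  [n]
  (count (filter prime? (range 2 (inc n)))))

(defn pnt-estimate
  "Rough estimate n / ln n from the prime number theorem (n > 1)."
  [n]
  (/ n (Math/log n)))

; Print the exact count next to the estimate for a sample input.
(println (prime-count 100) (pnt-estimate 100))
```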

Source: (StackOverflow)

Clojure:riemann.streams$smap$stream IllegalArgumentException: Key must be integer

I have some Clojure code (a Riemann config) that sends an email if a certain condition is met. I am facing an issue when passing events to the Riemann server.

Riemann code

(let [email (mailer {"......"})]

    (where (service "system_log")

        (by :RefNo
          (fn [events]
           (let [count-of-failures (count (filter #(= "Failed" (:Status %)) events))]        
              (assoc (first events)
                :status "Failure"
                 :metric  count-of-failures 
                 :total-fail (>= count-of-failures 2))))

          (where (and (= (:status event) "Failure")
                      (:total-fail event))

            (email "XXXXX@gmail.com"))prn)))))

Output on the Riemann server

WARN [2015-11-18 05:24:49,596] defaultEventExecutorGroup-2-2 - riemann.streams - riemann.streams$smap$stream__3695@7addde9e threw
java.lang.IllegalArgumentException: Key must be integer
        at clojure.lang.APersistentVector.assoc(APersistentVector.java:335)
        at clojure.lang.APersistentVector.assoc(APersistentVector.java:18)
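The `APersistentVector.assoc` frames in the trace suggest that `assoc` is being called on a vector rather than on an event map. One way this can happen (an assumption, since the full config is not shown) is when the function receives a single event instead of a sequence of events: `first` on a map returns a map entry, which behaves like a two-element vector, and `assoc` with a keyword key then fails. A plain Clojure reproduction, with `single-event` and `assoc-error` invented for the illustration:

```clojure
; A single event, represented here as a plain map for illustration.
(def single-event {:service "system_log" :Status "Failed"})

(defn assoc-error
  "Tries to assoc a keyword onto x and returns the exception message,
  or nil if no exception was thrown."
  [x]
  (try
    (assoc x :status "Failure")
    nil
    (catch IllegalArgumentException e
      (.getMessage e))))

; (first single-event) is a map entry, not an event, so assoc fails.
(println (assoc-error (first single-event)))

; With a real sequence of events, (first events) is a map and assoc works.
(println (assoc-error (first [single-event])))
```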

Update 2:

I simply changed the smap to sreduce. How should I update it? Since I am a newbie to this, I am a little confused about how to alter the code as per your suggestion.

(let [email (mailer {"......"})]

        (where (service "system_log")

            (by :RefNo
              (fn [events]
               (let [count-of-failures (count (filter #(= "Failed" (:Status %)) events))]        
                  (assoc (first events)
                    :status "Failure"
                     :metric  count-of-failures 
                     :total-fail (>= count-of-failures 2))))

              (where (and (= (:status event) "Failure")
                          (:total-fail event))

                (email "XXXXX@gmail.com"))prn)))))

Update 3:

I have updated my code to use coalesce with smap as its child. Now it's not showing any error, but the email doesn't get triggered. I am getting count-of-failures as 0. I guess the count function is not working.

(let [email (mailer {"......"})]

            (where (service "system_log")

                (by :RefNo
                  (fn [events]
                   (let [count-of-failures (count (filter #(= "Failed" (:status %)) events))]        
                      (assoc (first events)
                        :status "Failure"
                         :metric  count-of-failures 
                         :total-fail (>= count-of-failures 2))))

                  (where (and (= (:status event) "Failure")
                              (:total-fail event))

                    (email "XXXXX@gmail.com"))))prn))))

Source: (StackOverflow)

Find a string in Riemann

I want to find a string in a sentence using Clojure in Riemann. I have written the code using re-matches, but I am getting an error when executing it.

     (fn [events]
         (let [count-of-failures (count (re-matches #("POST*" (:Webservice %)) events))]

This is the error I got

java.lang.ClassCastException: riemann.config$eval96$fn__97$fn__98 cannot be cast to java.util.regex.Pattern
    at clojure.core$re_matcher.invoke(core.clj:4460)
    at clojure.core$re_matches.invoke(core.clj:4497)
    at riemann.config$eval96$fn__97.invoke(riemann_v1.config:31)
    at riemann.streams$smap$stream__3695.invoke(streams.clj:161)
    at riemann.streams$fixed_time_window_fn$stream__3946$fn__3979.invoke(streams.clj:381)
    at riemann.streams$fixed_time_window_fn$stream__3946.invoke(streams.clj:381)
    at riemann.config$eval96$stream__145$fn__150.invoke(riemann_v1.config:25)
    at riemann.config$eval96$stream__145.invoke(riemann_v1.config:25)
    at riemann.core$stream_BANG_$fn__5678.invoke(core.clj:19)
    at riemann.core$stream_BANG_.invoke(core.clj:18)
    at riemann.transport$handle.invoke(transport.clj:159)
    at riemann.transport.tcp$tcp_handler.invoke(tcp.clj:93)
    at riemann.transport.tcp$gen_tcp_handler$fn__5904.invoke(tcp.clj:65)
    at riemann.transport.tcp.proxy$io.netty.channel.ChannelInboundHandlerAdapter$ff19274a.channelRead(Unknown Source)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.access$700(AbstractChannelHandlerContext.java:32)
    at io.netty.channel.AbstractChannelHandlerContext$8.run(AbstractChannelHandlerContext.java:324)
    at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:36)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
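The exception says that a function was passed where a regex `Pattern` was expected: `#(...)` is an anonymous function literal, whereas `#"..."` is a regex literal. Two further points are worth keeping in mind: `re-matches` only succeeds when the whole string matches, while `re-find` looks for a match anywhere in it, and the pattern `POST*` means "POS" followed by zero or more "T". A minimal sketch of counting events whose `:Webservice` field contains "POST"; `count-posts` is a hypothetical helper name:

```clojure
(defn count-posts
  "Counts the events whose :Webservice field contains the text POST.
  Events without a :Webservice field are skipped."
  [events]
  (count (filter #(some->> (:Webservice %) (re-find #"POST"))
                 events)))

; Sample events for illustration only.
(println (count-posts [{:Webservice "POST /orders"}
                       {:Webservice "GET /orders"}
                       {}
                       {:Webservice "POST /users"}]))
```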

Source: (StackOverflow)

Comparing event and adding new field in Riemann

I am facing the following scenario. Let's assume I have 3 events A, B and C, which I am passing to Riemann.

Event A has the following fields :ExamNo 3890 :ExamResult Pass :Rank 8
Event B has the following fields :ExamNo 3890 :ExamResult Pass :Rank 5
Event C has the following fields :ExamNo 3890 :ExamResult Fail :Rank 0

I need to compare the events based on ExamNo, and whenever there is a change in ExamResult I need to add new fields to the event {:Eligible, :Grade}. I wrote the code to compare the events, but the new fields are not getting added to the event.

(let [examindex (default :ttl 300 (update-index (index)))]

  (where (service "Result")
          (fn [events]
        (changed :ExamResult))
            {    :Eligible :Eligible-Status
                 :Grade     :GradeValue

Since I am a newbie to Riemann, I couldn't figure out the issue.
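A rough sketch of how the pieces might fit together, under several assumptions: events are split per `:ExamNo` with `by`, `changed` only passes an event on when `:ExamResult` differs from the previous one, and `smap` adds the new fields. The rule inside `add-eligibility` is purely hypothetical, since the question does not say how `:Eligible` and `:Grade` should be derived.

```clojure
; Hypothetical rule for the new fields; replace with the real logic.
(defn add-eligibility [event]
  (let [passed? (= "Pass" (:ExamResult event))]
    (assoc event
           :Eligible passed?
           :Grade (if passed? "A" "F"))))

(let [index (index)]
  (streams
    (where (service "Result")
      ; One independent copy of the child streams per exam number.
      (by :ExamNo
        ; Pass an event on only when :ExamResult changes.
        (changed :ExamResult
          ; Add the new fields, then index the modified event.
          (smap add-eligibility
            index))))))
```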

Source: (StackOverflow)

Unable to install logstash contrib plugins?

I want to use the logstash contrib plugin riemann in my config file. When I run logstash, I get this error:

 An unexpected error occurred. This is probably a bug.   |
| You can find help with this problem in a few places:    |
|                                                         |
| * chat: #logstash IRC channel on freenode irc.          |
|     IRC via the web: http://goo.gl/TI4Ro                |
| * email: logstash-users@googlegroups.com                |
| * bug system: https://logstash.jira.com/                |
|                                                         |
The error reported is: 
  Couldn't find any output plugin named 'riemann'. Are you sure this is correct? Trying to load the riemann output plugin resulted in this error: no such file to load -- logstash/outputs/riemann

I have a folder inside which both the logstash and its contrib tar are present and extracted. I am using logstash 1.4.1 and logstash-contrib-1.4.1.

I tried the manual installation for contrib too by :

./bin/plugin install contrib

but nothing appears on the console on running the command.

Any help?


Running ls shows the following directory structure:


    elasticsearch-1.1.1         kibana-3.1.0.tar.gz  logstash-1.4.1.tar.gz   logstash-contrib-1.4.1.tar.gz
    elasticsearch-1.1.1.tar.gz  logstash-1.4.1       logstash-contrib-1.4.1  riemann-0.2.5.tar.bz2

Thus I have untarred contrib in the same directory as logstash. Any IDEA??

Source: (StackOverflow)

riemann email exception with SMTP

I was able to set up riemann with SMTP. Riemann appears to send the email out when the condition is met but I observe an exception (copied below) - any insight on how to troubleshoot/fix this will be appreciated.


    (def email (mailer {:host "xxx.xxx.xxx.xxx"
                        :port "xxxx"
                        :user "user@somewhere.com"
                        :pass "user12345"
                        :from "user@somewhere.com"}))

 (where (and (service #"^riemann netty execution-handler"))
        (email "user@somewhere.com")))


#riemann.codec.Event{:host "ubuntu-3", :service "riemann netty execution-handler threads active", :state "ok", :description nil, :metric 0, :tags nil, :time 348380111059/250, :ttl 20000}
WARN [2014-02-27 12:00:44,278] Thread-10 - riemann.config - riemann.email$mailer$make_stream__16773$stream__16774@4e9c33e9 threw
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Number
        at postal.smtp$smtp_send_STAR_.invoke(smtp.clj:33)
        at postal.smtp$smtp_send.doInvoke(smtp.clj:58)
        at clojure.lang.RestFn.invoke(RestFn.java:423)
        at postal.core$send_message.invoke(core.clj:35)
        at riemann.email$email_event.invoke(email.clj:18)
        at riemann.email$mailer$make_stream__16773$stream__16774.invoke(email.clj:69)
        at riemann.config$eval40$stream__41$fn__46.invoke(riemann.config:44)
        at riemann.config$eval40$stream__41.invoke(riemann.config:44)
        at riemann.core$stream_BANG_$fn__10513.invoke(core.clj:19)
        at riemann.core$stream_BANG_.invoke(core.clj:18)
        at riemann.core$instrumentation_service$measure__10522.invoke(core.clj:56)
        at riemann.service.ThreadService$thread_service_runner__8329$fn__8330.invoke(service.clj:64)
        at riemann.service.ThreadService$thread_service_runner__8329.invoke(service.clj:63)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Thread.java:744)
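The `String cannot be cast to java.lang.Number` error raised inside `postal.smtp` suggests that one of the SMTP options is a string where a number is expected. In the config above the likely candidate is `:port`, which is quoted. A sketch of the same mailer with a numeric port; the value 25 is only a placeholder, as the real port is elided in the question:

```clojure
(def email (mailer {:host "xxx.xxx.xxx.xxx"
                    :port 25                 ; a number, not a string (placeholder value)
                    :user "user@somewhere.com"
                    :pass "user12345"
                    :from "user@somewhere.com"}))
```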

Source: (StackOverflow)

Custom body message in riemann email

I am trying to create a custom message in the body section of email using riemann. I couldn't append the field dynamically.

Riemann config:

(let [email (mailer 
              {:host "XXXXX" :port XX :user "XXX" :pass "XXX" :auth "true"
               :subject (fn [events] "Team")
               :body (fn [events] 
                       (apply str "Hello Team, now the time is" (:timestamp event) "Thank You!"))
               :from "xxx@gmail.com"})]

My output:

Hello Team, now the time is Thank You!

My expected output:

Hello Team, now the time is 12:13:45 Thank You!.

The timestamp is not getting appended in the :body.
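Two things stand out in the config above: the `:body` function takes a parameter named `events` but reads from `event`, and standard Riemann events carry their time in the `:time` field (seconds since the epoch), not `:timestamp`. A minimal sketch of a body function under those assumptions; `email-body` and `time-format` are names invented here, the time is formatted in UTC, and the sequence of events is assumed to be non-empty:

```clojure
(import '(java.time Instant ZoneOffset)
        '(java.time.format DateTimeFormatter))

; Formatter for HH:mm:ss in UTC; pick another zone if needed.
(def time-format
  (.withZone (DateTimeFormatter/ofPattern "HH:mm:ss") ZoneOffset/UTC))

(defn email-body
  "Builds the mail body from the first event in the sequence."
  [events]
  (let [event   (first events)
        seconds (long (:time event))]
    (str "Hello Team, now the time is "
         (.format time-format (Instant/ofEpochSecond seconds))
         " Thank You!")))
```

It could then be passed to the mailer as `:body email-body`.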

Source: (StackOverflow)