From aaaf13c8940e86d688293ab8d7a3106cfd70f993 Mon Sep 17 00:00:00 2001 From: Blerim Sheqa Date: Tue, 31 Jan 2017 14:37:08 +0100 Subject: [PATCH] Refactor statuspoller processing This commit introduces processing of the JSON returned by /v1/status. Perfdata that is represented as a string will be ignored/dropped. Features that are not enabled are not be forwarded to ES. --- beater/eventstream.go | 7 ++- beater/statuspoller.go | 105 +++++++++++++++++++++++++++-------------- 2 files changed, 72 insertions(+), 40 deletions(-) diff --git a/beater/eventstream.go b/beater/eventstream.go index 21bde10f..2d3110e3 100644 --- a/beater/eventstream.go +++ b/beater/eventstream.go @@ -36,8 +36,8 @@ func NewEventstream(bt *Icingabeat, cfg config.Config) *Eventstream { return eventstream } -// BuildEvent ... -func BuildEvent(e []byte) common.MapStr { +// BuildEventstreamEvent ... +func BuildEventstreamEvent(e []byte) common.MapStr { var event common.MapStr var icingaEvent map[string]interface{} @@ -124,7 +124,6 @@ func (es *Eventstream) Run() error { URL.RawQuery = parameters.Encode() for { - ticker := time.NewTicker(es.config.Eventstream.RetryInterval) response, responseErr := requestURL(es.icingabeat, "POST", URL) @@ -148,7 +147,7 @@ func (es *Eventstream) Run() error { logp.Err("Error reading line %#v", err) } - es.icingabeat.client.PublishEvent(BuildEvent(line)) + es.icingabeat.client.PublishEvent(BuildEventstreamEvent(line)) logp.Info("Event sent") } diff --git a/beater/statuspoller.go b/beater/statuspoller.go index ac52fccd..0d90b970 100644 --- a/beater/statuspoller.go +++ b/beater/statuspoller.go @@ -2,7 +2,6 @@ package beater import ( "encoding/json" - "io" "io/ioutil" "net/url" "strconv" @@ -33,6 +32,71 @@ func NewStatuspoller(bt *Icingabeat, cfg config.Config) *Statuspoller { return statuspoller } +// BuildStatusEvents ... +func BuildStatusEvents(body []byte) []common.MapStr { + var statusEvents []common.MapStr + var icingaStatus map[string]interface{} + + if err := json.Unmarshal(body, &icingaStatus); err != nil { + logp.Warn("Unmarshal problem %v", err) + } + + for _, result := range icingaStatus { + for _, status := range result.([]interface{}) { + + event := common.MapStr{} + for key, value := range status.(map[string]interface{}) { + + switch key { + case "status": + for _, statusvalue := range value.(map[string]interface{}) { + switch statusvalue.(type) { + case map[string]interface{}: + if len(statusvalue.(map[string]interface{})) > 0 { + + event.Put(key, value) + } + + default: + event.Put(key, value) + } + + } + + case "perfdata": + for _, perfdata := range value.([]interface{}) { + switch perfdata.(type) { + case string: + logp.Debug("Perfdata is a string, skipping. (%v)", perfdata.(string)) + + case interface{}: + key = "perfdata." + perfdata.(map[string]interface{})["label"].(string) + value = perfdata + event.Put(key, value) + + } + } + + case "name": + key = "type" + value = "icingabeat.status." + strings.ToLower(value.(string)) + event.Put(key, value) + + default: + event.Put(key, value) + } + } + + if statusAvailable, _ := event.HasKey("status"); statusAvailable == true { + event.Put("@timestamp", common.Time(time.Now())) + statusEvents = append(statusEvents, event) + } + } + } + + return statusEvents +} + // Run evenstream receiver func (sp *Statuspoller) Run() error { host := sp.config.Host + ":" + strconv.Itoa(sp.config.Port) @@ -48,7 +112,6 @@ func (sp *Statuspoller) Run() error { for { ticker := time.NewTicker(sp.config.Statuspoller.Interval) response, responseErr := requestURL(sp.icingabeat, "GET", URL) - var event common.MapStr if responseErr == nil { body, err := ioutil.ReadAll(response.Body) @@ -56,41 +119,12 @@ func (sp *Statuspoller) Run() error { logp.Warn("Response body invalid: %v", err) } - if err := json.Unmarshal(body, &event); err != nil { - logp.Info("Unmarshal problem %v", err) + processedStatusEvents := BuildStatusEvents(body) + sp.icingabeat.client.PublishEvents(processedStatusEvents) + logp.Info("Events sent: %v", len(processedStatusEvents)) - if err == io.ErrUnexpectedEOF || err == io.EOF { - break - } - continue - } - - for _, result := range event { - switch statustype := result.(type) { - case []interface{}: - - for _, status := range statustype { - statusevent := common.MapStr{ - "@timestamp": common.Time(time.Now()), - } - for key, value := range status.(map[string]interface{}) { - if key != "perfdata" { - if key == "name" { - documentType := strings.ToLower(value.(string)) - statusevent.Put("type", "icingabeat.status."+documentType) - } else { - statusevent.Put(key, value) - } - } - } - - sp.icingabeat.client.PublishEvent(statusevent) - logp.Info("Event sent") - } - } - } } else { - logp.Info("Error connecting to API: %v", responseErr) + logp.Err("Error connecting to API: %v", responseErr) } select { @@ -99,7 +133,6 @@ func (sp *Statuspoller) Run() error { case <-ticker.C: } } - return nil } // Stop statuspoller