Refactor statuspoller processing

This commit introduces processing of the JSON returned by /v1/status.
Perfdata that is represented as a string will be ignored/dropped.
Features that are not enabled are not be forwarded to ES.
This commit is contained in:
Blerim Sheqa 2017-01-31 14:37:08 +01:00
parent d80cbaf23f
commit aaaf13c894
2 changed files with 72 additions and 40 deletions

View File

@ -36,8 +36,8 @@ func NewEventstream(bt *Icingabeat, cfg config.Config) *Eventstream {
return eventstream
}
// BuildEvent ...
func BuildEvent(e []byte) common.MapStr {
// BuildEventstreamEvent ...
func BuildEventstreamEvent(e []byte) common.MapStr {
var event common.MapStr
var icingaEvent map[string]interface{}
@ -124,7 +124,6 @@ func (es *Eventstream) Run() error {
URL.RawQuery = parameters.Encode()
for {
ticker := time.NewTicker(es.config.Eventstream.RetryInterval)
response, responseErr := requestURL(es.icingabeat, "POST", URL)
@ -148,7 +147,7 @@ func (es *Eventstream) Run() error {
logp.Err("Error reading line %#v", err)
}
es.icingabeat.client.PublishEvent(BuildEvent(line))
es.icingabeat.client.PublishEvent(BuildEventstreamEvent(line))
logp.Info("Event sent")
}

View File

@ -2,7 +2,6 @@ package beater
import (
"encoding/json"
"io"
"io/ioutil"
"net/url"
"strconv"
@ -33,6 +32,71 @@ func NewStatuspoller(bt *Icingabeat, cfg config.Config) *Statuspoller {
return statuspoller
}
// BuildStatusEvents ...
func BuildStatusEvents(body []byte) []common.MapStr {
var statusEvents []common.MapStr
var icingaStatus map[string]interface{}
if err := json.Unmarshal(body, &icingaStatus); err != nil {
logp.Warn("Unmarshal problem %v", err)
}
for _, result := range icingaStatus {
for _, status := range result.([]interface{}) {
event := common.MapStr{}
for key, value := range status.(map[string]interface{}) {
switch key {
case "status":
for _, statusvalue := range value.(map[string]interface{}) {
switch statusvalue.(type) {
case map[string]interface{}:
if len(statusvalue.(map[string]interface{})) > 0 {
event.Put(key, value)
}
default:
event.Put(key, value)
}
}
case "perfdata":
for _, perfdata := range value.([]interface{}) {
switch perfdata.(type) {
case string:
logp.Debug("Perfdata is a string, skipping. (%v)", perfdata.(string))
case interface{}:
key = "perfdata." + perfdata.(map[string]interface{})["label"].(string)
value = perfdata
event.Put(key, value)
}
}
case "name":
key = "type"
value = "icingabeat.status." + strings.ToLower(value.(string))
event.Put(key, value)
default:
event.Put(key, value)
}
}
if statusAvailable, _ := event.HasKey("status"); statusAvailable == true {
event.Put("@timestamp", common.Time(time.Now()))
statusEvents = append(statusEvents, event)
}
}
}
return statusEvents
}
// Run evenstream receiver
func (sp *Statuspoller) Run() error {
host := sp.config.Host + ":" + strconv.Itoa(sp.config.Port)
@ -48,7 +112,6 @@ func (sp *Statuspoller) Run() error {
for {
ticker := time.NewTicker(sp.config.Statuspoller.Interval)
response, responseErr := requestURL(sp.icingabeat, "GET", URL)
var event common.MapStr
if responseErr == nil {
body, err := ioutil.ReadAll(response.Body)
@ -56,41 +119,12 @@ func (sp *Statuspoller) Run() error {
logp.Warn("Response body invalid: %v", err)
}
if err := json.Unmarshal(body, &event); err != nil {
logp.Info("Unmarshal problem %v", err)
processedStatusEvents := BuildStatusEvents(body)
sp.icingabeat.client.PublishEvents(processedStatusEvents)
logp.Info("Events sent: %v", len(processedStatusEvents))
if err == io.ErrUnexpectedEOF || err == io.EOF {
break
}
continue
}
for _, result := range event {
switch statustype := result.(type) {
case []interface{}:
for _, status := range statustype {
statusevent := common.MapStr{
"@timestamp": common.Time(time.Now()),
}
for key, value := range status.(map[string]interface{}) {
if key != "perfdata" {
if key == "name" {
documentType := strings.ToLower(value.(string))
statusevent.Put("type", "icingabeat.status."+documentType)
} else {
statusevent.Put(key, value)
}
}
}
sp.icingabeat.client.PublishEvent(statusevent)
logp.Info("Event sent")
}
}
}
} else {
logp.Info("Error connecting to API: %v", responseErr)
logp.Err("Error connecting to API: %v", responseErr)
}
select {
@ -99,7 +133,6 @@ func (sp *Statuspoller) Run() error {
case <-ticker.C:
}
}
return nil
}
// Stop statuspoller