icingabeat/beater/statuspoller.go

153 lines
3.6 KiB
Go

package beater
import (
"encoding/json"
"io/ioutil"
"net/url"
"strconv"
"strings"
"time"
"github.com/icinga/icingabeat/config"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
// Statuspoller type
type Statuspoller struct {
icingabeat *Icingabeat
config config.Config
done chan struct{}
}
// NewStatuspoller ...
func NewStatuspoller(bt *Icingabeat, cfg config.Config) *Statuspoller {
statuspoller := &Statuspoller{
icingabeat: bt,
done: make(chan struct{}),
config: cfg,
}
return statuspoller
}
// BuildStatusEvents ...
func BuildStatusEvents(body []byte) []beat.Event {
var statusEvents []beat.Event
var icingaStatus map[string]interface{}
if err := json.Unmarshal(body, &icingaStatus); err != nil {
logp.Warn("Unmarshal problem %v", err)
}
for _, result := range icingaStatus {
for _, status := range result.([]interface{}) {
var event beat.Event
event.Fields = common.MapStr{}
event.Timestamp = time.Now()
for key, value := range status.(map[string]interface{}) {
switch key {
case "status":
for _, statusvalue := range value.(map[string]interface{}) {
switch statusvalue.(type) {
case map[string]interface{}:
if len(statusvalue.(map[string]interface{})) > 0 {
for key, value := range value.(map[string]interface{}) {
if key == "api" {
// "zones" can include a massive amount of data, depending
// on the number of connected agents and satellites
// since enough data is included in other keys, we're
// removing "zones" explicitly
delete(value.(map[string]interface{}), "zones")
}
}
event.Fields.Put(target_key+key, value)
}
default:
event.Fields.Put(target_key+key, value)
}
}
case "perfdata":
for _, perfdata := range value.([]interface{}) {
switch perfdata.(type) {
case string:
logp.Debug("Perfdata is a string, skipping. (%v)", perfdata.(string))
case interface{}:
key = "perfdata." + perfdata.(map[string]interface{})["label"].(string)
value = perfdata
event.Fields.Put(target_key+key, value)
}
}
case "name":
value = "icingabeat.status." + strings.ToLower(value.(string))
event.Fields.Put("type", value)
default:
event.Fields.Put(target_key+key, value)
}
}
if statusAvailable, _ := event.Fields.HasKey(target_key + "status"); statusAvailable == true {
statusEvents = append(statusEvents, event)
}
}
}
return statusEvents
}
// Run evenstream receiver
func (sp *Statuspoller) Run() error {
host := sp.config.Host + ":" + strconv.Itoa(sp.config.Port)
var URL *url.URL
URL, err := url.Parse("https://" + host)
if err != nil {
logp.Info("Invalid request URL")
}
URL.Path += "/v1/status"
for {
ticker := time.NewTicker(sp.config.Statuspoller.Interval)
response, responseErr := requestURL(sp.icingabeat, "GET", URL)
if responseErr == nil {
body, err := ioutil.ReadAll(response.Body)
if err != nil {
logp.Warn("Response body invalid: %v", err)
}
processedStatusEvents := BuildStatusEvents(body)
sp.icingabeat.client.PublishAll(processedStatusEvents)
logp.Debug("icingabeat.statuspoller", "Events sent: %v", len(processedStatusEvents))
} else {
logp.Err("Error connecting to API: %v", responseErr)
}
select {
case <-sp.done:
defer response.Body.Close()
return nil
case <-ticker.C:
}
}
}
// Stop statuspoller
func (sp *Statuspoller) Stop() {
close(sp.done)
}