Add filter configuration and improve URL handling
This commit is contained in:
parent
ae66968b98
commit
207fd524c4
|
@ -42,3 +42,15 @@ icingabeat:
|
||||||
types:
|
types:
|
||||||
- CheckResult
|
- CheckResult
|
||||||
- StateChange
|
- StateChange
|
||||||
|
|
||||||
|
# Event streams can be filtered by attributes using the prefix 'event.'
|
||||||
|
#
|
||||||
|
# Example for the CheckResult type with the exit_code set to 2:
|
||||||
|
# filter: "event.check_result.exit_status==2"
|
||||||
|
#
|
||||||
|
# Example for the CheckResult type with the service matching the string
|
||||||
|
# pattern "mysql*":
|
||||||
|
# filter: 'match("mysql*", event.service)'
|
||||||
|
#
|
||||||
|
# Comment out this line if you don't want to set a filter at all
|
||||||
|
filter: ""
|
||||||
|
|
|
@ -4,7 +4,8 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -36,15 +37,34 @@ func NewEventstream(bt *Icingabeat, cfg config.Config) *Eventstream {
|
||||||
|
|
||||||
// Run evenstream receiver
|
// Run evenstream receiver
|
||||||
func (es *Eventstream) Run() error {
|
func (es *Eventstream) Run() error {
|
||||||
types := strings.Join(es.config.Eventstream.Types, "&types=")
|
queue := "icingabeat"
|
||||||
logp.Info(types)
|
host := es.config.Host + ":" + strconv.Itoa(es.config.Port)
|
||||||
|
var URL *url.URL
|
||||||
|
|
||||||
|
URL, err := url.Parse("https://" + host)
|
||||||
|
if err != nil {
|
||||||
|
logp.Info("Invalid request URL")
|
||||||
|
}
|
||||||
|
|
||||||
|
URL.Path += "/v1/events/"
|
||||||
|
|
||||||
|
parameters := url.Values{}
|
||||||
|
parameters.Add("queue", queue)
|
||||||
|
|
||||||
|
if es.config.Eventstream.Filter != "" {
|
||||||
|
parameters.Add("filter", es.config.Eventstream.Filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, eventType := range es.config.Eventstream.Types {
|
||||||
|
parameters.Add("types", eventType)
|
||||||
|
}
|
||||||
|
|
||||||
|
URL.RawQuery = parameters.Encode()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
||||||
ticker := time.NewTicker(es.config.RetryInterval)
|
ticker := time.NewTicker(es.config.RetryInterval)
|
||||||
response, responseErr := requestURL(
|
response, responseErr := requestURL(es.icingabeat, "POST", URL)
|
||||||
es.icingabeat,
|
|
||||||
"POST",
|
|
||||||
"/v1/events?queue=icingabeat&types="+types)
|
|
||||||
|
|
||||||
if responseErr == nil {
|
if responseErr == nil {
|
||||||
reader := bufio.NewReader(response.Body)
|
reader := bufio.NewReader(response.Body)
|
||||||
|
|
|
@ -3,13 +3,13 @@ package beater
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
"github.com/elastic/beats/libbeat/logp"
|
"github.com/elastic/beats/libbeat/logp"
|
||||||
)
|
)
|
||||||
|
|
||||||
func requestURL(bt *Icingabeat, method, path string) (*http.Response, error) {
|
func requestURL(bt *Icingabeat, method string, URL *url.URL) (*http.Response, error) {
|
||||||
transport := &http.Transport{
|
transport := &http.Transport{
|
||||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||||
}
|
}
|
||||||
|
@ -18,12 +18,9 @@ func requestURL(bt *Icingabeat, method, path string) (*http.Response, error) {
|
||||||
Transport: transport,
|
Transport: transport,
|
||||||
}
|
}
|
||||||
|
|
||||||
url := fmt.Sprintf(
|
logp.Info("Requested URL: %v", URL.String())
|
||||||
"https://%s:%v%s",
|
|
||||||
bt.config.Host,
|
request, err := http.NewRequest(method, URL.String(), nil)
|
||||||
bt.config.Port,
|
|
||||||
path)
|
|
||||||
request, err := http.NewRequest(method, url, nil)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logp.Info("Request: %v", err)
|
logp.Info("Request: %v", err)
|
||||||
|
|
|
@ -40,7 +40,6 @@ func (bt *Icingabeat) Run(b *beat.Beat) error {
|
||||||
bt.client = b.Publisher.Connect()
|
bt.client = b.Publisher.Connect()
|
||||||
|
|
||||||
eventstream = NewEventstream(bt, bt.config)
|
eventstream = NewEventstream(bt, bt.config)
|
||||||
logp.Info("hostname: %v", bt.config.Host)
|
|
||||||
go eventstream.Run()
|
go eventstream.Run()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -17,7 +17,8 @@ type Config struct {
|
||||||
|
|
||||||
// EventstreamConfig optoins
|
// EventstreamConfig optoins
|
||||||
type EventstreamConfig struct {
|
type EventstreamConfig struct {
|
||||||
Types []string `config:"types"`
|
Types []string `config:"types"`
|
||||||
|
Filter string `config:"filter"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultConfig values
|
// DefaultConfig values
|
||||||
|
|
|
@ -43,6 +43,18 @@ icingabeat:
|
||||||
- CheckResult
|
- CheckResult
|
||||||
- StateChange
|
- StateChange
|
||||||
|
|
||||||
|
# Event streams can be filtered by attributes using the prefix event.
|
||||||
|
#
|
||||||
|
# Example for the CheckResult type with the exit_code set to 2:
|
||||||
|
# filter: "event.check_result.exit_status==2"
|
||||||
|
#
|
||||||
|
# Example for the CheckResult type with the service matching the string
|
||||||
|
# pattern "mysql*":
|
||||||
|
# filter: 'match("mysql*", event.service)'
|
||||||
|
#
|
||||||
|
# Comment out this line if you don't want to set a filter at all
|
||||||
|
filter: ""
|
||||||
|
|
||||||
#================================ General ======================================
|
#================================ General ======================================
|
||||||
|
|
||||||
# The name of the shipper that publishes the network data. It can be used to group
|
# The name of the shipper that publishes the network data. It can be used to group
|
||||||
|
|
|
@ -43,6 +43,18 @@ icingabeat:
|
||||||
- CheckResult
|
- CheckResult
|
||||||
- StateChange
|
- StateChange
|
||||||
|
|
||||||
|
# Event streams can be filtered by attributes using the prefix event.
|
||||||
|
#
|
||||||
|
# Example for the CheckResult type with the exit_code set to 2:
|
||||||
|
# filter: "event.check_result.exit_status==2"
|
||||||
|
#
|
||||||
|
# Example for the CheckResult type with the service matching the string
|
||||||
|
# pattern "mysql*":
|
||||||
|
# filter: 'match("mysql*", event.service)'
|
||||||
|
#
|
||||||
|
# Comment out this line if you don't want to set a filter at all
|
||||||
|
filter: ""
|
||||||
|
|
||||||
#================================ General =====================================
|
#================================ General =====================================
|
||||||
|
|
||||||
# The name of the shipper that publishes the network data. It can be used to group
|
# The name of the shipper that publishes the network data. It can be used to group
|
||||||
|
|
Loading…
Reference in New Issue