First attempt to retreive, parse and push events

This commit is contained in:
Blerim Sheqa 2016-12-08 17:19:25 +01:00
parent 4d119a92a6
commit 09dfabda0b
7 changed files with 145 additions and 42 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@
/icingabeat /icingabeat
/icingabeat.test /icingabeat.test
*.pyc *.pyc
logs/

View File

@ -5,3 +5,11 @@
icingabeat: icingabeat:
# Defines how often an event is sent to the output # Defines how often an event is sent to the output
period: 1s period: 1s
# Icinga 2 API endpoint
host: "localhost"
# Port of Icinga 2 API
port: 5665
# User for Icinga 2 API
user: "icinga"
# Password for the Icinga 2 API user
password: "icinga"

View File

@ -1,7 +1,14 @@
package beater package beater
import ( import (
"bufio"
"crypto/tls"
"encoding/json"
"fmt" "fmt"
"io"
"log"
"net/http"
"sync"
"time" "time"
"github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/beat"
@ -12,13 +19,56 @@ import (
"github.com/icinga/icingabeat/config" "github.com/icinga/icingabeat/config"
) )
// Icingabeat type
type Icingabeat struct { type Icingabeat struct {
done chan struct{}
config config.Config config config.Config
client publisher.Client client publisher.Client
closer io.Closer
mutex sync.Mutex
} }
// Creates beater func requestURL(icingabeat *Icingabeat, method, path string) *http.Response {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{
Transport: transport,
}
url := fmt.Sprintf("https://%s:%v%s", icingabeat.config.Host, icingabeat.config.Port, path)
request, err := http.NewRequest(method, url, nil)
if err != nil {
log.Fatalln(err)
}
request.Header.Add("Accept", "application/json")
request.SetBasicAuth(icingabeat.config.User, icingabeat.config.Password)
response, err := client.Do(request)
if err != nil {
log.Fatalln(err)
}
return response
}
func apiStatus(icingabeat *Icingabeat) bool {
response := requestURL(icingabeat, "GET", "/v1/status")
if response.StatusCode == 200 {
return true
}
log.Println("Request:", response.Request.URL)
log.Fatalln("Error:", response.Status)
return false
}
// New beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil { if err := cfg.Unpack(&config); err != nil {
@ -26,35 +76,71 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
} }
bt := &Icingabeat{ bt := &Icingabeat{
done: make(chan struct{}),
config: config, config: config,
} }
return bt, nil return bt, nil
} }
// Run Icingabeat
func (bt *Icingabeat) Run(b *beat.Beat) error { func (bt *Icingabeat) Run(b *beat.Beat) error {
logp.Info("icingabeat is running! Hit CTRL-C to stop it.") logp.Info("icingabeat is running! Hit CTRL-C to stop it.")
bt.client = b.Publisher.Connect() bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.Period)
if apiStatus(bt) {
logp.Info("API is here!!!")
} else {
logp.Info("API not available")
}
response := requestURL(bt, "POST", "/v1/events?queue=icingabeat&types=CheckResult")
reader := bufio.NewReader(response.Body)
bt.mutex.Lock()
bt.closer = response.Body
bt.mutex.Unlock()
for { for {
select { line, err := reader.ReadBytes('\n')
case <-bt.done: if err != nil {
return nil bt.mutex.Lock()
case <-ticker.C: tst := bt.closer == nil
bt.mutex.Unlock()
if tst {
break
}
logp.Err("Error reading line %#v", err)
} }
event := common.MapStr{ var event common.MapStr
"@timestamp": common.Time(time.Now()),
"type": b.Name, if err := json.Unmarshal(line, &event); err != nil {
"event": "icingabeat to come here", logp.Info("Unmarshal problem %v", err)
bt.mutex.Lock()
tst := bt.closer == nil
bt.mutex.Unlock()
if tst {
break
}
continue
} }
event["@timestamp"] = common.Time(time.Now())
event["type"] = b.Name
bt.client.PublishEvent(event) bt.client.PublishEvent(event)
logp.Info("Event sent") logp.Info("Event sent")
} }
return nil
} }
// Stop Icingabeat
func (bt *Icingabeat) Stop() { func (bt *Icingabeat) Stop() {
bt.client.Close() bt.client.Close()
close(bt.done) bt.mutex.Lock()
if bt.closer != nil {
bt.closer.Close()
bt.closer = nil
}
bt.mutex.Unlock()
} }

View File

@ -5,10 +5,18 @@ package config
import "time" import "time"
// Config options
type Config struct { type Config struct {
Period time.Duration `config:"period"` Period time.Duration `config:"period"`
Host string `config:"host"`
Port int `config:"port"`
User string `config:"user"`
Password string `config:"password"`
} }
// DefaultConfig values
var DefaultConfig = Config{ var DefaultConfig = Config{
Period: 1 * time.Second, Period: 1 * time.Second,
Host: "localhost",
Port: 5665,
} }

View File

@ -5,6 +5,14 @@
icingabeat: icingabeat:
# Defines how often an event is sent to the output # Defines how often an event is sent to the output
period: 1s period: 1s
# Icinga 2 API endpoint
host: "localhost"
# Port of Icinga 2 API
port: 5665
# User for Icinga 2 API
user: "icinga"
# Password for the Icinga 2 API user
password: "icinga"
#================================ General ====================================== #================================ General ======================================

View File

@ -5,6 +5,14 @@
icingabeat: icingabeat:
# Defines how often an event is sent to the output # Defines how often an event is sent to the output
period: 1s period: 1s
# Icinga 2 API endpoint
host: "demo.icinga.com"
# Port of Icinga 2 API
port: 5665
# User for Icinga 2 API
user: "root"
# Password for the Icinga 2 API user
password: "icinga"
#================================ General ===================================== #================================ General =====================================
@ -29,7 +37,7 @@ icingabeat:
#-------------------------- Elasticsearch output ------------------------------ #-------------------------- Elasticsearch output ------------------------------
output.elasticsearch: output.elasticsearch:
# Array of hosts to connect to. # Array of hosts to connect to.
hosts: ["localhost:9200"] hosts: ["demo.icinga.com:9200"]
# Optional protocol and basic auth credentials. # Optional protocol and basic auth credentials.
#protocol: "https" #protocol: "https"

View File

@ -1,27 +1,11 @@
2016-12-02T11:47:19+01:00 INFO Setup Beat: icingabeat; Version: 6.0.0-alpha1 2016-12-08T17:15:46+01:00 INFO Home path: [/Users/bsheqa/go/src/github.com/icinga/icingabeat] Config path: [/Users/bsheqa/go/src/github.com/icinga/icingabeat] Data path: [/Users/bsheqa/go/src/github.com/icinga/icingabeat/data] Logs path: [/Users/bsheqa/go/src/github.com/icinga/icingabeat/logs]
2016-12-02T11:47:19+01:00 INFO Metrics logging every 30s 2016-12-08T17:15:46+01:00 INFO Setup Beat: icingabeat; Version: 6.0.0-alpha1
2016-12-02T11:47:19+01:00 INFO Loading template enabled. Reading template file: /Users/bsheqa/go/src/github.com/icinga/icingabeat/icingabeat.template.json 2016-12-08T17:15:46+01:00 INFO Loading template enabled. Reading template file: /Users/bsheqa/go/src/github.com/icinga/icingabeat/icingabeat.template.json
2016-12-02T11:47:19+01:00 INFO Loading template enabled for Elasticsearch 2.x. Reading template file: /Users/bsheqa/go/src/github.com/icinga/icingabeat/icingabeat.template-es2x.json 2016-12-08T17:15:46+01:00 INFO Loading template enabled for Elasticsearch 2.x. Reading template file: /Users/bsheqa/go/src/github.com/icinga/icingabeat/icingabeat.template-es2x.json
2016-12-02T11:47:19+01:00 INFO Elasticsearch url: http://localhost:9200 2016-12-08T17:15:46+01:00 INFO Elasticsearch url: http://localhost:9200
2016-12-02T11:47:19+01:00 INFO Activated elasticsearch as output plugin. 2016-12-08T17:15:46+01:00 INFO Activated elasticsearch as output plugin.
2016-12-02T11:47:19+01:00 INFO Publisher name: Blerims-MacBook-Pro.local 2016-12-08T17:15:46+01:00 INFO Publisher name: blerims-mbp.int.netways.de
2016-12-02T11:47:19+01:00 INFO Flush Interval set to: 1s 2016-12-08T17:15:46+01:00 INFO Flush Interval set to: 1s
2016-12-02T11:47:19+01:00 INFO Max Bulk Size set to: 50 2016-12-08T17:15:46+01:00 INFO Max Bulk Size set to: 50
2016-12-02T11:47:19+01:00 INFO icingabeat start running. 2016-12-08T17:15:46+01:00 INFO icingabeat start running.
2016-12-02T11:47:19+01:00 INFO icingabeat is running! Hit CTRL-C to stop it. 2016-12-08T17:15:46+01:00 INFO icingabeat is running! Hit CTRL-C to stop it.
2016-12-02T11:47:20+01:00 INFO Event sent
2016-12-02T11:47:21+01:00 INFO Event sent
2016-12-02T11:47:21+01:00 ERR Connecting error publishing events (retrying): Get http://localhost:9200: dial tcp 127.0.0.1:9200: getsockopt: connection refused
2016-12-02T11:47:22+01:00 INFO Event sent
2016-12-02T11:47:22+01:00 ERR Connecting error publishing events (retrying): Get http://localhost:9200: dial tcp 127.0.0.1:9200: getsockopt: connection refused
2016-12-02T11:47:23+01:00 INFO Event sent
2016-12-02T11:47:24+01:00 INFO Event sent
2016-12-02T11:47:24+01:00 ERR Connecting error publishing events (retrying): Get http://localhost:9200: dial tcp 127.0.0.1:9200: getsockopt: connection refused
2016-12-02T11:47:25+01:00 INFO Event sent
2016-12-02T11:47:26+01:00 INFO Event sent
2016-12-02T11:47:27+01:00 INFO Event sent
2016-12-02T11:47:28+01:00 INFO Event sent
2016-12-02T11:47:28+01:00 ERR Connecting error publishing events (retrying): Get http://localhost:9200: dial tcp 127.0.0.1:9200: getsockopt: connection refused
2016-12-02T11:47:29+01:00 INFO Total non-zero values: libbeat.publisher.messages_in_worker_queues=9 libbeat.publisher.published_events=9
2016-12-02T11:47:29+01:00 INFO Uptime: 9.720728469s
2016-12-02T11:47:29+01:00 INFO icingabeat stopped.