Run eventstream in seperate goroutine

This commit is contained in:
Blerim Sheqa 2016-12-22 13:40:14 +01:00
parent ee3ced346c
commit 6a5d635ea2
4 changed files with 152 additions and 17693 deletions

109
beater/eventstream.go Normal file
View File

@ -0,0 +1,109 @@
package beater
import (
"bufio"
"encoding/json"
"io"
"sync"
"time"
"github.com/icinga/icingabeat/config"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
// Eventstream type
type Eventstream struct {
icingabeat *Icingabeat
config config.Config
done chan struct{}
closer io.Closer
mutex sync.Mutex
}
// NewEventstream ...
func NewEventstream(bt *Icingabeat, cfg config.Config) *Eventstream {
eventstream := &Eventstream{
icingabeat: bt,
done: make(chan struct{}),
config: cfg,
}
return eventstream
}
// Run evenstream receiver
func (es *Eventstream) Run() error {
for {
ticker := time.NewTicker(2 * time.Second)
response, responseErr := requestURL(es.icingabeat, "POST", "/v1/events?queue=icingabeat&types=CheckResult")
if responseErr == nil {
reader := bufio.NewReader(response.Body)
es.mutex.Lock()
es.closer = response.Body
es.mutex.Unlock()
for {
line, err := reader.ReadBytes('\n')
if err != nil {
es.mutex.Lock()
tst := es.closer == nil
es.mutex.Unlock()
if tst {
break
}
logp.Err("Error reading line %#v", err)
}
var event common.MapStr
if err := json.Unmarshal(line, &event); err != nil {
logp.Info("Unmarshal problem %v", err)
es.mutex.Lock()
tst := es.closer == nil
es.mutex.Unlock()
if tst {
break
}
continue
}
event["@timestamp"] = common.Time(time.Now())
event["type"] = "icingabeat"
es.icingabeat.client.PublishEvent(event)
logp.Info("Event sent")
}
select {
case <-es.done:
return nil
default:
}
} else {
logp.Info("Error connecting to API:", responseErr)
}
select {
case <-es.done:
return nil
case <-ticker.C:
}
}
}
// Stop eventstream receiver
func (es *Eventstream) Stop() {
es.mutex.Lock()
if es.closer != nil {
es.closer.Close()
es.closer = nil
}
es.mutex.Unlock()
close(es.done)
}

37
beater/http.go Normal file
View File

@ -0,0 +1,37 @@
package beater
import (
"crypto/tls"
"fmt"
"net/http"
"github.com/elastic/beats/libbeat/logp"
)
func requestURL(bt *Icingabeat, method, path string) (*http.Response, error) {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{
Transport: transport,
}
url := fmt.Sprintf("https://%s:%v%s", bt.config.Host, bt.config.Port, path)
request, err := http.NewRequest(method, url, nil)
if err != nil {
logp.Info("Request:", err)
}
request.Header.Add("Accept", "application/json")
request.SetBasicAuth(bt.config.User, bt.config.Password)
response, err := client.Do(request)
if err != nil {
return nil, err
}
return response, err
}

View File

@ -1,14 +1,7 @@
package beater
import (
"bufio"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
@ -23,37 +16,6 @@ type Icingabeat struct {
done chan struct{}
config config.Config
client publisher.Client
closer io.Closer
mutex sync.Mutex
}
func requestURL(bt *Icingabeat, method, path string) (*http.Response, error) {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{
Transport: transport,
}
url := fmt.Sprintf("https://%s:%v%s", bt.config.Host, bt.config.Port, path)
request, err := http.NewRequest(method, url, nil)
if err != nil {
logp.Info("Request:", err)
}
request.Header.Add("Accept", "application/json")
request.SetBasicAuth(bt.config.User, bt.config.Password)
response, err := client.Do(request)
if err != nil {
return nil, err
}
return response, err
}
// New beater
@ -72,78 +34,24 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
// Run Icingabeat
func (bt *Icingabeat) Run(b *beat.Beat) error {
var eventstream *Eventstream
logp.Info("icingabeat is running! Hit CTRL-C to stop it.")
for {
ticker := time.NewTicker(2 * time.Second)
response, responseErr := requestURL(bt, "POST", "/v1/events?queue=icingabeat&types=CheckResult")
if responseErr == nil {
bt.client = b.Publisher.Connect()
reader := bufio.NewReader(response.Body)
bt.mutex.Lock()
bt.closer = response.Body
bt.mutex.Unlock()
eventstream = NewEventstream(bt, bt.config)
go eventstream.Run()
for {
line, err := reader.ReadBytes('\n')
if err != nil {
bt.mutex.Lock()
tst := bt.closer == nil
bt.mutex.Unlock()
if tst {
break
}
logp.Err("Error reading line %#v", err)
}
var event common.MapStr
if err := json.Unmarshal(line, &event); err != nil {
logp.Info("Unmarshal problem %v", err)
bt.mutex.Lock()
tst := bt.closer == nil
bt.mutex.Unlock()
if tst {
break
}
continue
}
event["@timestamp"] = common.Time(time.Now())
event["type"] = b.Name
bt.client.PublishEvent(event)
logp.Info("Event sent")
}
select {
case <-bt.done:
return nil
default:
}
} else {
logp.Info("Error connecting to API:", responseErr)
}
select {
case <-bt.done:
return nil
case <-ticker.C:
}
}
}
// Stop Icingabeat
func (bt *Icingabeat) Stop() {
bt.mutex.Lock()
if bt.closer != nil {
bt.closer.Close()
bt.closer = nil
}
bt.mutex.Unlock()
bt.client.Close()
close(bt.done)
}

File diff suppressed because it is too large Load Diff