mirror of
https://github.com/Icinga/icingabeat.git
synced 2025-09-02 15:48:09 +02:00
95 lines
1.9 KiB
Go
95 lines
1.9 KiB
Go
package outputs
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
|
|
"github.com/elastic/beats/libbeat/publisher"
|
|
"github.com/elastic/beats/libbeat/testing"
|
|
)
|
|
|
|
type failoverClient struct {
|
|
clients []NetworkClient
|
|
active int
|
|
}
|
|
|
|
// Sentinel errors returned by the failover client.
var (
	// ErrNoConnectionConfigured is returned when the failover client has no
	// clients to choose from.
	ErrNoConnectionConfigured = errors.New("No connection configured")

	// errNoActiveConnection is returned when an operation needs a selected
	// client but none has been selected yet.
	errNoActiveConnection = errors.New("No active connection")
)
|
|
|
|
// NewFailoverClient combines a set of NetworkClients into one NetworkClient instances,
|
|
// with at most one active client. If the active client fails, another client
|
|
// will be used.
|
|
func NewFailoverClient(clients []NetworkClient) NetworkClient {
|
|
if len(clients) == 1 {
|
|
return clients[0]
|
|
}
|
|
|
|
return &failoverClient{
|
|
clients: clients,
|
|
active: -1,
|
|
}
|
|
}
|
|
|
|
func (f *failoverClient) Connect() error {
|
|
var (
|
|
next int
|
|
active = f.active
|
|
l = len(f.clients)
|
|
)
|
|
|
|
switch {
|
|
case l == 0:
|
|
return ErrNoConnectionConfigured
|
|
case l == 1:
|
|
next = 0
|
|
case l == 2 && 0 <= active && active <= 1:
|
|
next = 1 - active
|
|
default:
|
|
for {
|
|
// Connect to random server to potentially spread the
|
|
// load when large number of beats with same set of sinks
|
|
// are started up at about the same time.
|
|
next = rand.Int() % l
|
|
if next != active {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
client := f.clients[next]
|
|
f.active = next
|
|
return client.Connect()
|
|
}
|
|
|
|
func (f *failoverClient) Close() error {
|
|
if f.active < 0 {
|
|
return errNoActiveConnection
|
|
}
|
|
return f.clients[f.active].Close()
|
|
}
|
|
|
|
func (f *failoverClient) Publish(batch publisher.Batch) error {
|
|
if f.active < 0 {
|
|
batch.Retry()
|
|
return errNoActiveConnection
|
|
}
|
|
return f.clients[f.active].Publish(batch)
|
|
}
|
|
|
|
func (f *failoverClient) Test(d testing.Driver) {
|
|
for i, client := range f.clients {
|
|
c, ok := client.(testing.Testable)
|
|
d.Run(fmt.Sprintf("Client %d", i), func(d testing.Driver) {
|
|
if !ok {
|
|
d.Fatal("output", errors.New("client doesn't support testing"))
|
|
}
|
|
c.Test(d)
|
|
})
|
|
}
|
|
}
|