mirror of
https://github.com/Icinga/icingabeat.git
synced 2025-08-15 14:58:08 +02:00
484 lines
14 KiB
Go
484 lines
14 KiB
Go
package fileset
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/elastic/beats/libbeat/common"
|
|
"github.com/elastic/beats/libbeat/logp"
|
|
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
|
|
"github.com/elastic/beats/libbeat/paths"
|
|
)
|
|
|
|
type ModuleRegistry struct {
|
|
registry map[string]map[string]*Fileset // module -> fileset -> Fileset
|
|
}
|
|
|
|
// newModuleRegistry reads and loads the configured module into the registry.
|
|
func newModuleRegistry(modulesPath string,
|
|
moduleConfigs []*ModuleConfig,
|
|
overrides *ModuleOverrides,
|
|
beatVersion string) (*ModuleRegistry, error) {
|
|
|
|
var reg ModuleRegistry
|
|
reg.registry = map[string]map[string]*Fileset{}
|
|
|
|
for _, mcfg := range moduleConfigs {
|
|
if mcfg.Enabled != nil && (*mcfg.Enabled) == false {
|
|
continue
|
|
}
|
|
|
|
reg.registry[mcfg.Module] = map[string]*Fileset{}
|
|
moduleFilesets, err := getModuleFilesets(modulesPath, mcfg.Module)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error getting filesets for module %s: %v", mcfg.Module, err)
|
|
}
|
|
|
|
for _, filesetName := range moduleFilesets {
|
|
fcfg, exists := mcfg.Filesets[filesetName]
|
|
if !exists {
|
|
fcfg = &FilesetConfig{}
|
|
}
|
|
|
|
fcfg, err = applyOverrides(fcfg, mcfg.Module, filesetName, overrides)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error applying overrides on fileset %s/%s: %v", mcfg.Module, filesetName, err)
|
|
}
|
|
|
|
if fcfg.Enabled != nil && (*fcfg.Enabled) == false {
|
|
continue
|
|
}
|
|
|
|
fileset, err := New(modulesPath, filesetName, mcfg, fcfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = fileset.Read(beatVersion)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error reading fileset %s/%s: %v", mcfg.Module, filesetName, err)
|
|
}
|
|
reg.registry[mcfg.Module][filesetName] = fileset
|
|
}
|
|
|
|
// check that no extra filesets are configured
|
|
for filesetName, fcfg := range mcfg.Filesets {
|
|
if fcfg.Enabled != nil && (*fcfg.Enabled) == false {
|
|
continue
|
|
}
|
|
found := false
|
|
for _, name := range moduleFilesets {
|
|
if filesetName == name {
|
|
found = true
|
|
}
|
|
}
|
|
if !found {
|
|
return nil, fmt.Errorf("Fileset %s/%s is configured but doesn't exist", mcfg.Module, filesetName)
|
|
}
|
|
}
|
|
}
|
|
|
|
return ®, nil
|
|
}
|
|
|
|
// NewModuleRegistry reads and loads the configured module into the registry.
|
|
func NewModuleRegistry(moduleConfigs []*common.Config, beatVersion string, init bool) (*ModuleRegistry, error) {
|
|
modulesPath := paths.Resolve(paths.Home, "module")
|
|
|
|
stat, err := os.Stat(modulesPath)
|
|
if err != nil || !stat.IsDir() {
|
|
logp.Err("Not loading modules. Module directory not found: %s", modulesPath)
|
|
return &ModuleRegistry{}, nil // empty registry, no error
|
|
}
|
|
|
|
var modulesCLIList []string
|
|
var modulesOverrides *ModuleOverrides
|
|
if init {
|
|
modulesCLIList, modulesOverrides, err = getModulesCLIConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
mcfgs := []*ModuleConfig{}
|
|
for _, moduleConfig := range moduleConfigs {
|
|
mcfg, err := mcfgFromConfig(moduleConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error unpacking module config: %v", err)
|
|
}
|
|
mcfgs = append(mcfgs, mcfg)
|
|
}
|
|
|
|
mcfgs, err = appendWithoutDuplicates(mcfgs, modulesCLIList)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatVersion)
|
|
}
|
|
|
|
func mcfgFromConfig(cfg *common.Config) (*ModuleConfig, error) {
|
|
var mcfg ModuleConfig
|
|
|
|
err := cfg.Unpack(&mcfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var dict map[string]interface{}
|
|
|
|
err = cfg.Unpack(&dict)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error unpacking module %s in a dict: %v", mcfg.Module, err)
|
|
}
|
|
|
|
mcfg.Filesets = map[string]*FilesetConfig{}
|
|
for name, filesetConfig := range dict {
|
|
if name == "module" || name == "enabled" {
|
|
continue
|
|
}
|
|
|
|
var fcfg FilesetConfig
|
|
tmpCfg, err := common.NewConfigFrom(filesetConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error creating config from fileset %s/%s: %v", mcfg.Module, name, err)
|
|
}
|
|
err = tmpCfg.Unpack(&fcfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error unpacking fileset %s/%s: %v", mcfg.Module, name, err)
|
|
}
|
|
mcfg.Filesets[name] = &fcfg
|
|
|
|
}
|
|
|
|
return &mcfg, nil
|
|
}
|
|
|
|
func getModuleFilesets(modulePath, module string) ([]string, error) {
|
|
fileInfos, err := ioutil.ReadDir(filepath.Join(modulePath, module))
|
|
if err != nil {
|
|
return []string{}, err
|
|
}
|
|
|
|
filesets := []string{}
|
|
for _, fi := range fileInfos {
|
|
if fi.IsDir() {
|
|
// check also that the `manifest.yml` file exists
|
|
_, err = os.Stat(filepath.Join(modulePath, module, fi.Name(), "manifest.yml"))
|
|
if err == nil {
|
|
filesets = append(filesets, fi.Name())
|
|
}
|
|
}
|
|
}
|
|
|
|
return filesets, nil
|
|
}
|
|
|
|
func applyOverrides(fcfg *FilesetConfig,
|
|
module, fileset string,
|
|
overrides *ModuleOverrides) (*FilesetConfig, error) {
|
|
|
|
if overrides == nil {
|
|
return fcfg, nil
|
|
}
|
|
|
|
overridesConfigs := overrides.Get(module, fileset)
|
|
if len(overridesConfigs) == 0 {
|
|
return fcfg, nil
|
|
}
|
|
|
|
config, err := common.NewConfigFrom(fcfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error creating vars config object: %v", err)
|
|
}
|
|
|
|
toMerge := []*common.Config{config}
|
|
toMerge = append(toMerge, overridesConfigs...)
|
|
|
|
resultConfig, err := common.MergeConfigs(toMerge...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error merging configs: %v", err)
|
|
}
|
|
|
|
var res FilesetConfig
|
|
err = resultConfig.Unpack(&res)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error unpacking configs: %v", err)
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
// appendWithoutDuplicates appends basic module configuration for each module in the
|
|
// modules list, unless the same module is not already loaded.
|
|
func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([]*ModuleConfig, error) {
|
|
if len(modules) == 0 {
|
|
return moduleConfigs, nil
|
|
}
|
|
|
|
// built a dictionary with the configured modules
|
|
modulesMap := map[string]bool{}
|
|
for _, mcfg := range moduleConfigs {
|
|
if mcfg.Enabled != nil && (*mcfg.Enabled) == false {
|
|
continue
|
|
}
|
|
modulesMap[mcfg.Module] = true
|
|
}
|
|
|
|
// add the non duplicates to the list
|
|
for _, module := range modules {
|
|
if _, exists := modulesMap[module]; !exists {
|
|
moduleConfigs = append(moduleConfigs, &ModuleConfig{Module: module})
|
|
}
|
|
}
|
|
return moduleConfigs, nil
|
|
}
|
|
|
|
func (reg *ModuleRegistry) GetProspectorConfigs() ([]*common.Config, error) {
|
|
result := []*common.Config{}
|
|
for module, filesets := range reg.registry {
|
|
for name, fileset := range filesets {
|
|
fcfg, err := fileset.getProspectorConfig()
|
|
if err != nil {
|
|
return result, fmt.Errorf("Error getting config for fielset %s/%s: %v",
|
|
module, name, err)
|
|
}
|
|
result = append(result, fcfg)
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// PipelineLoader factory builds and returns a PipelineLoader
|
|
type PipelineLoaderFactory func() (PipelineLoader, error)
|
|
|
|
// PipelineLoader is a subset of the Elasticsearch client API capable of loading
|
|
// the pipelines.
|
|
type PipelineLoader interface {
|
|
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
|
|
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
|
|
GetVersion() string
|
|
}
|
|
|
|
// LoadPipelines loads the pipelines for each configured fileset.
|
|
func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader) error {
|
|
for module, filesets := range reg.registry {
|
|
for name, fileset := range filesets {
|
|
// check that all the required Ingest Node plugins are available
|
|
requiredProcessors := fileset.GetRequiredProcessors()
|
|
logp.Debug("modules", "Required processors: %s", requiredProcessors)
|
|
if len(requiredProcessors) > 0 {
|
|
err := checkAvailableProcessors(esClient, requiredProcessors)
|
|
if err != nil {
|
|
return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
|
|
}
|
|
}
|
|
|
|
pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion())
|
|
if err != nil {
|
|
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
|
|
}
|
|
err = loadPipeline(esClient, pipelineID, content)
|
|
if err != nil {
|
|
return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// InfoString returns the enabled modules and filesets in a single string, ready to
|
|
// be shown to the user
|
|
func (reg *ModuleRegistry) InfoString() string {
|
|
var result string
|
|
for module, filesets := range reg.registry {
|
|
var filesetNames string
|
|
for name, _ := range filesets {
|
|
if filesetNames != "" {
|
|
filesetNames += ", "
|
|
}
|
|
filesetNames += name
|
|
}
|
|
if result != "" {
|
|
result += ", "
|
|
}
|
|
result += fmt.Sprintf("%s (%s)", module, filesetNames)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// checkAvailableProcessors calls the /_nodes/ingest API and verifies that all processors listed
|
|
// in the requiredProcessors list are available in Elasticsearch. Returns nil if all required
|
|
// processors are available.
|
|
func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []ProcessorRequirement) error {
|
|
var response struct {
|
|
Nodes map[string]struct {
|
|
Ingest struct {
|
|
Processors []struct {
|
|
Type string `json:"type"`
|
|
} `json:"processors"`
|
|
} `json:"ingest"`
|
|
} `json:"nodes"`
|
|
}
|
|
status, body, err := esClient.Request("GET", "/_nodes/ingest", "", nil, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("Error querying _nodes/ingest: %v", err)
|
|
}
|
|
if status > 299 {
|
|
return fmt.Errorf("Error querying _nodes/ingest. Status: %d. Response body: %s", status, body)
|
|
}
|
|
err = json.Unmarshal(body, &response)
|
|
if err != nil {
|
|
return fmt.Errorf("Error unmarshaling json when querying _nodes/ingest. Body: %s", body)
|
|
}
|
|
|
|
missing := []ProcessorRequirement{}
|
|
for _, requiredProcessor := range requiredProcessors {
|
|
for _, node := range response.Nodes {
|
|
available := false
|
|
for _, availableProcessor := range node.Ingest.Processors {
|
|
if requiredProcessor.Name == availableProcessor.Type {
|
|
available = true
|
|
break
|
|
}
|
|
}
|
|
if !available {
|
|
missing = append(missing, requiredProcessor)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(missing) > 0 {
|
|
missingPlugins := []string{}
|
|
for _, proc := range missing {
|
|
missingPlugins = append(missingPlugins, proc.Plugin)
|
|
}
|
|
errorMsg := fmt.Sprintf("This module requires the following Elasticsearch plugins: %s. "+
|
|
"You can install them by running the following commands on all the Elasticsearch nodes:",
|
|
strings.Join(missingPlugins, ", "))
|
|
for _, plugin := range missingPlugins {
|
|
errorMsg += fmt.Sprintf("\n sudo bin/elasticsearch-plugin install %s", plugin)
|
|
}
|
|
return errors.New(errorMsg)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}) error {
|
|
path := "/_ingest/pipeline/" + pipelineID
|
|
status, _, _ := esClient.Request("GET", path, "", nil, nil)
|
|
if status == 200 {
|
|
logp.Debug("modules", "Pipeline %s already loaded", pipelineID)
|
|
return nil
|
|
}
|
|
body, err := esClient.LoadJSON(path, content)
|
|
if err != nil {
|
|
return interpretError(err, body)
|
|
}
|
|
logp.Info("Elasticsearch pipeline with ID '%s' loaded", pipelineID)
|
|
return nil
|
|
}
|
|
|
|
func interpretError(initialErr error, body []byte) error {
|
|
var response struct {
|
|
Error struct {
|
|
RootCause []struct {
|
|
Type string `json:"type"`
|
|
Reason string `json:"reason"`
|
|
Header struct {
|
|
ProcessorType string `json:"processor_type"`
|
|
} `json:"header"`
|
|
Index string `json:"index"`
|
|
} `json:"root_cause"`
|
|
} `json:"error"`
|
|
}
|
|
err := json.Unmarshal(body, &response)
|
|
if err != nil {
|
|
// this might be ES < 2.0. Do a best effort to check for ES 1.x
|
|
var response1x struct {
|
|
Error string `json:"error"`
|
|
}
|
|
err1x := json.Unmarshal(body, &response1x)
|
|
if err1x == nil && response1x.Error != "" {
|
|
return fmt.Errorf("The Filebeat modules require Elasticsearch >= 5.0. "+
|
|
"This is the response I got from Elasticsearch: %s", body)
|
|
}
|
|
|
|
return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s",
|
|
initialErr, body)
|
|
}
|
|
|
|
// missing plugins?
|
|
if len(response.Error.RootCause) > 0 &&
|
|
response.Error.RootCause[0].Type == "parse_exception" &&
|
|
strings.HasPrefix(response.Error.RootCause[0].Reason, "No processor type exists with name") &&
|
|
response.Error.RootCause[0].Header.ProcessorType != "" {
|
|
|
|
plugins := map[string]string{
|
|
"geoip": "ingest-geoip",
|
|
"user_agent": "ingest-user-agent",
|
|
}
|
|
plugin, ok := plugins[response.Error.RootCause[0].Header.ProcessorType]
|
|
if !ok {
|
|
return fmt.Errorf("This module requires an Elasticsearch plugin that provides the %s processor. "+
|
|
"Please visit the Elasticsearch documentation for instructions on how to install this plugin. "+
|
|
"Response body: %s", response.Error.RootCause[0].Header.ProcessorType, body)
|
|
}
|
|
|
|
return fmt.Errorf("This module requires the %s plugin to be installed in Elasticsearch. "+
|
|
"You can install it using the following command in the Elasticsearch home directory:\n"+
|
|
" sudo bin/elasticsearch-plugin install %s", plugin, plugin)
|
|
}
|
|
|
|
// older ES version?
|
|
if len(response.Error.RootCause) > 0 &&
|
|
response.Error.RootCause[0].Type == "invalid_index_name_exception" &&
|
|
response.Error.RootCause[0].Index == "_ingest" {
|
|
|
|
return fmt.Errorf("The Ingest Node functionality seems to be missing from Elasticsearch. "+
|
|
"The Filebeat modules require Elasticsearch >= 5.0. "+
|
|
"This is the response I got from Elasticsearch: %s", body)
|
|
}
|
|
|
|
return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
|
|
}
|
|
|
|
// LoadML loads the machine-learning configurations into Elasticsearch, if Xpack is available
|
|
func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error {
|
|
haveXpack, err := mlimporter.HaveXpackML(esClient)
|
|
if err != nil {
|
|
return errors.Errorf("Error checking if xpack is available: %v", err)
|
|
}
|
|
if !haveXpack {
|
|
logp.Warn("Xpack Machine Learning is not enabled")
|
|
return nil
|
|
}
|
|
|
|
for module, filesets := range reg.registry {
|
|
for name, fileset := range filesets {
|
|
for _, mlConfig := range fileset.GetMLConfigs() {
|
|
err = mlimporter.ImportMachineLearningJob(esClient, &mlConfig)
|
|
if err != nil {
|
|
return errors.Errorf("Error loading ML config from %s/%s: %v", module, name, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (reg *ModuleRegistry) Empty() bool {
|
|
count := 0
|
|
for _, filesets := range reg.registry {
|
|
count += len(filesets)
|
|
}
|
|
return count == 0
|
|
}
|