141 lines
3.6 KiB
Go
141 lines
3.6 KiB
Go
package crawler
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/elastic/beats/filebeat/channel"
|
|
"github.com/elastic/beats/filebeat/fileset"
|
|
"github.com/elastic/beats/filebeat/input/file"
|
|
"github.com/elastic/beats/filebeat/prospector"
|
|
"github.com/elastic/beats/filebeat/registrar"
|
|
"github.com/elastic/beats/libbeat/cfgfile"
|
|
"github.com/elastic/beats/libbeat/common"
|
|
"github.com/elastic/beats/libbeat/logp"
|
|
|
|
_ "github.com/elastic/beats/filebeat/include"
|
|
)
|
|
|
|
type Crawler struct {
|
|
prospectors map[uint64]*prospector.Prospector
|
|
prospectorConfigs []*common.Config
|
|
out channel.Factory
|
|
wg sync.WaitGroup
|
|
ProspectorsFactory cfgfile.RunnerFactory
|
|
ModulesFactory cfgfile.RunnerFactory
|
|
modulesReloader *cfgfile.Reloader
|
|
prospectorsReloader *cfgfile.Reloader
|
|
once bool
|
|
beatVersion string
|
|
beatDone chan struct{}
|
|
}
|
|
|
|
func New(out channel.Factory, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) {
|
|
return &Crawler{
|
|
out: out,
|
|
prospectors: map[uint64]*prospector.Prospector{},
|
|
prospectorConfigs: prospectorConfigs,
|
|
once: once,
|
|
beatVersion: beatVersion,
|
|
beatDone: beatDone,
|
|
}, nil
|
|
}
|
|
|
|
// Start starts the crawler with all prospectors
|
|
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config,
|
|
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory) error {
|
|
|
|
logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))
|
|
|
|
// Prospect the globs/paths given on the command line and launch harvesters
|
|
for _, prospectorConfig := range c.prospectorConfigs {
|
|
err := c.startProspector(prospectorConfig, r.GetStates())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
c.ProspectorsFactory = prospector.NewRunnerFactory(c.out, r, c.beatDone)
|
|
if configProspectors.Enabled() {
|
|
c.prospectorsReloader = cfgfile.NewReloader(configProspectors)
|
|
if err := c.prospectorsReloader.Check(c.ProspectorsFactory); err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
c.prospectorsReloader.Run(c.ProspectorsFactory)
|
|
}()
|
|
}
|
|
|
|
c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, c.beatDone)
|
|
if configModules.Enabled() {
|
|
c.modulesReloader = cfgfile.NewReloader(configModules)
|
|
if err := c.modulesReloader.Check(c.ModulesFactory); err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
c.modulesReloader.Run(c.ModulesFactory)
|
|
}()
|
|
}
|
|
|
|
logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors))
|
|
|
|
return nil
|
|
}
|
|
|
|
// startProspector creates a prospector from config, seeded with states,
// registers it under its ID and starts it.
//
// A disabled config is skipped and nil is returned. An error is returned
// when prospector creation fails or when a prospector with the same ID is
// already registered. In both error cases Start is never called on the new
// prospector.
func (c *Crawler) startProspector(config *common.Config, states []file.State) error {
	if enabled := config.Enabled(); !enabled {
		return nil
	}

	pr, err := prospector.New(config, c.out, c.beatDone, states, nil)
	if err != nil {
		return fmt.Errorf("Error in initing prospector: %s", err)
	}
	pr.Once = c.once

	id := pr.ID
	if _, exists := c.prospectors[id]; exists {
		return fmt.Errorf("Prospector with same ID already exists: %d", id)
	}

	c.prospectors[id] = pr
	pr.Start()
	return nil
}
|
|
|
|
func (c *Crawler) Stop() {
|
|
logp.Info("Stopping Crawler")
|
|
|
|
asyncWaitStop := func(stop func()) {
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
stop()
|
|
}()
|
|
}
|
|
|
|
logp.Info("Stopping %v prospectors", len(c.prospectors))
|
|
for _, p := range c.prospectors {
|
|
// Stop prospectors in parallel
|
|
asyncWaitStop(p.Stop)
|
|
}
|
|
|
|
if c.prospectorsReloader != nil {
|
|
asyncWaitStop(c.prospectorsReloader.Stop)
|
|
}
|
|
|
|
if c.modulesReloader != nil {
|
|
asyncWaitStop(c.modulesReloader.Stop)
|
|
}
|
|
|
|
c.WaitForCompletion()
|
|
|
|
logp.Info("Crawler stopped")
|
|
}
|
|
|
|
func (c *Crawler) WaitForCompletion() {
|
|
c.wg.Wait()
|
|
}
|