mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-10-24 17:14:30 +02:00 
			
		
		
		
	* Issue search support elasticsearch * Fix lint * Add indexer name on app.ini * add a warning on SearchIssuesByKeyword * improve code
		
			
				
	
	
		
			2100 lines
		
	
	
		
			63 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			2100 lines
		
	
	
		
			63 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
| // Copyright 2012-present Oliver Eilhard. All rights reserved.
 | |
| // Use of this source code is governed by a MIT-license.
 | |
| // See http://olivere.mit-license.org/license.txt for details.
 | |
| 
 | |
| package elastic
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"log"
 | |
| 	"net/http"
 | |
| 	"net/http/httputil"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"regexp"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/pkg/errors"
 | |
| 
 | |
| 	"github.com/olivere/elastic/v7/config"
 | |
| )
 | |
| 
 | |
const (
	// Version is the version of the Elastic client library.
	Version = "7.0.9"

	// DefaultURL is the Elasticsearch endpoint on the local machine. It is
	// the fallback when a Client is created without an explicit URL.
	DefaultURL = "http://127.0.0.1:9200"

	// DefaultScheme is the protocol scheme used by default while sniffing
	// the Elasticsearch cluster.
	DefaultScheme = "http"

	// DefaultHealthcheckEnabled reports whether healthchecks are switched
	// on by default.
	DefaultHealthcheckEnabled = true

	// DefaultHealthcheckTimeoutStartup is how long the healthcheck waits for
	// Elasticsearch to answer while a client is being created. Once the
	// client runs, the shorter DefaultHealthcheckTimeout normally applies.
	DefaultHealthcheckTimeoutStartup = 5 * time.Second

	// DefaultHealthcheckTimeout is how long a running client waits for a
	// healthcheck response. The timeout used during client creation is
	// larger by default (see DefaultHealthcheckTimeoutStartup).
	DefaultHealthcheckTimeout = time.Second

	// DefaultHealthcheckInterval is the default pause between two health
	// checks of the cluster nodes.
	DefaultHealthcheckInterval = time.Minute

	// DefaultSnifferEnabled reports whether the sniffer is switched on by
	// default.
	DefaultSnifferEnabled = true

	// DefaultSnifferInterval is the pause between two sniffing runs, i.e.
	// between two lookups of all cluster nodes that add or remove entries
	// from the list of actual connections.
	DefaultSnifferInterval = 15 * time.Minute

	// DefaultSnifferTimeoutStartup is the default timeout of the sniffing
	// run started while a new client is created. Later runs use
	// DefaultSnifferTimeout by default.
	DefaultSnifferTimeoutStartup = 5 * time.Second

	// DefaultSnifferTimeout is the default timeout of a sniffing run. The
	// initial run uses DefaultSnifferTimeoutStartup instead.
	DefaultSnifferTimeout = 2 * time.Second

	// DefaultSendGetBodyAs is the HTTP method elastic uses to send a GET
	// request that carries a body.
	DefaultSendGetBodyAs = "GET"

	// DefaultGzipEnabled reports whether gzip compression is switched on by
	// default.
	DefaultGzipEnabled = false

	// off is the sentinel duration that disables a timeout.
	off = -time.Second
)
 | |
| 
 | |
| var (
 | |
| 	// ErrNoClient is raised when no Elasticsearch node is available.
 | |
| 	ErrNoClient = errors.New("no Elasticsearch node available")
 | |
| 
 | |
| 	// ErrRetry is raised when a request cannot be executed after the configured
 | |
| 	// number of retries.
 | |
| 	ErrRetry = errors.New("cannot connect after several retries")
 | |
| 
 | |
| 	// ErrTimeout is raised when a request timed out, e.g. when WaitForStatus
 | |
| 	// didn't return in time.
 | |
| 	ErrTimeout = errors.New("timeout")
 | |
| 
 | |
| 	// noRetries is a retrier that does not retry.
 | |
| 	noRetries = NewStopRetrier()
 | |
| 
 | |
| 	// noDeprecationLog is a no-op for logging deprecations.
 | |
| 	noDeprecationLog = func(*http.Request, *http.Response) {}
 | |
| )
 | |
| 
 | |
// Doer performs HTTP requests. *http.Client satisfies it, and a custom
// implementation can stand in for it, e.g. as a mock in tests.
type Doer interface {
	// Do sends req and returns the response or an error.
	Do(req *http.Request) (*http.Response, error)
}
 | |
| 
 | |
| // ClientOptionFunc is a function that configures a Client.
 | |
| // It is used in NewClient.
 | |
| type ClientOptionFunc func(*Client) error
 | |
| 
 | |
| // Client is an Elasticsearch client. Create one by calling NewClient.
 | |
| type Client struct {
 | |
| 	c Doer // e.g. a net/*http.Client to use for requests
 | |
| 
 | |
| 	connsMu sync.RWMutex // connsMu guards the next block
 | |
| 	conns   []*conn      // all connections
 | |
| 	cindex  int          // index into conns
 | |
| 
 | |
| 	mu                        sync.RWMutex // guards the next block
 | |
| 	urls                      []string     // set of URLs passed initially to the client
 | |
| 	running                   bool         // true if the client's background processes are running
 | |
| 	errorlog                  Logger       // error log for critical messages
 | |
| 	infolog                   Logger       // information log for e.g. response times
 | |
| 	tracelog                  Logger       // trace log for debugging
 | |
| 	deprecationlog            func(*http.Request, *http.Response)
 | |
| 	scheme                    string          // http or https
 | |
| 	healthcheckEnabled        bool            // healthchecks enabled or disabled
 | |
| 	healthcheckTimeoutStartup time.Duration   // time the healthcheck waits for a response from Elasticsearch on startup
 | |
| 	healthcheckTimeout        time.Duration   // time the healthcheck waits for a response from Elasticsearch
 | |
| 	healthcheckInterval       time.Duration   // interval between healthchecks
 | |
| 	healthcheckStop           chan bool       // notify healthchecker to stop, and notify back
 | |
| 	snifferEnabled            bool            // sniffer enabled or disabled
 | |
| 	snifferTimeoutStartup     time.Duration   // time the sniffer waits for a response from nodes info API on startup
 | |
| 	snifferTimeout            time.Duration   // time the sniffer waits for a response from nodes info API
 | |
| 	snifferInterval           time.Duration   // interval between sniffing
 | |
| 	snifferCallback           SnifferCallback // callback to modify the sniffing decision
 | |
| 	snifferStop               chan bool       // notify sniffer to stop, and notify back
 | |
| 	decoder                   Decoder         // used to decode data sent from Elasticsearch
 | |
| 	basicAuth                 bool            // indicates whether to send HTTP Basic Auth credentials
 | |
| 	basicAuthUsername         string          // username for HTTP Basic Auth
 | |
| 	basicAuthPassword         string          // password for HTTP Basic Auth
 | |
| 	sendGetBodyAs             string          // override for when sending a GET with a body
 | |
| 	gzipEnabled               bool            // gzip compression enabled or disabled (default)
 | |
| 	requiredPlugins           []string        // list of required plugins
 | |
| 	retrier                   Retrier         // strategy for retries
 | |
| 	headers                   http.Header     // a list of default headers to add to each request
 | |
| }
 | |
| 
 | |
| // NewClient creates a new client to work with Elasticsearch.
 | |
| //
 | |
| // NewClient, by default, is meant to be long-lived and shared across
 | |
| // your application. If you need a short-lived client, e.g. for request-scope,
 | |
| // consider using NewSimpleClient instead.
 | |
| //
 | |
| // The caller can configure the new client by passing configuration options
 | |
| // to the func.
 | |
| //
 | |
| // Example:
 | |
| //
 | |
| //   client, err := elastic.NewClient(
 | |
| //     elastic.SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"),
 | |
| //     elastic.SetBasicAuth("user", "secret"))
 | |
| //
 | |
| // If no URL is configured, Elastic uses DefaultURL by default.
 | |
| //
 | |
| // If the sniffer is enabled (the default), the new client then sniffes
 | |
| // the cluster via the Nodes Info API
 | |
| // (see https://www.elastic.co/guide/en/elasticsearch/reference/7.0/cluster-nodes-info.html#cluster-nodes-info).
 | |
| // It uses the URLs specified by the caller. The caller is responsible
 | |
| // to only pass a list of URLs of nodes that belong to the same cluster.
 | |
| // This sniffing process is run on startup and periodically.
 | |
| // Use SnifferInterval to set the interval between two sniffs (default is
 | |
| // 15 minutes). In other words: By default, the client will find new nodes
 | |
| // in the cluster and remove those that are no longer available every
 | |
| // 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient.
 | |
| //
 | |
| // The list of nodes found in the sniffing process will be used to make
 | |
| // connections to the REST API of Elasticsearch. These nodes are also
 | |
| // periodically checked in a shorter time frame. This process is called
 | |
| // a health check. By default, a health check is done every 60 seconds.
 | |
| // You can set a shorter or longer interval by SetHealthcheckInterval.
 | |
| // Disabling health checks is not recommended, but can be done by
 | |
| // SetHealthcheck(false).
 | |
| //
 | |
| // Connections are automatically marked as dead or healthy while
 | |
| // making requests to Elasticsearch. When a request fails, Elastic will
 | |
| // call into the Retry strategy which can be specified with SetRetry.
 | |
| // The Retry strategy is also responsible for handling backoff i.e. the time
 | |
| // to wait before starting the next request. There are various standard
 | |
| // backoff implementations, e.g. ExponentialBackoff or SimpleBackoff.
 | |
| // Retries are disabled by default.
 | |
| //
 | |
| // If no HttpClient is configured, then http.DefaultClient is used.
 | |
| // You can use your own http.Client with some http.Transport for
 | |
| // advanced scenarios.
 | |
| //
 | |
| // An error is also returned when some configuration option is invalid or
 | |
| // the new client cannot sniff the cluster (if enabled).
 | |
| func NewClient(options ...ClientOptionFunc) (*Client, error) {
 | |
| 	return DialContext(context.Background(), options...)
 | |
| }
 | |
| 
 | |
| // NewClientFromConfig initializes a client from a configuration.
 | |
| func NewClientFromConfig(cfg *config.Config) (*Client, error) {
 | |
| 	options, err := configToOptions(cfg)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return DialContext(context.Background(), options...)
 | |
| }
 | |
| 
 | |
| // NewSimpleClient creates a new short-lived Client that can be used in
 | |
| // use cases where you need e.g. one client per request.
 | |
| //
 | |
| // While NewClient by default sets up e.g. periodic health checks
 | |
| // and sniffing for new nodes in separate goroutines, NewSimpleClient does
 | |
| // not and is meant as a simple replacement where you don't need all the
 | |
| // heavy lifting of NewClient.
 | |
| //
 | |
| // NewSimpleClient does the following by default: First, all health checks
 | |
| // are disabled, including timeouts and periodic checks. Second, sniffing
 | |
| // is disabled, including timeouts and periodic checks. The number of retries
 | |
| // is set to 1. NewSimpleClient also does not start any goroutines.
 | |
| //
 | |
| // Notice that you can still override settings by passing additional options,
 | |
| // just like with NewClient.
 | |
| func NewSimpleClient(options ...ClientOptionFunc) (*Client, error) {
 | |
| 	c := &Client{
 | |
| 		c:                         http.DefaultClient,
 | |
| 		conns:                     make([]*conn, 0),
 | |
| 		cindex:                    -1,
 | |
| 		scheme:                    DefaultScheme,
 | |
| 		decoder:                   &DefaultDecoder{},
 | |
| 		healthcheckEnabled:        false,
 | |
| 		healthcheckTimeoutStartup: off,
 | |
| 		healthcheckTimeout:        off,
 | |
| 		healthcheckInterval:       off,
 | |
| 		healthcheckStop:           make(chan bool),
 | |
| 		snifferEnabled:            false,
 | |
| 		snifferTimeoutStartup:     off,
 | |
| 		snifferTimeout:            off,
 | |
| 		snifferInterval:           off,
 | |
| 		snifferCallback:           nopSnifferCallback,
 | |
| 		snifferStop:               make(chan bool),
 | |
| 		sendGetBodyAs:             DefaultSendGetBodyAs,
 | |
| 		gzipEnabled:               DefaultGzipEnabled,
 | |
| 		retrier:                   noRetries, // no retries by default
 | |
| 		deprecationlog:            noDeprecationLog,
 | |
| 	}
 | |
| 
 | |
| 	// Run the options on it
 | |
| 	for _, option := range options {
 | |
| 		if err := option(c); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Use a default URL and normalize them
 | |
| 	if len(c.urls) == 0 {
 | |
| 		c.urls = []string{DefaultURL}
 | |
| 	}
 | |
| 	c.urls = canonicalize(c.urls...)
 | |
| 
 | |
| 	// If the URLs have auth info, use them here as an alternative to SetBasicAuth
 | |
| 	if !c.basicAuth {
 | |
| 		for _, urlStr := range c.urls {
 | |
| 			u, err := url.Parse(urlStr)
 | |
| 			if err == nil && u.User != nil {
 | |
| 				c.basicAuth = true
 | |
| 				c.basicAuthUsername = u.User.Username()
 | |
| 				c.basicAuthPassword, _ = u.User.Password()
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for _, url := range c.urls {
 | |
| 		c.conns = append(c.conns, newConn(url, url))
 | |
| 	}
 | |
| 
 | |
| 	// Ensure that we have at least one connection available
 | |
| 	if err := c.mustActiveConn(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Check the required plugins
 | |
| 	for _, plugin := range c.requiredPlugins {
 | |
| 		found, err := c.HasPlugin(plugin)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if !found {
 | |
| 			return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	c.mu.Lock()
 | |
| 	c.running = true
 | |
| 	c.mu.Unlock()
 | |
| 
 | |
| 	return c, nil
 | |
| }
 | |
| 
 | |
| // Dial will call DialContext with a background context.
 | |
| func Dial(options ...ClientOptionFunc) (*Client, error) {
 | |
| 	return DialContext(context.Background(), options...)
 | |
| }
 | |
| 
 | |
| // DialContext will connect to Elasticsearch, just like NewClient does.
 | |
| //
 | |
| // The context is honoured in terms of e.g. cancellation.
 | |
| func DialContext(ctx context.Context, options ...ClientOptionFunc) (*Client, error) {
 | |
| 	// Set up the client
 | |
| 	c := &Client{
 | |
| 		c:                         http.DefaultClient,
 | |
| 		conns:                     make([]*conn, 0),
 | |
| 		cindex:                    -1,
 | |
| 		scheme:                    DefaultScheme,
 | |
| 		decoder:                   &DefaultDecoder{},
 | |
| 		healthcheckEnabled:        DefaultHealthcheckEnabled,
 | |
| 		healthcheckTimeoutStartup: DefaultHealthcheckTimeoutStartup,
 | |
| 		healthcheckTimeout:        DefaultHealthcheckTimeout,
 | |
| 		healthcheckInterval:       DefaultHealthcheckInterval,
 | |
| 		healthcheckStop:           make(chan bool),
 | |
| 		snifferEnabled:            DefaultSnifferEnabled,
 | |
| 		snifferTimeoutStartup:     DefaultSnifferTimeoutStartup,
 | |
| 		snifferTimeout:            DefaultSnifferTimeout,
 | |
| 		snifferInterval:           DefaultSnifferInterval,
 | |
| 		snifferCallback:           nopSnifferCallback,
 | |
| 		snifferStop:               make(chan bool),
 | |
| 		sendGetBodyAs:             DefaultSendGetBodyAs,
 | |
| 		gzipEnabled:               DefaultGzipEnabled,
 | |
| 		retrier:                   noRetries, // no retries by default
 | |
| 		deprecationlog:            noDeprecationLog,
 | |
| 	}
 | |
| 
 | |
| 	// Run the options on it
 | |
| 	for _, option := range options {
 | |
| 		if err := option(c); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Use a default URL and normalize them
 | |
| 	if len(c.urls) == 0 {
 | |
| 		c.urls = []string{DefaultURL}
 | |
| 	}
 | |
| 	c.urls = canonicalize(c.urls...)
 | |
| 
 | |
| 	// If the URLs have auth info, use them here as an alternative to SetBasicAuth
 | |
| 	if !c.basicAuth {
 | |
| 		for _, urlStr := range c.urls {
 | |
| 			u, err := url.Parse(urlStr)
 | |
| 			if err == nil && u.User != nil {
 | |
| 				c.basicAuth = true
 | |
| 				c.basicAuthUsername = u.User.Username()
 | |
| 				c.basicAuthPassword, _ = u.User.Password()
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Check if we can make a request to any of the specified URLs
 | |
| 	if c.healthcheckEnabled {
 | |
| 		if err := c.startupHealthcheck(ctx, c.healthcheckTimeoutStartup); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if c.snifferEnabled {
 | |
| 		// Sniff the cluster initially
 | |
| 		if err := c.sniff(ctx, c.snifferTimeoutStartup); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	} else {
 | |
| 		// Do not sniff the cluster initially. Use the provided URLs instead.
 | |
| 		for _, url := range c.urls {
 | |
| 			c.conns = append(c.conns, newConn(url, url))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if c.healthcheckEnabled {
 | |
| 		// Perform an initial health check
 | |
| 		c.healthcheck(ctx, c.healthcheckTimeoutStartup, true)
 | |
| 	}
 | |
| 	// Ensure that we have at least one connection available
 | |
| 	if err := c.mustActiveConn(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Check the required plugins
 | |
| 	for _, plugin := range c.requiredPlugins {
 | |
| 		found, err := c.HasPlugin(plugin)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if !found {
 | |
| 			return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if c.snifferEnabled {
 | |
| 		go c.sniffer() // periodically update cluster information
 | |
| 	}
 | |
| 	if c.healthcheckEnabled {
 | |
| 		go c.healthchecker() // start goroutine periodically ping all nodes of the cluster
 | |
| 	}
 | |
| 
 | |
| 	c.mu.Lock()
 | |
| 	c.running = true
 | |
| 	c.mu.Unlock()
 | |
| 
 | |
| 	return c, nil
 | |
| }
 | |
| 
 | |
| // DialWithConfig will use the configuration settings parsed from config package
 | |
| // to connect to Elasticsearch.
 | |
| //
 | |
| // The context is honoured in terms of e.g. cancellation.
 | |
| func DialWithConfig(ctx context.Context, cfg *config.Config) (*Client, error) {
 | |
| 	options, err := configToOptions(cfg)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return DialContext(ctx, options...)
 | |
| }
 | |
| 
 | |
| func configToOptions(cfg *config.Config) ([]ClientOptionFunc, error) {
 | |
| 	var options []ClientOptionFunc
 | |
| 	if cfg != nil {
 | |
| 		if cfg.URL != "" {
 | |
| 			options = append(options, SetURL(cfg.URL))
 | |
| 		}
 | |
| 		if cfg.Errorlog != "" {
 | |
| 			f, err := os.OpenFile(cfg.Errorlog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 | |
| 			if err != nil {
 | |
| 				return nil, errors.Wrap(err, "unable to initialize error log")
 | |
| 			}
 | |
| 			l := log.New(f, "", 0)
 | |
| 			options = append(options, SetErrorLog(l))
 | |
| 		}
 | |
| 		if cfg.Tracelog != "" {
 | |
| 			f, err := os.OpenFile(cfg.Tracelog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 | |
| 			if err != nil {
 | |
| 				return nil, errors.Wrap(err, "unable to initialize trace log")
 | |
| 			}
 | |
| 			l := log.New(f, "", 0)
 | |
| 			options = append(options, SetTraceLog(l))
 | |
| 		}
 | |
| 		if cfg.Infolog != "" {
 | |
| 			f, err := os.OpenFile(cfg.Infolog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 | |
| 			if err != nil {
 | |
| 				return nil, errors.Wrap(err, "unable to initialize info log")
 | |
| 			}
 | |
| 			l := log.New(f, "", 0)
 | |
| 			options = append(options, SetInfoLog(l))
 | |
| 		}
 | |
| 		if cfg.Username != "" || cfg.Password != "" {
 | |
| 			options = append(options, SetBasicAuth(cfg.Username, cfg.Password))
 | |
| 		}
 | |
| 		if cfg.Sniff != nil {
 | |
| 			options = append(options, SetSniff(*cfg.Sniff))
 | |
| 		}
 | |
| 		/*
 | |
| 			if cfg.Healthcheck != nil {
 | |
| 				options = append(options, SetHealthcheck(*cfg.Healthcheck))
 | |
| 			}
 | |
| 		*/
 | |
| 	}
 | |
| 	return options, nil
 | |
| }
 | |
| 
 | |
| // SetHttpClient can be used to specify the http.Client to use when making
 | |
| // HTTP requests to Elasticsearch.
 | |
| func SetHttpClient(httpClient Doer) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		if httpClient != nil {
 | |
| 			c.c = httpClient
 | |
| 		} else {
 | |
| 			c.c = http.DefaultClient
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetBasicAuth can be used to specify the HTTP Basic Auth credentials to
 | |
| // use when making HTTP requests to Elasticsearch.
 | |
| func SetBasicAuth(username, password string) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.basicAuthUsername = username
 | |
| 		c.basicAuthPassword = password
 | |
| 		c.basicAuth = c.basicAuthUsername != "" || c.basicAuthPassword != ""
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that
 | |
| // when sniffing is enabled, these URLs are used to initially sniff the
 | |
| // cluster on startup.
 | |
| func SetURL(urls ...string) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		switch len(urls) {
 | |
| 		case 0:
 | |
| 			c.urls = []string{DefaultURL}
 | |
| 		default:
 | |
| 			c.urls = urls
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetScheme sets the HTTP scheme to look for when sniffing (http or https).
 | |
| // This is http by default.
 | |
| func SetScheme(scheme string) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.scheme = scheme
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetSniff enables or disables the sniffer (enabled by default).
 | |
| func SetSniff(enabled bool) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.snifferEnabled = enabled
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetSnifferTimeoutStartup sets the timeout for the sniffer that is used
 | |
| // when creating a new client. The default is 5 seconds. Notice that the
 | |
| // timeout being used for subsequent sniffing processes is set with
 | |
| // SetSnifferTimeout.
 | |
| func SetSnifferTimeoutStartup(timeout time.Duration) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.snifferTimeoutStartup = timeout
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetSnifferTimeout sets the timeout for the sniffer that finds the
 | |
| // nodes in a cluster. The default is 2 seconds. Notice that the timeout
 | |
| // used when creating a new client on startup is usually greater and can
 | |
| // be set with SetSnifferTimeoutStartup.
 | |
| func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.snifferTimeout = timeout
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetSnifferInterval sets the interval between two sniffing processes.
 | |
| // The default interval is 15 minutes.
 | |
| func SetSnifferInterval(interval time.Duration) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.snifferInterval = interval
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SnifferCallback defines the protocol for sniffing decisions.
 | |
| type SnifferCallback func(*NodesInfoNode) bool
 | |
| 
 | |
| // nopSnifferCallback is the default sniffer callback: It accepts
 | |
| // all nodes the sniffer finds.
 | |
| var nopSnifferCallback = func(*NodesInfoNode) bool { return true }
 | |
| 
 | |
| // SetSnifferCallback allows the caller to modify sniffer decisions.
 | |
| // When setting the callback, the given SnifferCallback is called for
 | |
| // each (healthy) node found during the sniffing process.
 | |
| // If the callback returns false, the node is ignored: No requests
 | |
| // are routed to it.
 | |
| func SetSnifferCallback(f SnifferCallback) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		if f != nil {
 | |
| 			c.snifferCallback = f
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetHealthcheck enables or disables healthchecks (enabled by default).
 | |
| func SetHealthcheck(enabled bool) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.healthcheckEnabled = enabled
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetHealthcheckTimeoutStartup sets the timeout for the initial health check.
 | |
| // The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup).
 | |
| // Notice that timeouts for subsequent health checks can be modified with
 | |
| // SetHealthcheckTimeout.
 | |
| func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.healthcheckTimeoutStartup = timeout
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetHealthcheckTimeout sets the timeout for periodic health checks.
 | |
| // The default timeout is 1 second (see DefaultHealthcheckTimeout).
 | |
| // Notice that a different (usually larger) timeout is used for the initial
 | |
| // healthcheck, which is initiated while creating a new client.
 | |
| // The startup timeout can be modified with SetHealthcheckTimeoutStartup.
 | |
| func SetHealthcheckTimeout(timeout time.Duration) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.healthcheckTimeout = timeout
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetHealthcheckInterval sets the interval between two health checks.
 | |
| // The default interval is 60 seconds.
 | |
| func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.healthcheckInterval = interval
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetMaxRetries sets the maximum number of retries before giving up when
 | |
| // performing a HTTP request to Elasticsearch.
 | |
| //
 | |
| // Deprecated: Replace with a Retry implementation.
 | |
| func SetMaxRetries(maxRetries int) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		if maxRetries < 0 {
 | |
| 			return errors.New("MaxRetries must be greater than or equal to 0")
 | |
| 		} else if maxRetries == 0 {
 | |
| 			c.retrier = noRetries
 | |
| 		} else {
 | |
| 			// Create a Retrier that will wait for 100ms (+/- jitter) between requests.
 | |
| 			// This resembles the old behavior with maxRetries.
 | |
| 			ticks := make([]int, maxRetries)
 | |
| 			for i := 0; i < len(ticks); i++ {
 | |
| 				ticks[i] = 100
 | |
| 			}
 | |
| 			backoff := NewSimpleBackoff(ticks...)
 | |
| 			c.retrier = NewBackoffRetrier(backoff)
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetGzip enables or disables gzip compression (disabled by default).
 | |
| func SetGzip(enabled bool) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.gzipEnabled = enabled
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
 | |
| // DefaultDecoder is used by default.
 | |
| func SetDecoder(decoder Decoder) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		if decoder != nil {
 | |
| 			c.decoder = decoder
 | |
| 		} else {
 | |
| 			c.decoder = &DefaultDecoder{}
 | |
| 		}
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetRequiredPlugins can be used to indicate that some plugins are required
 | |
| // before a Client will be created.
 | |
| func SetRequiredPlugins(plugins ...string) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		if c.requiredPlugins == nil {
 | |
| 			c.requiredPlugins = make([]string, 0)
 | |
| 		}
 | |
| 		c.requiredPlugins = append(c.requiredPlugins, plugins...)
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetErrorLog sets the logger for critical messages like nodes joining
 | |
| // or leaving the cluster or failing requests. It is nil by default.
 | |
| func SetErrorLog(logger Logger) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.errorlog = logger
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetInfoLog sets the logger for informational messages, e.g. requests
 | |
| // and their response times. It is nil by default.
 | |
| func SetInfoLog(logger Logger) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.infolog = logger
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetTraceLog specifies the log.Logger to use for output of HTTP requests
 | |
| // and responses which is helpful during debugging. It is nil by default.
 | |
| func SetTraceLog(logger Logger) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.tracelog = logger
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetSendGetBodyAs specifies the HTTP method to use when sending a GET request
 | |
| // with a body. It is GET by default.
 | |
| func SetSendGetBodyAs(httpMethod string) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.sendGetBodyAs = httpMethod
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetRetrier specifies the retry strategy that handles errors during
 | |
| // HTTP request/response with Elasticsearch.
 | |
| func SetRetrier(retrier Retrier) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		if retrier == nil {
 | |
| 			retrier = noRetries // no retries by default
 | |
| 		}
 | |
| 		c.retrier = retrier
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // SetHeaders adds a list of default HTTP headers that will be added to
 | |
| // each requests executed by PerformRequest.
 | |
| func SetHeaders(headers http.Header) ClientOptionFunc {
 | |
| 	return func(c *Client) error {
 | |
| 		c.headers = headers
 | |
| 		return nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // String returns a string representation of the client status.
 | |
| func (c *Client) String() string {
 | |
| 	c.connsMu.Lock()
 | |
| 	conns := c.conns
 | |
| 	c.connsMu.Unlock()
 | |
| 
 | |
| 	var buf bytes.Buffer
 | |
| 	for i, conn := range conns {
 | |
| 		if i > 0 {
 | |
| 			buf.WriteString(", ")
 | |
| 		}
 | |
| 		buf.WriteString(conn.String())
 | |
| 	}
 | |
| 	return buf.String()
 | |
| }
 | |
| 
 | |
| // IsRunning returns true if the background processes of the client are
 | |
| // running, false otherwise.
 | |
| func (c *Client) IsRunning() bool {
 | |
| 	c.mu.RLock()
 | |
| 	defer c.mu.RUnlock()
 | |
| 	return c.running
 | |
| }
 | |
| 
 | |
| // Start starts the background processes like sniffing the cluster and
 | |
| // periodic health checks. You don't need to run Start when creating a
 | |
| // client with NewClient; the background processes are run by default.
 | |
| //
 | |
| // If the background processes are already running, this is a no-op.
 | |
| func (c *Client) Start() {
 | |
| 	c.mu.RLock()
 | |
| 	if c.running {
 | |
| 		c.mu.RUnlock()
 | |
| 		return
 | |
| 	}
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	if c.snifferEnabled {
 | |
| 		go c.sniffer()
 | |
| 	}
 | |
| 	if c.healthcheckEnabled {
 | |
| 		go c.healthchecker()
 | |
| 	}
 | |
| 
 | |
| 	c.mu.Lock()
 | |
| 	c.running = true
 | |
| 	c.mu.Unlock()
 | |
| 
 | |
| 	c.infof("elastic: client started")
 | |
| }
 | |
| 
 | |
| // Stop stops the background processes that the client is running,
 | |
| // i.e. sniffing the cluster periodically and running health checks
 | |
| // on the nodes.
 | |
| //
 | |
| // If the background processes are not running, this is a no-op.
 | |
| func (c *Client) Stop() {
 | |
| 	c.mu.RLock()
 | |
| 	if !c.running {
 | |
| 		c.mu.RUnlock()
 | |
| 		return
 | |
| 	}
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	if c.healthcheckEnabled {
 | |
| 		c.healthcheckStop <- true
 | |
| 		<-c.healthcheckStop
 | |
| 	}
 | |
| 
 | |
| 	if c.snifferEnabled {
 | |
| 		c.snifferStop <- true
 | |
| 		<-c.snifferStop
 | |
| 	}
 | |
| 
 | |
| 	c.mu.Lock()
 | |
| 	c.running = false
 | |
| 	c.mu.Unlock()
 | |
| 
 | |
| 	c.infof("elastic: client stopped")
 | |
| }
 | |
| 
 | |
// logDeprecation is a package-level hook that takes an HTTP request and
// its response. Its default value is a no-op.
// NOTE(review): presumably used as the client's default deprecation
// logger — confirm against the client constructor.
var logDeprecation = func(*http.Request, *http.Response) {}
 | |
| 
 | |
| // errorf logs to the error log.
 | |
| func (c *Client) errorf(format string, args ...interface{}) {
 | |
| 	if c.errorlog != nil {
 | |
| 		c.errorlog.Printf(format, args...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // infof logs informational messages.
 | |
| func (c *Client) infof(format string, args ...interface{}) {
 | |
| 	if c.infolog != nil {
 | |
| 		c.infolog.Printf(format, args...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // tracef logs to the trace log.
 | |
| func (c *Client) tracef(format string, args ...interface{}) {
 | |
| 	if c.tracelog != nil {
 | |
| 		c.tracelog.Printf(format, args...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // dumpRequest dumps the given HTTP request to the trace log.
 | |
| func (c *Client) dumpRequest(r *http.Request) {
 | |
| 	if c.tracelog != nil {
 | |
| 		out, err := httputil.DumpRequestOut(r, true)
 | |
| 		if err == nil {
 | |
| 			c.tracef("%s\n", string(out))
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // dumpResponse dumps the given HTTP response to the trace log.
 | |
| func (c *Client) dumpResponse(resp *http.Response) {
 | |
| 	if c.tracelog != nil {
 | |
| 		out, err := httputil.DumpResponse(resp, true)
 | |
| 		if err == nil {
 | |
| 			c.tracef("%s\n", string(out))
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // sniffer periodically runs sniff.
 | |
| func (c *Client) sniffer() {
 | |
| 	c.mu.RLock()
 | |
| 	timeout := c.snifferTimeout
 | |
| 	interval := c.snifferInterval
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	ticker := time.NewTicker(interval)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-c.snifferStop:
 | |
| 			// we are asked to stop, so we signal back that we're stopping now
 | |
| 			c.snifferStop <- true
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			c.sniff(context.Background(), timeout)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // sniff uses the Node Info API to return the list of nodes in the cluster.
 | |
| // It uses the list of URLs passed on startup plus the list of URLs found
 | |
| // by the preceding sniffing process (if sniffing is enabled).
 | |
| //
 | |
| // If sniffing is disabled, this is a no-op.
 | |
| func (c *Client) sniff(parentCtx context.Context, timeout time.Duration) error {
 | |
| 	c.mu.RLock()
 | |
| 	if !c.snifferEnabled {
 | |
| 		c.mu.RUnlock()
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Use all available URLs provided to sniff the cluster.
 | |
| 	var urls []string
 | |
| 	urlsMap := make(map[string]bool)
 | |
| 
 | |
| 	// Add all URLs provided on startup
 | |
| 	for _, url := range c.urls {
 | |
| 		urlsMap[url] = true
 | |
| 		urls = append(urls, url)
 | |
| 	}
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	// Add all URLs found by sniffing
 | |
| 	c.connsMu.RLock()
 | |
| 	for _, conn := range c.conns {
 | |
| 		if !conn.IsDead() {
 | |
| 			url := conn.URL()
 | |
| 			if _, found := urlsMap[url]; !found {
 | |
| 				urls = append(urls, url)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	c.connsMu.RUnlock()
 | |
| 
 | |
| 	if len(urls) == 0 {
 | |
| 		return errors.Wrap(ErrNoClient, "no URLs found")
 | |
| 	}
 | |
| 
 | |
| 	// Start sniffing on all found URLs
 | |
| 	ch := make(chan []*conn, len(urls))
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(parentCtx, timeout)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	for _, url := range urls {
 | |
| 		go func(url string) { ch <- c.sniffNode(ctx, url) }(url)
 | |
| 	}
 | |
| 
 | |
| 	// Wait for the results to come back, or the process times out.
 | |
| 	for {
 | |
| 		select {
 | |
| 		case conns := <-ch:
 | |
| 			if len(conns) > 0 {
 | |
| 				c.updateConns(conns)
 | |
| 				return nil
 | |
| 			}
 | |
| 		case <-ctx.Done():
 | |
| 			if err := ctx.Err(); err != nil {
 | |
| 				switch {
 | |
| 				case IsContextErr(err):
 | |
| 					return err
 | |
| 				}
 | |
| 				return errors.Wrapf(ErrNoClient, "sniff timeout: %v", err)
 | |
| 			}
 | |
| 			// We get here if no cluster responds in time
 | |
| 			return errors.Wrap(ErrNoClient, "sniff timeout")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // sniffNode sniffs a single node. This method is run as a goroutine
 | |
| // in sniff. If successful, it returns the list of node URLs extracted
 | |
| // from the result of calling Nodes Info API. Otherwise, an empty array
 | |
| // is returned.
 | |
| func (c *Client) sniffNode(ctx context.Context, url string) []*conn {
 | |
| 	var nodes []*conn
 | |
| 
 | |
| 	// Call the Nodes Info API at /_nodes/http
 | |
| 	req, err := NewRequest("GET", url+"/_nodes/http")
 | |
| 	if err != nil {
 | |
| 		return nodes
 | |
| 	}
 | |
| 
 | |
| 	c.mu.RLock()
 | |
| 	if c.basicAuth {
 | |
| 		req.SetBasicAuth(c.basicAuthUsername, c.basicAuthPassword)
 | |
| 	}
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
 | |
| 	if err != nil {
 | |
| 		return nodes
 | |
| 	}
 | |
| 	defer res.Body.Close()
 | |
| 
 | |
| 	var info NodesInfoResponse
 | |
| 	if err := json.NewDecoder(res.Body).Decode(&info); err == nil {
 | |
| 		if len(info.Nodes) > 0 {
 | |
| 			for nodeID, node := range info.Nodes {
 | |
| 				if c.snifferCallback(node) {
 | |
| 					if node.HTTP != nil && len(node.HTTP.PublishAddress) > 0 {
 | |
| 						url := c.extractHostname(c.scheme, node.HTTP.PublishAddress)
 | |
| 						if url != "" {
 | |
| 							nodes = append(nodes, newConn(nodeID, url))
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return nodes
 | |
| }
 | |
| 
 | |
// reSniffHostAndPort is used to extract hostname and port from a result
// from a Nodes Info API (example: "inet[/127.0.0.1:9200]").
// Capture group 1 is the text between the "/" and the ":" (the host),
// capture group 2 is the run of digits before the closing "]" (the port).
var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
 | |
| 
 | |
| func (c *Client) extractHostname(scheme, address string) string {
 | |
| 	if strings.HasPrefix(address, "inet") {
 | |
| 		m := reSniffHostAndPort.FindStringSubmatch(address)
 | |
| 		if len(m) == 3 {
 | |
| 			return fmt.Sprintf("%s://%s:%s", scheme, m[1], m[2])
 | |
| 		}
 | |
| 	}
 | |
| 	s := address
 | |
| 	if idx := strings.Index(s, "/"); idx >= 0 {
 | |
| 		s = s[idx+1:]
 | |
| 	}
 | |
| 	if !strings.Contains(s, ":") {
 | |
| 		return ""
 | |
| 	}
 | |
| 	return fmt.Sprintf("%s://%s", scheme, s)
 | |
| }
 | |
| 
 | |
| // updateConns updates the clients' connections with new information
 | |
| // gather by a sniff operation.
 | |
| func (c *Client) updateConns(conns []*conn) {
 | |
| 	c.connsMu.Lock()
 | |
| 
 | |
| 	// Build up new connections:
 | |
| 	// If we find an existing connection, use that (including no. of failures etc.).
 | |
| 	// If we find a new connection, add it.
 | |
| 	var newConns []*conn
 | |
| 	for _, conn := range conns {
 | |
| 		var found bool
 | |
| 		for _, oldConn := range c.conns {
 | |
| 			// Notice that e.g. in a Kubernetes cluster the NodeID might be
 | |
| 			// stable while the URL has changed.
 | |
| 			if oldConn.NodeID() == conn.NodeID() && oldConn.URL() == conn.URL() {
 | |
| 				// Take over the old connection
 | |
| 				newConns = append(newConns, oldConn)
 | |
| 				found = true
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		if !found {
 | |
| 			// New connection didn't exist, so add it to our list of new conns.
 | |
| 			c.infof("elastic: %s joined the cluster", conn.URL())
 | |
| 			newConns = append(newConns, conn)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	c.conns = newConns
 | |
| 	c.cindex = -1
 | |
| 	c.connsMu.Unlock()
 | |
| }
 | |
| 
 | |
| // healthchecker periodically runs healthcheck.
 | |
| func (c *Client) healthchecker() {
 | |
| 	c.mu.RLock()
 | |
| 	timeout := c.healthcheckTimeout
 | |
| 	interval := c.healthcheckInterval
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	ticker := time.NewTicker(interval)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-c.healthcheckStop:
 | |
| 			// we are asked to stop, so we signal back that we're stopping now
 | |
| 			c.healthcheckStop <- true
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			c.healthcheck(context.Background(), timeout, false)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // healthcheck does a health check on all nodes in the cluster. Depending on
 | |
| // the node state, it marks connections as dead, sets them alive etc.
 | |
| // If healthchecks are disabled and force is false, this is a no-op.
 | |
| // The timeout specifies how long to wait for a response from Elasticsearch.
 | |
| func (c *Client) healthcheck(parentCtx context.Context, timeout time.Duration, force bool) {
 | |
| 	c.mu.RLock()
 | |
| 	if !c.healthcheckEnabled && !force {
 | |
| 		c.mu.RUnlock()
 | |
| 		return
 | |
| 	}
 | |
| 	basicAuth := c.basicAuth
 | |
| 	basicAuthUsername := c.basicAuthUsername
 | |
| 	basicAuthPassword := c.basicAuthPassword
 | |
| 	c.mu.RUnlock()
 | |
| 
 | |
| 	c.connsMu.RLock()
 | |
| 	conns := c.conns
 | |
| 	c.connsMu.RUnlock()
 | |
| 
 | |
| 	for _, conn := range conns {
 | |
| 		// Run the HEAD request against ES with a timeout
 | |
| 		ctx, cancel := context.WithTimeout(parentCtx, timeout)
 | |
| 		defer cancel()
 | |
| 
 | |
| 		// Goroutine executes the HTTP request, returns an error and sets status
 | |
| 		var status int
 | |
| 		errc := make(chan error, 1)
 | |
| 		go func(url string) {
 | |
| 			req, err := NewRequest("HEAD", url)
 | |
| 			if err != nil {
 | |
| 				errc <- err
 | |
| 				return
 | |
| 			}
 | |
| 			if basicAuth {
 | |
| 				req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
 | |
| 			}
 | |
| 			res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
 | |
| 			if res != nil {
 | |
| 				status = res.StatusCode
 | |
| 				if res.Body != nil {
 | |
| 					res.Body.Close()
 | |
| 				}
 | |
| 			}
 | |
| 			errc <- err
 | |
| 		}(conn.URL())
 | |
| 
 | |
| 		// Wait for the Goroutine (or its timeout)
 | |
| 		select {
 | |
| 		case <-ctx.Done(): // timeout
 | |
| 			c.errorf("elastic: %s is dead", conn.URL())
 | |
| 			conn.MarkAsDead()
 | |
| 		case err := <-errc:
 | |
| 			if err != nil {
 | |
| 				c.errorf("elastic: %s is dead", conn.URL())
 | |
| 				conn.MarkAsDead()
 | |
| 				break
 | |
| 			}
 | |
| 			if status >= 200 && status < 300 {
 | |
| 				conn.MarkAsAlive()
 | |
| 			} else {
 | |
| 				conn.MarkAsDead()
 | |
| 				c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // startupHealthcheck is used at startup to check if the server is available
 | |
| // at all.
 | |
| func (c *Client) startupHealthcheck(parentCtx context.Context, timeout time.Duration) error {
 | |
| 	c.mu.Lock()
 | |
| 	urls := c.urls
 | |
| 	basicAuth := c.basicAuth
 | |
| 	basicAuthUsername := c.basicAuthUsername
 | |
| 	basicAuthPassword := c.basicAuthPassword
 | |
| 	c.mu.Unlock()
 | |
| 
 | |
| 	// If we don't get a connection after "timeout", we bail.
 | |
| 	var lastErr error
 | |
| 	start := time.Now()
 | |
| 	done := false
 | |
| 	for !done {
 | |
| 		for _, url := range urls {
 | |
| 			req, err := http.NewRequest("HEAD", url, nil)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			if basicAuth {
 | |
| 				req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
 | |
| 			}
 | |
| 			ctx, cancel := context.WithTimeout(parentCtx, timeout)
 | |
| 			defer cancel()
 | |
| 			req = req.WithContext(ctx)
 | |
| 			res, err := c.c.Do(req)
 | |
| 			if err == nil && res != nil && res.StatusCode >= 200 && res.StatusCode < 300 {
 | |
| 				return nil
 | |
| 			} else if err != nil {
 | |
| 				lastErr = err
 | |
| 			}
 | |
| 		}
 | |
| 		select {
 | |
| 		case <-parentCtx.Done():
 | |
| 			lastErr = parentCtx.Err()
 | |
| 			done = true
 | |
| 		case <-time.After(1 * time.Second):
 | |
| 			if time.Since(start) > timeout {
 | |
| 				done = true
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if lastErr != nil {
 | |
| 		if IsContextErr(lastErr) {
 | |
| 			return lastErr
 | |
| 		}
 | |
| 		return errors.Wrapf(ErrNoClient, "health check timeout: %v", lastErr)
 | |
| 	}
 | |
| 	return errors.Wrap(ErrNoClient, "health check timeout")
 | |
| }
 | |
| 
 | |
| // next returns the next available connection, or ErrNoClient.
 | |
| func (c *Client) next() (*conn, error) {
 | |
| 	// We do round-robin here.
 | |
| 	// TODO(oe) This should be a pluggable strategy, like the Selector in the official clients.
 | |
| 	c.connsMu.Lock()
 | |
| 	defer c.connsMu.Unlock()
 | |
| 
 | |
| 	i := 0
 | |
| 	numConns := len(c.conns)
 | |
| 	for {
 | |
| 		i++
 | |
| 		if i > numConns {
 | |
| 			break // we visited all conns: they all seem to be dead
 | |
| 		}
 | |
| 		c.cindex++
 | |
| 		if c.cindex >= numConns {
 | |
| 			c.cindex = 0
 | |
| 		}
 | |
| 		conn := c.conns[c.cindex]
 | |
| 		if !conn.IsDead() {
 | |
| 			return conn, nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// We have a deadlock here: All nodes are marked as dead.
 | |
| 	// If sniffing is disabled, connections will never be marked alive again.
 | |
| 	// So we are marking them as alive--if sniffing is disabled.
 | |
| 	// They'll then be picked up in the next call to PerformRequest.
 | |
| 	if !c.snifferEnabled {
 | |
| 		c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
 | |
| 		for _, conn := range c.conns {
 | |
| 			conn.MarkAsAlive()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// We tried hard, but there is no node available
 | |
| 	return nil, errors.Wrap(ErrNoClient, "no available connection")
 | |
| }
 | |
| 
 | |
| // mustActiveConn returns nil if there is an active connection,
 | |
| // otherwise ErrNoClient is returned.
 | |
| func (c *Client) mustActiveConn() error {
 | |
| 	c.connsMu.Lock()
 | |
| 	defer c.connsMu.Unlock()
 | |
| 
 | |
| 	for _, c := range c.conns {
 | |
| 		if !c.IsDead() {
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 	return errors.Wrap(ErrNoClient, "no active connection found")
 | |
| }
 | |
| 
 | |
| // -- PerformRequest --
 | |
| 
 | |
// PerformRequestOptions must be passed into PerformRequest.
type PerformRequestOptions struct {
	// Method is the HTTP method of the request.
	Method string
	// Path is appended to the URL of the selected connection.
	Path string
	// Params, if non-empty, are encoded and appended to Path as query string.
	Params url.Values
	// Body, if non-nil, is set as the request body.
	Body interface{}
	// ContentType, if non-empty, is sent as the Content-Type header.
	ContentType string
	// IgnoreErrors is passed to the response check as the list of HTTP
	// status codes to ignore.
	IgnoreErrors []int
	// Retrier, if non-nil, is used instead of the client's retrier.
	Retrier Retrier
	// Headers are added to the request (in addition to the client's
	// default headers).
	Headers http.Header
	// MaxResponseSize is passed on when the response is built.
	// NOTE(review): presumably an upper bound on the response body in
	// bytes — confirm against newResponse.
	MaxResponseSize int64
}
 | |
| 
 | |
// PerformRequest does a HTTP request to Elasticsearch.
// It returns a response (which might be nil) and an error on failure.
//
// Optionally, a list of HTTP error codes to ignore can be passed.
// This is necessary for services that expect e.g. HTTP status 404 as a
// valid outcome (Exists, IndicesExists, IndicesTypeExists).
//
// A connection is picked for every attempt. When no connection is
// available or the HTTP call fails, the retrier (opt.Retrier if set,
// otherwise the client's) decides whether and after which delay to try
// again. Context errors are returned immediately without retrying.
func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) (*Response, error) {
	start := time.Now().UTC()

	// Snapshot the client settings needed below under the read lock.
	c.mu.RLock()
	timeout := c.healthcheckTimeout
	basicAuth := c.basicAuth
	basicAuthUsername := c.basicAuthUsername
	basicAuthPassword := c.basicAuthPassword
	sendGetBodyAs := c.sendGetBodyAs
	gzipEnabled := c.gzipEnabled
	retrier := c.retrier
	if opt.Retrier != nil {
		retrier = opt.Retrier
	}
	defaultHeaders := c.headers
	c.mu.RUnlock()

	var err error
	var conn *conn
	var req *Request
	var resp *Response
	var retried bool // true once at least one retry has been scheduled
	var n int        // number of failed attempts, passed to the retrier

	// Change method if sendGetBodyAs is specified.
	if opt.Method == "GET" && opt.Body != nil && sendGetBodyAs != "GET" {
		opt.Method = sendGetBodyAs
	}

	for {
		pathWithParams := opt.Path
		if len(opt.Params) > 0 {
			pathWithParams += "?" + opt.Params.Encode()
		}

		// Get a connection
		conn, err = c.next()
		if errors.Cause(err) == ErrNoClient {
			n++
			if !retried {
				// Run a health check as all connections seem to be dead.
				// Note that force is false here, so this is a no-op when
				// health checks are disabled.
				c.healthcheck(ctx, timeout, false)
			}
			wait, ok, rerr := retrier.Retry(ctx, n, nil, nil, err)
			if rerr != nil {
				return nil, rerr
			}
			if !ok {
				return nil, err
			}
			retried = true
			time.Sleep(wait)
			continue // try again
		}
		if err != nil {
			c.errorf("elastic: cannot get connection from pool")
			return nil, err
		}

		req, err = NewRequest(opt.Method, conn.URL()+pathWithParams)
		if err != nil {
			c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
			return nil, err
		}
		if basicAuth {
			req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
		}
		if opt.ContentType != "" {
			req.Header.Set("Content-Type", opt.ContentType)
		}
		// Per-request headers are added first, then the client's default
		// headers. Both use Add, so values accumulate instead of replacing
		// each other.
		if len(opt.Headers) > 0 {
			for key, value := range opt.Headers {
				for _, v := range value {
					req.Header.Add(key, v)
				}
			}
		}
		if len(defaultHeaders) > 0 {
			for key, value := range defaultHeaders {
				for _, v := range value {
					req.Header.Add(key, v)
				}
			}
		}

		// Set body
		if opt.Body != nil {
			err = req.SetBody(opt.Body, gzipEnabled)
			if err != nil {
				c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err)
				return nil, err
			}
		}

		// Tracing
		c.dumpRequest((*http.Request)(req))

		// Get response. Note that err is shadowed from here to the end of
		// this loop iteration.
		res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
		if IsContextErr(err) {
			// Return the context error, but don't mark the node as dead
			return nil, err
		}
		if err != nil {
			n++
			wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
			if rerr != nil {
				c.errorf("elastic: %s is dead", conn.URL())
				conn.MarkAsDead()
				return nil, rerr
			}
			if !ok {
				c.errorf("elastic: %s is dead", conn.URL())
				conn.MarkAsDead()
				return nil, err
			}
			retried = true
			time.Sleep(wait)
			continue // try again
		}
		// Every path below either returns or breaks out of the loop, so
		// this defer is registered at most once per call.
		defer res.Body.Close()

		// Tracing
		c.dumpResponse(res)

		// Log deprecation warnings as errors
		if len(res.Header["Warning"]) > 0 {
			c.deprecationlog((*http.Request)(req), res)
			for _, warning := range res.Header["Warning"] {
				c.errorf("Deprecation warning: %s", warning)
			}
		}

		// Check for errors
		if err := checkResponse((*http.Request)(req), res, opt.IgnoreErrors...); err != nil {
			// The HTTP exchange itself succeeded, so there is no retry.
			// We still try to return a response alongside the error.
			resp, _ = c.newResponse(res, opt.MaxResponseSize)
			return resp, err
		}

		// We successfully made a request with this connection
		conn.MarkAsHealthy()

		resp, err = c.newResponse(res, opt.MaxResponseSize)
		if err != nil {
			return nil, err
		}

		break
	}

	// Log method, URL, status and the total duration (including retries)
	// in seconds with millisecond resolution.
	duration := time.Now().UTC().Sub(start)
	c.infof("%s %s [status:%d, request:%.3fs]",
		strings.ToUpper(opt.Method),
		req.URL,
		resp.StatusCode,
		float64(int64(duration/time.Millisecond))/1000)

	return resp, nil
}
 | |
| 
 | |
| // -- Document APIs --
 | |
| 
 | |
| // Index a document.
 | |
| func (c *Client) Index() *IndexService {
 | |
| 	return NewIndexService(c)
 | |
| }
 | |
| 
 | |
| // Get a document.
 | |
| func (c *Client) Get() *GetService {
 | |
| 	return NewGetService(c)
 | |
| }
 | |
| 
 | |
| // MultiGet retrieves multiple documents in one roundtrip.
 | |
| func (c *Client) MultiGet() *MgetService {
 | |
| 	return NewMgetService(c)
 | |
| }
 | |
| 
 | |
| // Mget retrieves multiple documents in one roundtrip.
 | |
| func (c *Client) Mget() *MgetService {
 | |
| 	return NewMgetService(c)
 | |
| }
 | |
| 
 | |
| // Delete a document.
 | |
| func (c *Client) Delete() *DeleteService {
 | |
| 	return NewDeleteService(c)
 | |
| }
 | |
| 
 | |
| // DeleteByQuery deletes documents as found by a query.
 | |
| func (c *Client) DeleteByQuery(indices ...string) *DeleteByQueryService {
 | |
| 	return NewDeleteByQueryService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // Update a document.
 | |
| func (c *Client) Update() *UpdateService {
 | |
| 	return NewUpdateService(c)
 | |
| }
 | |
| 
 | |
| // UpdateByQuery performs an update on a set of documents.
 | |
| func (c *Client) UpdateByQuery(indices ...string) *UpdateByQueryService {
 | |
| 	return NewUpdateByQueryService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // Bulk is the entry point to mass insert/update/delete documents.
 | |
| func (c *Client) Bulk() *BulkService {
 | |
| 	return NewBulkService(c)
 | |
| }
 | |
| 
 | |
| // BulkProcessor allows setting up a concurrent processor of bulk requests.
 | |
| func (c *Client) BulkProcessor() *BulkProcessorService {
 | |
| 	return NewBulkProcessorService(c)
 | |
| }
 | |
| 
 | |
| // Reindex copies data from a source index into a destination index.
 | |
| //
 | |
| // See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/docs-reindex.html
 | |
| // for details on the Reindex API.
 | |
| func (c *Client) Reindex() *ReindexService {
 | |
| 	return NewReindexService(c)
 | |
| }
 | |
| 
 | |
| // TermVectors returns information and statistics on terms in the fields
 | |
| // of a particular document.
 | |
| func (c *Client) TermVectors(index string) *TermvectorsService {
 | |
| 	builder := NewTermvectorsService(c)
 | |
| 	builder = builder.Index(index)
 | |
| 	return builder
 | |
| }
 | |
| 
 | |
| // MultiTermVectors returns information and statistics on terms in the fields
 | |
| // of multiple documents.
 | |
| func (c *Client) MultiTermVectors() *MultiTermvectorService {
 | |
| 	return NewMultiTermvectorService(c)
 | |
| }
 | |
| 
 | |
| // -- Search APIs --
 | |
| 
 | |
| // Search is the entry point for searches.
 | |
| func (c *Client) Search(indices ...string) *SearchService {
 | |
| 	return NewSearchService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // MultiSearch is the entry point for multi searches.
 | |
| func (c *Client) MultiSearch() *MultiSearchService {
 | |
| 	return NewMultiSearchService(c)
 | |
| }
 | |
| 
 | |
| // Count documents.
 | |
| func (c *Client) Count(indices ...string) *CountService {
 | |
| 	return NewCountService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // Explain computes a score explanation for a query and a specific document.
 | |
| func (c *Client) Explain(index, typ, id string) *ExplainService {
 | |
| 	return NewExplainService(c).Index(index).Type(typ).Id(id)
 | |
| }
 | |
| 
 | |
| // TODO Search Template
 | |
| // TODO Search Exists API
 | |
| 
 | |
| // Validate allows a user to validate a potentially expensive query without executing it.
 | |
| func (c *Client) Validate(indices ...string) *ValidateService {
 | |
| 	return NewValidateService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // SearchShards returns statistical information about nodes and shards.
 | |
| func (c *Client) SearchShards(indices ...string) *SearchShardsService {
 | |
| 	return NewSearchShardsService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // FieldCaps returns statistical information about fields in indices.
 | |
| func (c *Client) FieldCaps(indices ...string) *FieldCapsService {
 | |
| 	return NewFieldCapsService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // Exists checks if a document exists.
 | |
| func (c *Client) Exists() *ExistsService {
 | |
| 	return NewExistsService(c)
 | |
| }
 | |
| 
 | |
| // Scroll through documents. Use this to efficiently scroll through results
 | |
| // while returning the results to a client.
 | |
| func (c *Client) Scroll(indices ...string) *ScrollService {
 | |
| 	return NewScrollService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // ClearScroll can be used to clear search contexts manually.
 | |
| func (c *Client) ClearScroll(scrollIds ...string) *ClearScrollService {
 | |
| 	return NewClearScrollService(c).ScrollId(scrollIds...)
 | |
| }
 | |
| 
 | |
| // -- Indices APIs --
 | |
| 
 | |
| // CreateIndex returns a service to create a new index.
 | |
| func (c *Client) CreateIndex(name string) *IndicesCreateService {
 | |
| 	return NewIndicesCreateService(c).Index(name)
 | |
| }
 | |
| 
 | |
| // DeleteIndex returns a service to delete an index.
 | |
| func (c *Client) DeleteIndex(indices ...string) *IndicesDeleteService {
 | |
| 	return NewIndicesDeleteService(c).Index(indices)
 | |
| }
 | |
| 
 | |
| // IndexExists allows to check if an index exists.
 | |
| func (c *Client) IndexExists(indices ...string) *IndicesExistsService {
 | |
| 	return NewIndicesExistsService(c).Index(indices)
 | |
| }
 | |
| 
 | |
| // ShrinkIndex returns a service to shrink one index into another.
 | |
| func (c *Client) ShrinkIndex(source, target string) *IndicesShrinkService {
 | |
| 	return NewIndicesShrinkService(c).Source(source).Target(target)
 | |
| }
 | |
| 
 | |
| // RolloverIndex rolls an alias over to a new index when the existing index
 | |
| // is considered to be too large or too old.
 | |
| func (c *Client) RolloverIndex(alias string) *IndicesRolloverService {
 | |
| 	return NewIndicesRolloverService(c).Alias(alias)
 | |
| }
 | |
| 
 | |
| // IndexStats provides statistics on different operations happining
 | |
| // in one or more indices.
 | |
| func (c *Client) IndexStats(indices ...string) *IndicesStatsService {
 | |
| 	return NewIndicesStatsService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // OpenIndex opens an index.
 | |
| func (c *Client) OpenIndex(name string) *IndicesOpenService {
 | |
| 	return NewIndicesOpenService(c).Index(name)
 | |
| }
 | |
| 
 | |
| // CloseIndex closes an index.
 | |
| func (c *Client) CloseIndex(name string) *IndicesCloseService {
 | |
| 	return NewIndicesCloseService(c).Index(name)
 | |
| }
 | |
| 
 | |
| // FreezeIndex freezes an index.
 | |
| func (c *Client) FreezeIndex(name string) *IndicesFreezeService {
 | |
| 	return NewIndicesFreezeService(c).Index(name)
 | |
| }
 | |
| 
 | |
| // UnfreezeIndex unfreezes an index.
 | |
| func (c *Client) UnfreezeIndex(name string) *IndicesUnfreezeService {
 | |
| 	return NewIndicesUnfreezeService(c).Index(name)
 | |
| }
 | |
| 
 | |
| // IndexGet retrieves information about one or more indices.
 | |
| // IndexGet is only available for Elasticsearch 1.4 or later.
 | |
| func (c *Client) IndexGet(indices ...string) *IndicesGetService {
 | |
| 	return NewIndicesGetService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // IndexGetSettings retrieves settings of all, one or more indices.
 | |
| func (c *Client) IndexGetSettings(indices ...string) *IndicesGetSettingsService {
 | |
| 	return NewIndicesGetSettingsService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // IndexPutSettings sets settings for all, one or more indices.
 | |
| func (c *Client) IndexPutSettings(indices ...string) *IndicesPutSettingsService {
 | |
| 	return NewIndicesPutSettingsService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // IndexSegments retrieves low level segment information for all, one or more indices.
 | |
| func (c *Client) IndexSegments(indices ...string) *IndicesSegmentsService {
 | |
| 	return NewIndicesSegmentsService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // IndexAnalyze performs the analysis process on a text and returns the
 | |
| // token breakdown of the text.
 | |
| func (c *Client) IndexAnalyze() *IndicesAnalyzeService {
 | |
| 	return NewIndicesAnalyzeService(c)
 | |
| }
 | |
| 
 | |
| // Forcemerge optimizes one or more indices.
 | |
| // It replaces the deprecated Optimize API.
 | |
| func (c *Client) Forcemerge(indices ...string) *IndicesForcemergeService {
 | |
| 	return NewIndicesForcemergeService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // Refresh asks Elasticsearch to refresh one or more indices.
 | |
| func (c *Client) Refresh(indices ...string) *RefreshService {
 | |
| 	return NewRefreshService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // Flush asks Elasticsearch to free memory from the index and
 | |
| // flush data to disk.
 | |
| func (c *Client) Flush(indices ...string) *IndicesFlushService {
 | |
| 	return NewIndicesFlushService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // SyncedFlush performs a synced flush.
 | |
| //
 | |
| // See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/indices-synced-flush.html
 | |
| // for more details on synched flushes and how they differ from a normal
 | |
| // Flush.
 | |
| func (c *Client) SyncedFlush(indices ...string) *IndicesSyncedFlushService {
 | |
| 	return NewIndicesSyncedFlushService(c).Index(indices...)
 | |
| }
 | |
| 
 | |
| // Alias enables the caller to add and/or remove aliases.
 | |
| func (c *Client) Alias() *AliasService {
 | |
| 	return NewAliasService(c)
 | |
| }
 | |
| 
 | |
| // Aliases returns aliases by index name(s).
 | |
| func (c *Client) Aliases() *AliasesService {
 | |
| 	return NewAliasesService(c)
 | |
| }
 | |
| 
 | |
| // IndexGetTemplate gets an index template.
 | |
| // Use XXXTemplate funcs to manage search templates.
 | |
| func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService {
 | |
| 	return NewIndicesGetTemplateService(c).Name(names...)
 | |
| }
 | |
| 
 | |
| // IndexTemplateExists gets check if an index template exists.
 | |
| // Use XXXTemplate funcs to manage search templates.
 | |
| func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService {
 | |
| 	return NewIndicesExistsTemplateService(c).Name(name)
 | |
| }
 | |
| 
 | |
| // IndexPutTemplate creates or updates an index template.
 | |
| // Use XXXTemplate funcs to manage search templates.
 | |
| func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService {
 | |
| 	return NewIndicesPutTemplateService(c).Name(name)
 | |
| }
 | |
| 
 | |
| // IndexDeleteTemplate deletes an index template.
 | |
| // Use XXXTemplate funcs to manage search templates.
 | |
| func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService {
 | |
| 	return NewIndicesDeleteTemplateService(c).Name(name)
 | |
| }
 | |
| 
 | |
| // GetMapping gets a mapping.
 | |
| func (c *Client) GetMapping() *IndicesGetMappingService {
 | |
| 	return NewIndicesGetMappingService(c)
 | |
| }
 | |
| 
 | |
| // PutMapping registers a mapping.
 | |
| func (c *Client) PutMapping() *IndicesPutMappingService {
 | |
| 	return NewIndicesPutMappingService(c)
 | |
| }
 | |
| 
 | |
| // GetFieldMapping gets mapping for fields.
 | |
| func (c *Client) GetFieldMapping() *IndicesGetFieldMappingService {
 | |
| 	return NewIndicesGetFieldMappingService(c)
 | |
| }
 | |
| 
 | |
| // -- cat APIs --
 | |
| 
 | |
| // TODO cat fielddata
 | |
| // TODO cat master
 | |
| // TODO cat nodes
 | |
| // TODO cat pending tasks
 | |
| // TODO cat plugins
 | |
| // TODO cat recovery
 | |
| // TODO cat thread pool
 | |
| // TODO cat shards
 | |
| // TODO cat segments
 | |
| 
 | |
| // CatAliases returns information about aliases.
 | |
| func (c *Client) CatAliases() *CatAliasesService {
 | |
| 	return NewCatAliasesService(c)
 | |
| }
 | |
| 
 | |
| // CatAllocation returns information about the allocation across nodes.
 | |
| func (c *Client) CatAllocation() *CatAllocationService {
 | |
| 	return NewCatAllocationService(c)
 | |
| }
 | |
| 
 | |
| // CatCount returns document counts for indices.
 | |
| func (c *Client) CatCount() *CatCountService {
 | |
| 	return NewCatCountService(c)
 | |
| }
 | |
| 
 | |
| // CatHealth returns information about cluster health.
 | |
| func (c *Client) CatHealth() *CatHealthService {
 | |
| 	return NewCatHealthService(c)
 | |
| }
 | |
| 
 | |
| // CatIndices returns information about indices.
 | |
| func (c *Client) CatIndices() *CatIndicesService {
 | |
| 	return NewCatIndicesService(c)
 | |
| }
 | |
| 
 | |
| // -- Ingest APIs --
 | |
| 
 | |
| // IngestPutPipeline adds pipelines and updates existing pipelines in
 | |
| // the cluster.
 | |
| func (c *Client) IngestPutPipeline(id string) *IngestPutPipelineService {
 | |
| 	return NewIngestPutPipelineService(c).Id(id)
 | |
| }
 | |
| 
 | |
| // IngestGetPipeline returns pipelines based on ID.
 | |
| func (c *Client) IngestGetPipeline(ids ...string) *IngestGetPipelineService {
 | |
| 	return NewIngestGetPipelineService(c).Id(ids...)
 | |
| }
 | |
| 
 | |
| // IngestDeletePipeline deletes a pipeline by ID.
 | |
| func (c *Client) IngestDeletePipeline(id string) *IngestDeletePipelineService {
 | |
| 	return NewIngestDeletePipelineService(c).Id(id)
 | |
| }
 | |
| 
 | |
| // IngestSimulatePipeline executes a specific pipeline against the set of
 | |
| // documents provided in the body of the request.
 | |
| func (c *Client) IngestSimulatePipeline() *IngestSimulatePipelineService {
 | |
| 	return NewIngestSimulatePipelineService(c)
 | |
| }
 | |
| 
 | |
| // -- Cluster APIs --
 | |
| 
 | |
| // ClusterHealth retrieves the health of the cluster.
 | |
| func (c *Client) ClusterHealth() *ClusterHealthService {
 | |
| 	return NewClusterHealthService(c)
 | |
| }
 | |
| 
 | |
| // ClusterReroute allows for manual changes to the allocation of
 | |
| // individual shards in the cluster.
 | |
| func (c *Client) ClusterReroute() *ClusterRerouteService {
 | |
| 	return NewClusterRerouteService(c)
 | |
| }
 | |
| 
 | |
| // ClusterState retrieves the state of the cluster.
 | |
| func (c *Client) ClusterState() *ClusterStateService {
 | |
| 	return NewClusterStateService(c)
 | |
| }
 | |
| 
 | |
| // ClusterStats retrieves cluster statistics.
 | |
| func (c *Client) ClusterStats() *ClusterStatsService {
 | |
| 	return NewClusterStatsService(c)
 | |
| }
 | |
| 
 | |
| // NodesInfo retrieves one or more or all of the cluster nodes information.
 | |
| func (c *Client) NodesInfo() *NodesInfoService {
 | |
| 	return NewNodesInfoService(c)
 | |
| }
 | |
| 
 | |
| // NodesStats retrieves one or more or all of the cluster nodes statistics.
 | |
| func (c *Client) NodesStats() *NodesStatsService {
 | |
| 	return NewNodesStatsService(c)
 | |
| }
 | |
| 
 | |
| // TasksCancel cancels tasks running on the specified nodes.
 | |
| func (c *Client) TasksCancel() *TasksCancelService {
 | |
| 	return NewTasksCancelService(c)
 | |
| }
 | |
| 
 | |
| // TasksList retrieves the list of tasks running on the specified nodes.
 | |
| func (c *Client) TasksList() *TasksListService {
 | |
| 	return NewTasksListService(c)
 | |
| }
 | |
| 
 | |
| // TasksGetTask retrieves a task running on the cluster.
 | |
| func (c *Client) TasksGetTask() *TasksGetTaskService {
 | |
| 	return NewTasksGetTaskService(c)
 | |
| }
 | |
| 
 | |
| // TODO Pending cluster tasks
 | |
| // TODO Cluster Update Settings
 | |
| // TODO Nodes hot_threads
 | |
| 
 | |
| // -- Snapshot and Restore --
 | |
| 
 | |
| // TODO Snapshot Status
 | |
| 
 | |
| // SnapshotCreate creates a snapshot.
 | |
| func (c *Client) SnapshotCreate(repository string, snapshot string) *SnapshotCreateService {
 | |
| 	return NewSnapshotCreateService(c).Repository(repository).Snapshot(snapshot)
 | |
| }
 | |
| 
 | |
| // SnapshotCreateRepository creates or updates a snapshot repository.
 | |
| func (c *Client) SnapshotCreateRepository(repository string) *SnapshotCreateRepositoryService {
 | |
| 	return NewSnapshotCreateRepositoryService(c).Repository(repository)
 | |
| }
 | |
| 
 | |
| // SnapshotDelete deletes a snapshot in a snapshot repository.
 | |
| func (c *Client) SnapshotDelete(repository string, snapshot string) *SnapshotDeleteService {
 | |
| 	return NewSnapshotDeleteService(c).Repository(repository).Snapshot(snapshot)
 | |
| }
 | |
| 
 | |
| // SnapshotDeleteRepository deletes a snapshot repository.
 | |
| func (c *Client) SnapshotDeleteRepository(repositories ...string) *SnapshotDeleteRepositoryService {
 | |
| 	return NewSnapshotDeleteRepositoryService(c).Repository(repositories...)
 | |
| }
 | |
| 
 | |
| // SnapshotGetRepository gets a snapshot repository.
 | |
| func (c *Client) SnapshotGetRepository(repositories ...string) *SnapshotGetRepositoryService {
 | |
| 	return NewSnapshotGetRepositoryService(c).Repository(repositories...)
 | |
| }
 | |
| 
 | |
| // SnapshotGet lists snapshot for a repository.
 | |
| func (c *Client) SnapshotGet(repository string) *SnapshotGetService {
 | |
| 	return NewSnapshotGetService(c).Repository(repository)
 | |
| }
 | |
| 
 | |
| // SnapshotVerifyRepository verifies a snapshot repository.
 | |
| func (c *Client) SnapshotVerifyRepository(repository string) *SnapshotVerifyRepositoryService {
 | |
| 	return NewSnapshotVerifyRepositoryService(c).Repository(repository)
 | |
| }
 | |
| 
 | |
| // SnapshotRestore restores the specified indices from a given snapshot
 | |
| func (c *Client) SnapshotRestore(repository string, snapshot string) *SnapshotRestoreService {
 | |
| 	return NewSnapshotRestoreService(c).Repository(repository).Snapshot(snapshot)
 | |
| }
 | |
| 
 | |
| // -- Scripting APIs --
 | |
| 
 | |
| // GetScript reads a stored script in Elasticsearch.
 | |
| // Use PutScript for storing a script.
 | |
| func (c *Client) GetScript() *GetScriptService {
 | |
| 	return NewGetScriptService(c)
 | |
| }
 | |
| 
 | |
| // PutScript allows saving a stored script in Elasticsearch.
 | |
| func (c *Client) PutScript() *PutScriptService {
 | |
| 	return NewPutScriptService(c)
 | |
| }
 | |
| 
 | |
| // DeleteScript allows removing a stored script from Elasticsearch.
 | |
| func (c *Client) DeleteScript() *DeleteScriptService {
 | |
| 	return NewDeleteScriptService(c)
 | |
| }
 | |
| 
 | |
| // -- X-Pack General --
 | |
| 
 | |
| // XPackInfo gets information on the xpack plugins enabled on the cluster
 | |
| 
 | |
| func (c *Client) XPackInfo() *XPackInfoService {
 | |
| 	return NewXPackInfoService(c)
 | |
| }
 | |
| 
 | |
| // -- X-Pack Index Lifecycle Management --
 | |
| 
 | |
| // XPackIlmPutLifecycle adds or modifies an ilm policy.
 | |
| func (c *Client) XPackIlmPutLifecycle() *XPackIlmPutLifecycleService {
 | |
| 	return NewXPackIlmPutLifecycleService(c)
 | |
| }
 | |
| 
 | |
| // XPackIlmGettLifecycle gets an ilm policy.
 | |
| func (c *Client) XPackIlmGetLifecycle() *XPackIlmGetLifecycleService {
 | |
| 	return NewXPackIlmGetLifecycleService(c)
 | |
| }
 | |
| 
 | |
| // XPackIlmDeleteLifecycle deletes an ilm policy.
 | |
| func (c *Client) XPackIlmDeleteLifecycle() *XPackIlmDeleteLifecycleService {
 | |
| 	return NewXPackIlmDeleteLifecycleService(c)
 | |
| }
 | |
| 
 | |
| // -- X-Pack Security --
 | |
| 
 | |
| // XPackSecurityGetRoleMapping gets a role mapping.
 | |
| func (c *Client) XPackSecurityGetRoleMapping(roleMappingName string) *XPackSecurityGetRoleMappingService {
 | |
| 	return NewXPackSecurityGetRoleMappingService(c).Name(roleMappingName)
 | |
| }
 | |
| 
 | |
| // XPackSecurityPutRoleMapping adds a role mapping.
 | |
| func (c *Client) XPackSecurityPutRoleMapping(roleMappingName string) *XPackSecurityPutRoleMappingService {
 | |
| 	return NewXPackSecurityPutRoleMappingService(c).Name(roleMappingName)
 | |
| }
 | |
| 
 | |
| // XPackSecurityDeleteRoleMapping deletes a role mapping.
 | |
| func (c *Client) XPackSecurityDeleteRoleMapping(roleMappingName string) *XPackSecurityDeleteRoleMappingService {
 | |
| 	return NewXPackSecurityDeleteRoleMappingService(c).Name(roleMappingName)
 | |
| }
 | |
| 
 | |
| // XPackSecurityGetRole gets a role.
 | |
| func (c *Client) XPackSecurityGetRole(roleName string) *XPackSecurityGetRoleService {
 | |
| 	return NewXPackSecurityGetRoleService(c).Name(roleName)
 | |
| }
 | |
| 
 | |
| // XPackSecurityPutRole adds a role.
 | |
| func (c *Client) XPackSecurityPutRole(roleName string) *XPackSecurityPutRoleService {
 | |
| 	return NewXPackSecurityPutRoleService(c).Name(roleName)
 | |
| }
 | |
| 
 | |
| // XPackSecurityDeleteRole deletes a role.
 | |
| func (c *Client) XPackSecurityDeleteRole(roleName string) *XPackSecurityDeleteRoleService {
 | |
| 	return NewXPackSecurityDeleteRoleService(c).Name(roleName)
 | |
| }
 | |
| 
 | |
| // TODO: Clear role cache API
 | |
| // https://www.elastic.co/guide/en/elasticsearch/reference/7.0/security-api-clear-role-cache.html
 | |
| 
 | |
| // XPackSecurityChangePassword changes the password of users in the native realm.
 | |
| func (c *Client) XPackSecurityChangePassword(username string) *XPackSecurityChangePasswordService {
 | |
| 	return NewXPackSecurityChangePasswordService(c).Username(username)
 | |
| }
 | |
| 
 | |
| // XPackSecurityGetUser gets details about one or more users.
 | |
| func (c *Client) XPackSecurityGetUser(usernames ...string) *XPackSecurityGetUserService {
 | |
| 	return NewXPackSecurityGetUserService(c).Usernames(usernames...)
 | |
| }
 | |
| 
 | |
| // XPackSecurityPutUser adds or updates a user.
 | |
| func (c *Client) XPackSecurityPutUser(username string) *XPackSecurityPutUserService {
 | |
| 	return NewXPackSecurityPutUserService(c).Username(username)
 | |
| }
 | |
| 
 | |
| // XPackSecurityEnableUser enables a user.
 | |
| func (c *Client) XPackSecurityEnableUser(username string) *XPackSecurityEnableUserService {
 | |
| 	return NewXPackSecurityEnableUserService(c).Username(username)
 | |
| }
 | |
| 
 | |
| // XPackSecurityDisableUser disables a user.
 | |
| func (c *Client) XPackSecurityDisableUser(username string) *XPackSecurityDisableUserService {
 | |
| 	return NewXPackSecurityDisableUserService(c).Username(username)
 | |
| }
 | |
| 
 | |
| // XPackSecurityDeleteUser deletes a user.
 | |
| func (c *Client) XPackSecurityDeleteUser(username string) *XPackSecurityDeleteUserService {
 | |
| 	return NewXPackSecurityDeleteUserService(c).Username(username)
 | |
| }
 | |
| 
 | |
| // -- X-Pack Watcher --
 | |
| 
 | |
| // XPackWatchPut adds a watch.
 | |
| func (c *Client) XPackWatchPut(watchId string) *XPackWatcherPutWatchService {
 | |
| 	return NewXPackWatcherPutWatchService(c).Id(watchId)
 | |
| }
 | |
| 
 | |
| // XPackWatchGet gets a watch.
 | |
| func (c *Client) XPackWatchGet(watchId string) *XPackWatcherGetWatchService {
 | |
| 	return NewXPackWatcherGetWatchService(c).Id(watchId)
 | |
| }
 | |
| 
 | |
| // XPackWatchDelete deletes a watch.
 | |
| func (c *Client) XPackWatchDelete(watchId string) *XPackWatcherDeleteWatchService {
 | |
| 	return NewXPackWatcherDeleteWatchService(c).Id(watchId)
 | |
| }
 | |
| 
 | |
| // XPackWatchExecute executes a watch.
 | |
| func (c *Client) XPackWatchExecute() *XPackWatcherExecuteWatchService {
 | |
| 	return NewXPackWatcherExecuteWatchService(c)
 | |
| }
 | |
| 
 | |
| // XPackWatchAck acknowledging a watch.
 | |
| func (c *Client) XPackWatchAck(watchId string) *XPackWatcherAckWatchService {
 | |
| 	return NewXPackWatcherAckWatchService(c).WatchId(watchId)
 | |
| }
 | |
| 
 | |
| // XPackWatchActivate activates a watch.
 | |
| func (c *Client) XPackWatchActivate(watchId string) *XPackWatcherActivateWatchService {
 | |
| 	return NewXPackWatcherActivateWatchService(c).WatchId(watchId)
 | |
| }
 | |
| 
 | |
| // XPackWatchDeactivate deactivates a watch.
 | |
| func (c *Client) XPackWatchDeactivate(watchId string) *XPackWatcherDeactivateWatchService {
 | |
| 	return NewXPackWatcherDeactivateWatchService(c).WatchId(watchId)
 | |
| }
 | |
| 
 | |
| // XPackWatchStats returns the current Watcher metrics.
 | |
| func (c *Client) XPackWatchStats() *XPackWatcherStatsService {
 | |
| 	return NewXPackWatcherStatsService(c)
 | |
| }
 | |
| 
 | |
| // XPackWatchStart starts a watch.
 | |
| func (c *Client) XPackWatchStart() *XPackWatcherStartService {
 | |
| 	return NewXPackWatcherStartService(c)
 | |
| }
 | |
| 
 | |
| // XPackWatchStop stops a watch.
 | |
| func (c *Client) XPackWatchStop() *XPackWatcherStopService {
 | |
| 	return NewXPackWatcherStopService(c)
 | |
| }
 | |
| 
 | |
| // -- Helpers and shortcuts --
 | |
| 
 | |
| // ElasticsearchVersion returns the version number of Elasticsearch
 | |
| // running on the given URL.
 | |
| func (c *Client) ElasticsearchVersion(url string) (string, error) {
 | |
| 	res, _, err := c.Ping(url).Do(context.Background())
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	return res.Version.Number, nil
 | |
| }
 | |
| 
 | |
| // IndexNames returns the names of all indices in the cluster.
 | |
| func (c *Client) IndexNames() ([]string, error) {
 | |
| 	res, err := c.IndexGetSettings().Index("_all").Do(context.Background())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var names []string
 | |
| 	for name := range res {
 | |
| 		names = append(names, name)
 | |
| 	}
 | |
| 	return names, nil
 | |
| }
 | |
| 
 | |
| // Ping checks if a given node in a cluster exists and (optionally)
 | |
| // returns some basic information about the Elasticsearch server,
 | |
| // e.g. the Elasticsearch version number.
 | |
| //
 | |
| // Notice that you need to specify a URL here explicitly.
 | |
| func (c *Client) Ping(url string) *PingService {
 | |
| 	return NewPingService(c).URL(url)
 | |
| }
 | |
| 
 | |
| // WaitForStatus waits for the cluster to have the given status.
 | |
| // This is a shortcut method for the ClusterHealth service.
 | |
| //
 | |
| // WaitForStatus waits for the specified timeout, e.g. "10s".
 | |
| // If the cluster will have the given state within the timeout, nil is returned.
 | |
| // If the request timed out, ErrTimeout is returned.
 | |
| func (c *Client) WaitForStatus(status string, timeout string) error {
 | |
| 	health, err := c.ClusterHealth().WaitForStatus(status).Timeout(timeout).Do(context.Background())
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if health.TimedOut {
 | |
| 		return ErrTimeout
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // WaitForGreenStatus waits for the cluster to have the "green" status.
 | |
| // See WaitForStatus for more details.
 | |
| func (c *Client) WaitForGreenStatus(timeout string) error {
 | |
| 	return c.WaitForStatus("green", timeout)
 | |
| }
 | |
| 
 | |
| // WaitForYellowStatus waits for the cluster to have the "yellow" status.
 | |
| // See WaitForStatus for more details.
 | |
| func (c *Client) WaitForYellowStatus(timeout string) error {
 | |
| 	return c.WaitForStatus("yellow", timeout)
 | |
| }
 |