mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-10-25 17:44:32 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			144 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
		
			Vendored
		
	
	
	
| package couchbase
 | |
| 
 | |
| import (
 | |
| 	"github.com/couchbase/gomemcached/client"
 | |
| 	"github.com/couchbase/goutils/logging"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
// Bounds of the reconnect back-off used by TapFeed.run: the wait starts at
// initialRetryInterval and is doubled after each failed attempt until it
// reaches maximumRetryInterval.
const (
	initialRetryInterval = 1 * time.Second
	maximumRetryInterval = 30 * time.Second
)
 | |
| 
 | |
| // A TapFeed streams mutation events from a bucket.
 | |
| //
 | |
| // Events from the bucket can be read from the channel 'C'.  Remember
 | |
| // to call Close() on it when you're done, unless its channel has
 | |
| // closed itself already.
 | |
| type TapFeed struct {
 | |
| 	C <-chan memcached.TapEvent
 | |
| 
 | |
| 	bucket    *Bucket
 | |
| 	args      *memcached.TapArguments
 | |
| 	nodeFeeds []*memcached.TapFeed    // The TAP feeds of the individual nodes
 | |
| 	output    chan memcached.TapEvent // Same as C but writeably-typed
 | |
| 	wg        sync.WaitGroup
 | |
| 	quit      chan bool
 | |
| }
 | |
| 
 | |
| // StartTapFeed creates and starts a new Tap feed
 | |
| func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) {
 | |
| 	if args == nil {
 | |
| 		defaultArgs := memcached.DefaultTapArguments()
 | |
| 		args = &defaultArgs
 | |
| 	}
 | |
| 
 | |
| 	feed := &TapFeed{
 | |
| 		bucket: b,
 | |
| 		args:   args,
 | |
| 		output: make(chan memcached.TapEvent, 10),
 | |
| 		quit:   make(chan bool),
 | |
| 	}
 | |
| 
 | |
| 	go feed.run()
 | |
| 
 | |
| 	feed.C = feed.output
 | |
| 	return feed, nil
 | |
| }
 | |
| 
 | |
| // Goroutine that runs the feed
 | |
| func (feed *TapFeed) run() {
 | |
| 	retryInterval := initialRetryInterval
 | |
| 	bucketOK := true
 | |
| 	for {
 | |
| 		// Connect to the TAP feed of each server node:
 | |
| 		if bucketOK {
 | |
| 			killSwitch, err := feed.connectToNodes()
 | |
| 			if err == nil {
 | |
| 				// Run until one of the sub-feeds fails:
 | |
| 				select {
 | |
| 				case <-killSwitch:
 | |
| 				case <-feed.quit:
 | |
| 					return
 | |
| 				}
 | |
| 				feed.closeNodeFeeds()
 | |
| 				retryInterval = initialRetryInterval
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// On error, try to refresh the bucket in case the list of nodes changed:
 | |
| 		logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
 | |
| 			feed.bucket.Name, retryInterval)
 | |
| 		err := feed.bucket.Refresh()
 | |
| 		bucketOK = err == nil
 | |
| 
 | |
| 		select {
 | |
| 		case <-time.After(retryInterval):
 | |
| 		case <-feed.quit:
 | |
| 			return
 | |
| 		}
 | |
| 		if retryInterval *= 2; retryInterval > maximumRetryInterval {
 | |
| 			retryInterval = maximumRetryInterval
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
 | |
| 	killSwitch = make(chan bool)
 | |
| 	for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
 | |
| 		var singleFeed *memcached.TapFeed
 | |
| 		singleFeed, err = serverConn.StartTapFeed(feed.args)
 | |
| 		if err != nil {
 | |
| 			logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
 | |
| 			feed.closeNodeFeeds()
 | |
| 			return
 | |
| 		}
 | |
| 		feed.nodeFeeds = append(feed.nodeFeeds, singleFeed)
 | |
| 		go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host)
 | |
| 		feed.wg.Add(1)
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // Goroutine that forwards Tap events from a single node's feed to the aggregate feed.
 | |
| func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) {
 | |
| 	defer feed.wg.Done()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case event, ok := <-singleFeed.C:
 | |
| 			if !ok {
 | |
| 				if singleFeed.Error != nil {
 | |
| 					logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
 | |
| 				}
 | |
| 				killSwitch <- true
 | |
| 				return
 | |
| 			}
 | |
| 			feed.output <- event
 | |
| 		case <-feed.quit:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (feed *TapFeed) closeNodeFeeds() {
 | |
| 	for _, f := range feed.nodeFeeds {
 | |
| 		f.Close()
 | |
| 	}
 | |
| 	feed.nodeFeeds = nil
 | |
| }
 | |
| 
 | |
| // Close a Tap feed.
 | |
| func (feed *TapFeed) Close() error {
 | |
| 	select {
 | |
| 	case <-feed.quit:
 | |
| 		return nil
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	feed.closeNodeFeeds()
 | |
| 	close(feed.quit)
 | |
| 	feed.wg.Wait()
 | |
| 	close(feed.output)
 | |
| 	return nil
 | |
| }
 |