mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-11-03 21:16:26 +01:00 
			
		
		
		
	* go function contexting is not what you expect * Apply suggestions from code review Co-Authored-By: Lauris BH <lauris@nix.lv> Co-authored-by: Lauris BH <lauris@nix.lv>
		
			
				
	
	
		
			370 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			370 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2019 The Gitea Authors. All rights reserved.
 | 
						|
// Use of this source code is governed by a MIT-style
 | 
						|
// license that can be found in the LICENSE file.
 | 
						|
 | 
						|
package queue
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"reflect"
 | 
						|
	"sort"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"code.gitea.io/gitea/modules/log"
 | 
						|
)
 | 
						|
 | 
						|
var manager *Manager
 | 
						|
 | 
						|
// Manager is a queue manager
 | 
						|
type Manager struct {
 | 
						|
	mutex sync.Mutex
 | 
						|
 | 
						|
	counter int64
 | 
						|
	Queues  map[int64]*ManagedQueue
 | 
						|
}
 | 
						|
 | 
						|
// ManagedQueue represents a working queue with a Pool of workers.
 | 
						|
//
 | 
						|
// Although a ManagedQueue should really represent a Queue this does not
 | 
						|
// necessarily have to be the case. This could be used to describe any queue.WorkerPool.
 | 
						|
type ManagedQueue struct {
 | 
						|
	mutex         sync.Mutex
 | 
						|
	QID           int64
 | 
						|
	Type          Type
 | 
						|
	Name          string
 | 
						|
	Configuration interface{}
 | 
						|
	ExemplarType  string
 | 
						|
	Managed       interface{}
 | 
						|
	counter       int64
 | 
						|
	PoolWorkers   map[int64]*PoolWorkers
 | 
						|
}
 | 
						|
 | 
						|
// Flushable represents a pool or queue that is flushable
 | 
						|
type Flushable interface {
 | 
						|
	// Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
 | 
						|
	Flush(time.Duration) error
 | 
						|
	// FlushWithContext is very similar to Flush
 | 
						|
	// NB: The worker will not be registered with the manager.
 | 
						|
	FlushWithContext(ctx context.Context) error
 | 
						|
	// IsEmpty will return if the managed pool is empty and has no work
 | 
						|
	IsEmpty() bool
 | 
						|
}
 | 
						|
 | 
						|
// ManagedPool is a simple interface to get certain details from a worker pool
 | 
						|
type ManagedPool interface {
 | 
						|
	// AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
 | 
						|
	AddWorkers(number int, timeout time.Duration) context.CancelFunc
 | 
						|
	// NumberOfWorkers returns the total number of workers in the pool
 | 
						|
	NumberOfWorkers() int
 | 
						|
	// MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
 | 
						|
	MaxNumberOfWorkers() int
 | 
						|
	// SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
 | 
						|
	SetMaxNumberOfWorkers(int)
 | 
						|
	// BoostTimeout returns the current timeout for worker groups created during a boost
 | 
						|
	BoostTimeout() time.Duration
 | 
						|
	// BlockTimeout returns the timeout the internal channel can block for before a boost would occur
 | 
						|
	BlockTimeout() time.Duration
 | 
						|
	// BoostWorkers sets the number of workers to be created during a boost
 | 
						|
	BoostWorkers() int
 | 
						|
	// SetPoolSettings sets the user updatable settings for the pool
 | 
						|
	SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
 | 
						|
}
 | 
						|
 | 
						|
// ManagedQueueList implements the sort.Interface
 | 
						|
type ManagedQueueList []*ManagedQueue
 | 
						|
 | 
						|
// PoolWorkers represents a group of workers working on a queue
 | 
						|
type PoolWorkers struct {
 | 
						|
	PID        int64
 | 
						|
	Workers    int
 | 
						|
	Start      time.Time
 | 
						|
	Timeout    time.Time
 | 
						|
	HasTimeout bool
 | 
						|
	Cancel     context.CancelFunc
 | 
						|
	IsFlusher  bool
 | 
						|
}
 | 
						|
 | 
						|
// PoolWorkersList implements the sort.Interface for PoolWorkers
 | 
						|
type PoolWorkersList []*PoolWorkers
 | 
						|
 | 
						|
func init() {
 | 
						|
	_ = GetManager()
 | 
						|
}
 | 
						|
 | 
						|
// GetManager returns a Manager and initializes one as singleton if there's none yet
 | 
						|
func GetManager() *Manager {
 | 
						|
	if manager == nil {
 | 
						|
		manager = &Manager{
 | 
						|
			Queues: make(map[int64]*ManagedQueue),
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return manager
 | 
						|
}
 | 
						|
 | 
						|
// Add adds a queue to this manager
 | 
						|
func (m *Manager) Add(managed interface{},
 | 
						|
	t Type,
 | 
						|
	configuration,
 | 
						|
	exemplar interface{}) int64 {
 | 
						|
 | 
						|
	cfg, _ := json.Marshal(configuration)
 | 
						|
	mq := &ManagedQueue{
 | 
						|
		Type:          t,
 | 
						|
		Configuration: string(cfg),
 | 
						|
		ExemplarType:  reflect.TypeOf(exemplar).String(),
 | 
						|
		PoolWorkers:   make(map[int64]*PoolWorkers),
 | 
						|
		Managed:       managed,
 | 
						|
	}
 | 
						|
	m.mutex.Lock()
 | 
						|
	m.counter++
 | 
						|
	mq.QID = m.counter
 | 
						|
	mq.Name = fmt.Sprintf("queue-%d", mq.QID)
 | 
						|
	if named, ok := managed.(Named); ok {
 | 
						|
		name := named.Name()
 | 
						|
		if len(name) > 0 {
 | 
						|
			mq.Name = name
 | 
						|
		}
 | 
						|
	}
 | 
						|
	m.Queues[mq.QID] = mq
 | 
						|
	m.mutex.Unlock()
 | 
						|
	log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
 | 
						|
	return mq.QID
 | 
						|
}
 | 
						|
 | 
						|
// Remove a queue from the Manager
 | 
						|
func (m *Manager) Remove(qid int64) {
 | 
						|
	m.mutex.Lock()
 | 
						|
	delete(m.Queues, qid)
 | 
						|
	m.mutex.Unlock()
 | 
						|
	log.Trace("Queue Manager removed: QID: %d", qid)
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// GetManagedQueue by qid
 | 
						|
func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
 | 
						|
	m.mutex.Lock()
 | 
						|
	defer m.mutex.Unlock()
 | 
						|
	return m.Queues[qid]
 | 
						|
}
 | 
						|
 | 
						|
// FlushAll flushes all the flushable queues attached to this manager
 | 
						|
func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
 | 
						|
	var ctx context.Context
 | 
						|
	var cancel context.CancelFunc
 | 
						|
	start := time.Now()
 | 
						|
	end := start
 | 
						|
	hasTimeout := false
 | 
						|
	if timeout > 0 {
 | 
						|
		ctx, cancel = context.WithTimeout(baseCtx, timeout)
 | 
						|
		end = start.Add(timeout)
 | 
						|
		hasTimeout = true
 | 
						|
	} else {
 | 
						|
		ctx, cancel = context.WithCancel(baseCtx)
 | 
						|
	}
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			return ctx.Err()
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		mqs := m.ManagedQueues()
 | 
						|
		wg := sync.WaitGroup{}
 | 
						|
		wg.Add(len(mqs))
 | 
						|
		allEmpty := true
 | 
						|
		for _, mq := range mqs {
 | 
						|
			if mq.IsEmpty() {
 | 
						|
				wg.Done()
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			allEmpty = false
 | 
						|
			if flushable, ok := mq.Managed.(Flushable); ok {
 | 
						|
				go func(q *ManagedQueue) {
 | 
						|
					localCtx, localCancel := context.WithCancel(ctx)
 | 
						|
					pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
 | 
						|
					err := flushable.FlushWithContext(localCtx)
 | 
						|
					if err != nil && err != ctx.Err() {
 | 
						|
						cancel()
 | 
						|
					}
 | 
						|
					q.CancelWorkers(pid)
 | 
						|
					localCancel()
 | 
						|
					wg.Done()
 | 
						|
				}(mq)
 | 
						|
			} else {
 | 
						|
				wg.Done()
 | 
						|
			}
 | 
						|
 | 
						|
		}
 | 
						|
		if allEmpty {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		wg.Wait()
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// ManagedQueues returns the managed queues
 | 
						|
func (m *Manager) ManagedQueues() []*ManagedQueue {
 | 
						|
	m.mutex.Lock()
 | 
						|
	mqs := make([]*ManagedQueue, 0, len(m.Queues))
 | 
						|
	for _, mq := range m.Queues {
 | 
						|
		mqs = append(mqs, mq)
 | 
						|
	}
 | 
						|
	m.mutex.Unlock()
 | 
						|
	sort.Sort(ManagedQueueList(mqs))
 | 
						|
	return mqs
 | 
						|
}
 | 
						|
 | 
						|
// Workers returns the poolworkers
 | 
						|
func (q *ManagedQueue) Workers() []*PoolWorkers {
 | 
						|
	q.mutex.Lock()
 | 
						|
	workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
 | 
						|
	for _, worker := range q.PoolWorkers {
 | 
						|
		workers = append(workers, worker)
 | 
						|
	}
 | 
						|
	q.mutex.Unlock()
 | 
						|
	sort.Sort(PoolWorkersList(workers))
 | 
						|
	return workers
 | 
						|
}
 | 
						|
 | 
						|
// RegisterWorkers registers workers to this queue
 | 
						|
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
 | 
						|
	q.mutex.Lock()
 | 
						|
	defer q.mutex.Unlock()
 | 
						|
	q.counter++
 | 
						|
	q.PoolWorkers[q.counter] = &PoolWorkers{
 | 
						|
		PID:        q.counter,
 | 
						|
		Workers:    number,
 | 
						|
		Start:      start,
 | 
						|
		Timeout:    timeout,
 | 
						|
		HasTimeout: hasTimeout,
 | 
						|
		Cancel:     cancel,
 | 
						|
		IsFlusher:  isFlusher,
 | 
						|
	}
 | 
						|
	return q.counter
 | 
						|
}
 | 
						|
 | 
						|
// CancelWorkers cancels pooled workers with pid
 | 
						|
func (q *ManagedQueue) CancelWorkers(pid int64) {
 | 
						|
	q.mutex.Lock()
 | 
						|
	pw, ok := q.PoolWorkers[pid]
 | 
						|
	q.mutex.Unlock()
 | 
						|
	if !ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	pw.Cancel()
 | 
						|
}
 | 
						|
 | 
						|
// RemoveWorkers deletes pooled workers with pid
 | 
						|
func (q *ManagedQueue) RemoveWorkers(pid int64) {
 | 
						|
	q.mutex.Lock()
 | 
						|
	pw, ok := q.PoolWorkers[pid]
 | 
						|
	delete(q.PoolWorkers, pid)
 | 
						|
	q.mutex.Unlock()
 | 
						|
	if ok && pw.Cancel != nil {
 | 
						|
		pw.Cancel()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// AddWorkers adds workers to the queue if it has registered an add worker function
 | 
						|
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
 | 
						|
	if pool, ok := q.Managed.(ManagedPool); ok {
 | 
						|
		// the cancel will be added to the pool workers description above
 | 
						|
		return pool.AddWorkers(number, timeout)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Flush flushes the queue with a timeout
 | 
						|
func (q *ManagedQueue) Flush(timeout time.Duration) error {
 | 
						|
	if flushable, ok := q.Managed.(Flushable); ok {
 | 
						|
		// the cancel will be added to the pool workers description above
 | 
						|
		return flushable.Flush(timeout)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// IsEmpty returns if the queue is empty
 | 
						|
func (q *ManagedQueue) IsEmpty() bool {
 | 
						|
	if flushable, ok := q.Managed.(Flushable); ok {
 | 
						|
		return flushable.IsEmpty()
 | 
						|
	}
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// NumberOfWorkers returns the number of workers in the queue
 | 
						|
func (q *ManagedQueue) NumberOfWorkers() int {
 | 
						|
	if pool, ok := q.Managed.(ManagedPool); ok {
 | 
						|
		return pool.NumberOfWorkers()
 | 
						|
	}
 | 
						|
	return -1
 | 
						|
}
 | 
						|
 | 
						|
// MaxNumberOfWorkers returns the maximum number of workers for the pool
 | 
						|
func (q *ManagedQueue) MaxNumberOfWorkers() int {
 | 
						|
	if pool, ok := q.Managed.(ManagedPool); ok {
 | 
						|
		return pool.MaxNumberOfWorkers()
 | 
						|
	}
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
// BoostWorkers returns the number of workers for a boost
 | 
						|
func (q *ManagedQueue) BoostWorkers() int {
 | 
						|
	if pool, ok := q.Managed.(ManagedPool); ok {
 | 
						|
		return pool.BoostWorkers()
 | 
						|
	}
 | 
						|
	return -1
 | 
						|
}
 | 
						|
 | 
						|
// BoostTimeout returns the timeout of the next boost
 | 
						|
func (q *ManagedQueue) BoostTimeout() time.Duration {
 | 
						|
	if pool, ok := q.Managed.(ManagedPool); ok {
 | 
						|
		return pool.BoostTimeout()
 | 
						|
	}
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
// BlockTimeout returns the timeout til the next boost
 | 
						|
func (q *ManagedQueue) BlockTimeout() time.Duration {
 | 
						|
	if pool, ok := q.Managed.(ManagedPool); ok {
 | 
						|
		return pool.BlockTimeout()
 | 
						|
	}
 | 
						|
	return 0
 | 
						|
}
 | 
						|
 | 
						|
// SetPoolSettings sets the setable boost values
 | 
						|
func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
 | 
						|
	if pool, ok := q.Managed.(ManagedPool); ok {
 | 
						|
		pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (l ManagedQueueList) Len() int {
 | 
						|
	return len(l)
 | 
						|
}
 | 
						|
 | 
						|
func (l ManagedQueueList) Less(i, j int) bool {
 | 
						|
	return l[i].Name < l[j].Name
 | 
						|
}
 | 
						|
 | 
						|
func (l ManagedQueueList) Swap(i, j int) {
 | 
						|
	l[i], l[j] = l[j], l[i]
 | 
						|
}
 | 
						|
 | 
						|
func (l PoolWorkersList) Len() int {
 | 
						|
	return len(l)
 | 
						|
}
 | 
						|
 | 
						|
func (l PoolWorkersList) Less(i, j int) bool {
 | 
						|
	return l[i].Start.Before(l[j].Start)
 | 
						|
}
 | 
						|
 | 
						|
func (l PoolWorkersList) Swap(i, j int) {
 | 
						|
	l[i], l[j] = l[j], l[i]
 | 
						|
}
 |