package queue

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/grafana/dskit/services"
	"github.com/pkg/errors"
	"go.uber.org/atomic"
)

const (
	// How frequently to check for disconnected queriers that should be forgotten.
	forgetCheckPeriod = 5 * time.Second
)

var (
	ErrTooManyRequests = errors.New("too many outstanding requests")
	ErrStopped         = errors.New("queue is stopped")
)

// QueueIndex is an opaque type that allows resuming iteration over tenants between successive calls
// of the RequestQueue.Dequeue method.
type QueueIndex int // nolint:revive

// StartIndexWithLocalQueue is the index of the queue that starts iteration over local and sub queues.
var StartIndexWithLocalQueue QueueIndex = -2

// StartIndex is the index of the queue that starts iteration over sub queues.
var StartIndex QueueIndex = -1

// ReuseLastIndex modifies the index so that iteration restarts on the tenant for which the last queue was returned.
func (ui QueueIndex) ReuseLastIndex() QueueIndex {
	if ui < StartIndex {
		return ui
	}
	return ui - 1
}
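
// The sketch below is not part of the original file: it illustrates how a consumer can use
// ReuseLastIndex to ask for another request from the same tenant when a dequeued request turns
// out to be expired. isExpired is a hypothetical helper, q/last/consumerID are assumed to exist.
//
//	req, idx, err := q.Dequeue(ctx, last, consumerID)
//	if err == nil && isExpired(req) {
//		req, idx, err = q.Dequeue(ctx, idx.ReuseLastIndex(), consumerID)
//	}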

// Request stored into the queue.
type Request any

// RequestChannel is a channel that queues Requests.
type RequestChannel chan Request

// RequestQueue holds incoming requests in per-tenant queues. It also assigns each tenant a specified number of queriers,
// and when a querier asks for the next request to handle (using Dequeue), it returns requests
// in a fair fashion.
type RequestQueue struct {
	services.Service

	connectedConsumers *atomic.Int32

	mtx     sync.Mutex
	cond    contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
	queues  *tenantQueues
	stopped bool

	metrics *Metrics
}

func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue {
	q := &RequestQueue{
		queues:             newTenantQueues(maxOutstandingPerTenant, forgetDelay),
		connectedConsumers: atomic.NewInt32(0),
		metrics:            metrics,
	}

	q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
	q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedConsumers, q.stopping).WithName("request queue")

	return q
}
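
// The sketch below is not part of the original file: it shows one way to drive the queue's
// embedded dskit service lifecycle, assuming dskit's services.StartAndAwaitRunning and
// services.StopAndAwaitTerminated helpers and an already-constructed *Metrics value.
// The limits (100 outstanding requests per tenant, 1m forget delay) are placeholders.
func exampleRunQueue(ctx context.Context, metrics *Metrics) error {
	q := NewRequestQueue(100, time.Minute, metrics)
	if err := services.StartAndAwaitRunning(ctx, q); err != nil {
		return err
	}
	// Stopping waits for enqueued requests to be dispatched while consumers remain connected.
	defer services.StopAndAwaitTerminated(context.Background(), q) //nolint:errcheck

	// ... enqueue and dequeue requests here ...
	return nil
}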

// Enqueue puts the request into the queue. maxQueriers is a tenant-specific value that specifies how many queriers
// this tenant can use (zero or negative = all queriers). It is passed to each Enqueue call because it can change
// between calls.
//
// If the request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	if q.stopped {
		return ErrStopped
	}

	queue := q.queues.getOrAddQueue(tenant, path, maxQueriers)
	if queue == nil {
		// This can only happen if tenant is "".
		return errors.New("no queue found")
	}

	// Optimistically increase the queue counter for the tenant instead of doing separate
	// get and set operations, because _most_ of the time the increased value is
	// smaller than the max queue length.
	// We need to keep track of the queue length separately because the size of the
	// buffered channel is the same across all sub-queues, which would allow
	// enqueuing more items than are allowed at the tenant level.
	queueLen := q.queues.perUserQueueLen.Inc(tenant)
	if queueLen > q.queues.maxUserQueueSize {
		q.metrics.discardedRequests.WithLabelValues(tenant).Inc()
		// Decrement, because we already optimistically increased the counter.
		q.queues.perUserQueueLen.Dec(tenant)
		return ErrTooManyRequests
	}

	select {
	case queue.Chan() <- req:
		q.metrics.queueLength.WithLabelValues(tenant).Inc()
		q.metrics.enqueueCount.WithLabelValues(tenant, fmt.Sprint(len(path))).Inc()
		q.cond.Broadcast()
		// Call this function while holding the lock. This guarantees that no querier can fetch the request before the function returns.
		if successFn != nil {
			successFn()
		}
		return nil
	default:
		q.metrics.discardedRequests.WithLabelValues(tenant).Inc()
		// Decrement, because we already optimistically increased the counter.
		q.queues.perUserQueueLen.Dec(tenant)
		return ErrTooManyRequests
	}
}
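
// The sketch below is not part of the original file: a minimal producer call, assuming an
// existing *RequestQueue. The nil path is assumed to target the tenant's top-level queue
// (path segments address sub-queues), and maxQueriers of 0 lets any querier serve the tenant.
func exampleEnqueue(q *RequestQueue, tenant string, req Request) error {
	return q.Enqueue(tenant, nil, req, 0, func() {
		// Runs with the queue lock held, before any consumer can dequeue req.
	})
}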

// Dequeue finds the next tenant queue and takes the next request off of it. It blocks if there are no requests.
// By passing the tenant index from the previous call of this method, the querier guarantees that it iterates over all tenants fairly.
// If the consumer finds that the request from the tenant is already expired, it can get a request for the same tenant by using QueueIndex.ReuseLastIndex.
func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, consumerID string) (Request, QueueIndex, error) {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	querierWait := false

FindQueue:
	// We need to wait if there are no tenants, or no pending requests for the given querier.
	for (q.queues.hasNoTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped {
		querierWait = false
		start := time.Now()
		q.cond.Wait(ctx)
		q.metrics.querierWaitTime.WithLabelValues(consumerID).Observe(time.Since(start).Seconds())
	}

	if q.stopped {
		return nil, last, ErrStopped
	}

	if err := ctx.Err(); err != nil {
		return nil, last, err
	}

	for {
		queue, tenant, idx := q.queues.getNextQueueForConsumer(last, consumerID)
		last = idx
		if queue == nil {
			break
		}

		// Pick the next request from the queue.
		for {
			request := queue.Dequeue()
			if queue.Len() == 0 {
				q.queues.deleteQueue(tenant)
			}

			q.queues.perUserQueueLen.Dec(tenant)
			q.metrics.queueLength.WithLabelValues(tenant).Dec()

			// Tell close() we've processed a request.
			q.cond.Broadcast()

			return request, last, nil
		}
	}

	// There are no unexpired requests, so we can go back
	// and wait for more requests.
	querierWait = true
	goto FindQueue
}
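
// The sketch below is not part of the original file: a minimal consumer loop, where handle is a
// placeholder for the caller's request processing. Registering the connection keeps the
// per-tenant querier assignment up to date while the consumer is connected.
func exampleConsumerLoop(ctx context.Context, q *RequestQueue, consumerID string, handle func(Request)) error {
	q.RegisterConsumerConnection(consumerID)
	defer q.UnregisterConsumerConnection(consumerID)

	last := StartIndex
	for {
		req, idx, err := q.Dequeue(ctx, last, consumerID)
		if err != nil {
			// ErrStopped or a context error ends the loop.
			return err
		}
		last = idx
		handle(req)
	}
}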

func (q *RequestQueue) forgetDisconnectedConsumers(_ context.Context) error {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	if q.queues.forgetDisconnectedConsumers(time.Now()) > 0 {
		// We need to notify goroutines because having removed some queriers
		// may have caused a resharding.
		q.cond.Broadcast()
	}

	return nil
}

func (q *RequestQueue) stopping(_ error) error {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	for !q.queues.hasNoTenantQueues() && q.connectedConsumers.Load() > 0 {
		q.cond.Wait(context.Background())
	}

	// Only stop after dispatching enqueued requests.
	q.stopped = true

	// If there are still goroutines in the Dequeue method, they get notified.
	q.cond.Broadcast()

	return nil
}

func (q *RequestQueue) RegisterConsumerConnection(querier string) {
	q.connectedConsumers.Inc()

	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.addConsumerToConnection(querier)
}

func (q *RequestQueue) UnregisterConsumerConnection(querier string) {
	q.connectedConsumers.Dec()

	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.removeConsumerConnection(querier, time.Now())
}

func (q *RequestQueue) NotifyConsumerShutdown(querierID string) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.queues.notifyQuerierShutdown(querierID)
}

func (q *RequestQueue) GetConnectedConsumersMetric() float64 {
	return float64(q.connectedConsumers.Load())
}

// contextCond is a *sync.Cond with its Wait() method overridden to support context-based waiting.
type contextCond struct {
	*sync.Cond

	// testHookBeforeWaiting is called before calling Cond.Wait() if it's not nil.
	// Yes, it's ugly, but the http package settled the jurisprudence:
	// https://github.com/golang/go/blob/6178d25fc0b28724b1b5aec2b1b74fc06d9294c7/src/net/http/client.go#L596-L601
	testHookBeforeWaiting func()
}

// Wait does c.Cond.Wait() but will also return if the provided context is done.
// All the documentation of sync.Cond.Wait() applies, but it's especially important to remember that the mutex of
// the cond should be held while Wait() is called (and the mutex will be held once it returns).
func (c contextCond) Wait(ctx context.Context) {
	// "condWait" goroutine does c.Cond.Wait() and signals through the condWait channel.
	condWait := make(chan struct{})
	go func() {
		if c.testHookBeforeWaiting != nil {
			c.testHookBeforeWaiting()
		}
		c.Cond.Wait()
		close(condWait)
	}()

	// "waiting" goroutine: signals that the condWait goroutine has started waiting.
	// Notice that a closed waiting channel implies that the goroutine above has started waiting
	// (because it has unlocked the mutex), but the other way around is not true:
	// - condWait may have unlocked and be waiting, but someone else locked the mutex faster than us:
	//   in this case that caller will eventually unlock, and we'll be able to enter here.
	// - condWait called Wait(), unlocked, received a broadcast and locked again faster than we were able to lock here:
	//   in this case the condWait channel will be closed, and this goroutine will be waiting until we unlock.
	waiting := make(chan struct{})
	go func() {
		c.L.Lock()
		close(waiting)
		c.L.Unlock()
	}()

	select {
	case <-condWait:
		// We don't know whether the waiting goroutine is done or not, but we don't care:
		// it will be done once nobody is fighting for the mutex anymore.
	case <-ctx.Done():
		// In order to avoid leaking the condWait goroutine, we can send a broadcast.
		// Before sending the broadcast we need to make sure that the condWait goroutine is already waiting (or has already waited).
		select {
		case <-condWait:
			// No need to broadcast as c.Cond.Wait() has returned already.
			return
		case <-waiting:
			// c.Cond.Wait() might still be waiting (or maybe not!), so we'll poke it just in case.
			c.Broadcast()
		}

		// Make sure we are not waiting anymore; we need to do that before returning as the caller will need to unlock the mutex.
		<-condWait
	}
}
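
// The sketch below is not part of the original file: waiting on a contextCond for a condition,
// assuming the caller follows the usual sync.Cond contract (lock held around Wait, condition
// re-checked in a loop). done is a placeholder predicate evaluated while the lock is held.
func exampleContextCondWait(ctx context.Context, cond contextCond, done func() bool) {
	cond.L.Lock()
	defer cond.L.Unlock()
	for !done() && ctx.Err() == nil {
		// Returns on Broadcast/Signal or when ctx is done; the lock is held again on return.
		cond.Wait(ctx)
	}
}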