Record waiting time for queriers to receive requests. (#10253)

**What this PR does / why we need it**:
We want to investigate whether work stealing among queriers would be
useful, so we are recording how long each querier waits for new requests
to arrive in its queue.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
pull/10300/head
Karsten Jeschkies 3 years ago committed by GitHub
parent a30629e36e
commit b1a948e28b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      pkg/scheduler/queue/metrics.go
  2. 6
      pkg/scheduler/queue/queue.go
  3. 2
      pkg/scheduler/queue/tenant_queues.go

@ -6,9 +6,10 @@ import (
)
type Metrics struct {
queueLength *prometheus.GaugeVec // Per tenant
discardedRequests *prometheus.CounterVec // Per tenant
enqueueCount *prometheus.CounterVec // Per tenant and level
queueLength *prometheus.GaugeVec // Per tenant
discardedRequests *prometheus.CounterVec // Per tenant
enqueueCount *prometheus.CounterVec // Per tenant and level
querierWaitTime *prometheus.HistogramVec // Per querier wait time
}
func NewMetrics(subsystem string, registerer prometheus.Registerer) *Metrics {
@ -31,6 +32,12 @@ func NewMetrics(subsystem string, registerer prometheus.Registerer) *Metrics {
Name: "enqueue_count",
Help: "Total number of enqueued (sub-)queries.",
}, []string{"user", "level"}),
querierWaitTime: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: subsystem,
Name: "querier_wait_seconds",
Help: "Time spend waiting for new requests.",
}, []string{"querier"}),
}
}

@ -136,9 +136,11 @@ func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID s
FindQueue:
// We need to wait if there are no tenants, or no pending requests for given querier.
for (q.queues.hasTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped {
for (q.queues.hasNoTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped {
querierWait = false
start := time.Now()
q.cond.Wait(ctx)
q.metrics.querierWaitTime.WithLabelValues(querierID).Observe(time.Since(start).Seconds())
}
if q.stopped {
@ -196,7 +198,7 @@ func (q *RequestQueue) stopping(_ error) error {
q.mtx.Lock()
defer q.mtx.Unlock()
for !q.queues.hasTenantQueues() && q.connectedQuerierWorkers.Load() > 0 {
for !q.queues.hasNoTenantQueues() && q.connectedQuerierWorkers.Load() > 0 {
q.cond.Wait(context.Background())
}

@ -109,7 +109,7 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue
}
}
func (q *tenantQueues) hasTenantQueues() bool {
func (q *tenantQueues) hasNoTenantQueues() bool {
return q.mapping.Len() == 0
}

Loading…
Cancel
Save