package queryrange

import (
	"context"
	"fmt"
	"reflect"
	"regexp"
	"sync"
	"testing"
	"time"

	"github.com/grafana/dskit/user"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/stretchr/testify/require"
	"go.uber.org/atomic"
	"gopkg.in/yaml.v2"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/logqlmodel"
	"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
	"github.com/grafana/loki/v3/pkg/querier/plan"
	"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/types"
	"github.com/grafana/loki/v3/pkg/util"
	"github.com/grafana/loki/v3/pkg/util/constants"
	"github.com/grafana/loki/v3/pkg/util/httpreq"
	util_log "github.com/grafana/loki/v3/pkg/util/log"
	"github.com/grafana/loki/v3/pkg/util/querylimits"
)

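// TestLimits checks that WithSplitByLimits overrides the per-tenant split duration
// and that the forced split interval shows up in the generated cache key.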
func TestLimits(t *testing.T) {
	l := fakeLimits{
		splitDuration: map[string]time.Duration{"a": time.Minute},
	}

	wrapped := WithSplitByLimits(l, time.Hour)

	// Test default
	require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour)
	// Ensure we override the underlying implementation
	require.Equal(t, wrapped.QuerySplitDuration("a"), time.Hour)

	r := &LokiRequest{
		Query:   "qry",
		StartTs: time.Now(),
		Step:    int64(time.Minute / time.Millisecond),
	}

	require.Equal(
		t,
		fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart().UnixMilli()/int64(time.Hour/time.Millisecond), int64(time.Hour)),
		cacheKeyLimits{wrapped, nil, nil}.GenerateCacheKey(context.Background(), "a", r),
	)
}

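// TestMetricQueryCacheKey checks that the split interval encoded in a metric-query
// cache key switches to the tenant's ingester split duration whenever the query
// overlaps the ingester query window.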
func TestMetricQueryCacheKey(t *testing.T) {
	const (
		defaultTenant       = "a"
		alternateTenant     = "b"
		query               = `sum(rate({foo="bar"}[1]))`
		defaultSplit        = time.Hour
		ingesterSplit       = 90 * time.Minute
		ingesterQueryWindow = defaultSplit * 3
	)

	step := (15 * time.Second).Milliseconds()

	l := fakeLimits{
		splitDuration:         map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit},
		ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit},
	}

	cases := []struct {
		name, tenantID string
		start, end     time.Time
		expectedSplit  time.Duration
		iqo            util.IngesterQueryOptions
	}{
		{
			name:          "outside ingester query window",
			tenantID:      defaultTenant,
			start:         time.Now().Add(-6 * time.Hour),
			end:           time.Now().Add(-5 * time.Hour),
			expectedSplit: defaultSplit,
			iqo: ingesterQueryOpts{
				queryIngestersWithin: ingesterQueryWindow,
				queryStoreOnly:       false,
			},
		},
		{
			name:          "within ingester query window",
			tenantID:      defaultTenant,
			start:         time.Now().Add(-6 * time.Hour),
			end:           time.Now().Add(-ingesterQueryWindow / 2),
			expectedSplit: ingesterSplit,
			iqo: ingesterQueryOpts{
				queryIngestersWithin: ingesterQueryWindow,
				queryStoreOnly:       false,
			},
		},
		{
			name:          "within ingester query window, but query store only",
			tenantID:      defaultTenant,
			start:         time.Now().Add(-6 * time.Hour),
			end:           time.Now().Add(-ingesterQueryWindow / 2),
			expectedSplit: defaultSplit,
			iqo: ingesterQueryOpts{
				queryIngestersWithin: ingesterQueryWindow,
				queryStoreOnly:       true,
			},
		},
		{
			name:          "within ingester query window, but no ingester split duration configured",
			tenantID:      alternateTenant,
			start:         time.Now().Add(-6 * time.Hour),
			end:           time.Now().Add(-ingesterQueryWindow / 2),
			expectedSplit: defaultSplit,
			iqo: ingesterQueryOpts{
				queryIngestersWithin: ingesterQueryWindow,
				queryStoreOnly:       false,
			},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			keyGen := cacheKeyLimits{l, nil, tc.iqo}

			r := &LokiRequest{
				Query:   query,
				StartTs: tc.start,
				EndTs:   tc.end,
				Step:    step,
			}

			// We use a regex here because the cache key always refers to the current time to get the ingester query window,
			// and therefore we can't know the current interval a priori without duplicating the logic.
			pattern := regexp.MustCompile(fmt.Sprintf(`%s:%s:%d:(\d+):%d`, tc.tenantID, regexp.QuoteMeta(query), step, tc.expectedSplit))
			require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r))
		})
	}
}

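// Test_seriesLimiter checks that the per-query series limit fails a split query
// once more than the configured number of distinct series has been returned.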
func Test_seriesLimiter(t *testing.T) {
	cfg := testConfig
	cfg.CacheResults = false
	cfg.CacheIndexStatsResults = false
	// Split the query into 7 subqueries, with at most 2 running in parallel.
	l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
	tpw, stopper, err := NewMiddleware(cfg, testEngineOpts, RouterConfig{}, nil, util_log.Logger, l, config.SchemaConfig{
		Configs: testSchemas,
	}, nil, false, nil, constants.Loki)
	if stopper != nil {
		defer stopper.Stop()
	}
	require.NoError(t, err)

	lreq := &LokiRequest{
		Query:     `rate({app="foo"} |= "foo"[1m])`,
		Limit:     1000,
		Step:      30000, // 30sec
		StartTs:   testTime.Add(-6 * time.Hour),
		EndTs:     testTime,
		Direction: logproto.FORWARD,
		Path:      "/query_range",
		Plan: &plan.QueryPlan{
			AST: syntax.MustParseExpr(`rate({app="foo"} |= "foo"[1m])`),
		},
	}

	ctx := user.InjectOrgID(context.Background(), "1")

	count, h := promqlResult(matrix)
	_, err = tpw.Wrap(h).Do(ctx, lreq)
	require.NoError(t, err)
	require.Equal(t, 7, *count)

	// A second distinct series should not be allowed.
	c := new(int)
	m := &sync.Mutex{}
	h = queryrangebase.HandlerFunc(func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
		m.Lock()
		defer m.Unlock()
		defer func() {
			*c++
		}()
		// The first call returns a single series.
		if *c == 0 {
			params, err := ParamsFromRequest(req)
			if err != nil {
				return nil, err
			}
			return ResultToResponse(logqlmodel.Result{Data: matrix}, params)
		}
		// Subsequent calls return a different series.
		m := promql.Matrix{
			{
				Floats: []promql.FPoint{
					{
						T: toMs(testTime.Add(-4 * time.Hour)),
						F: 0.013333333333333334,
					},
				},
				Metric: labels.FromStrings(
					"filename", `/var/hostlog/apport.log`,
					"job", "anotherjob",
				),
			},
		}
		params, err := ParamsFromRequest(req)
		if err != nil {
			return nil, err
		}
		return ResultToResponse(logqlmodel.Result{Data: m}, params)
	})

	_, err = tpw.Wrap(h).Do(ctx, lreq)
	require.Error(t, err)
	require.LessOrEqual(t, *c, 4)
}

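// Test_seriesLimiterDrilldown checks that requests tagged with the Logs Drilldown
// source receive partial results plus a warning when the series limit is hit,
// while untagged requests still fail with an error.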
func Test_seriesLimiterDrilldown(t *testing.T) {
	// Test the drilldown scenario directly using the series limiter middleware.
	middleware := newSeriesLimiter(1) // limit to 1 series

	// Create a context with the drilldown request headers.
	tenantCtx := user.InjectOrgID(context.Background(), "1")
	ctx := httpreq.InjectQueryTags(tenantCtx, "Source="+constants.LogsDrilldownAppName)

	// Create a metadata context to capture warnings.
	md, ctx := metadata.NewContext(ctx)

	callCount := 0
	// Create a handler that returns 1 series on the first call, then 1 more series on the second call.
	h := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		var result []queryrangebase.SampleStream

		if callCount == 0 {
			// First call: return 1 series.
			result = []queryrangebase.SampleStream{
				{
					Labels: []logproto.LabelAdapter{
						{Name: "filename", Value: "/var/hostlog/app.log"},
						{Name: "job", Value: "firstjob"},
					},
					Samples: []logproto.LegacySample{{Value: 0.013333333333333334}},
				},
			}
		} else {
			// Second call: return 1 different series (this should trigger the drilldown logic).
			result = []queryrangebase.SampleStream{
				{
					Labels: []logproto.LabelAdapter{
						{Name: "filename", Value: "/var/hostlog/apport.log"},
						{Name: "job", Value: "anotherjob"},
					},
					Samples: []logproto.LegacySample{{Value: 0.026666666666666668}},
				},
			}
		}
		callCount++

		return &LokiPromResponse{
			Response: &queryrangebase.PrometheusResponse{
				Data: queryrangebase.PrometheusData{
					ResultType: "matrix",
					Result:     result,
				},
			},
		}, nil
	})

	wrappedHandler := middleware.Wrap(h)

	// First call: should succeed and add 1 series to the limiter.
	resp1, err := wrappedHandler.Do(ctx, &LokiRequest{})
	require.NoError(t, err)
	require.NotNil(t, resp1)

	// Should have no warnings yet.
	require.Len(t, md.Warnings(), 0)

	// Second call: should trigger the drilldown logic and return with a warning.
	resp2, err := wrappedHandler.Do(ctx, &LokiRequest{})
	require.NoError(t, err)
	require.NotNil(t, resp2)

	// Verify that a warning was added about partial results.
	warnings := md.Warnings()
	require.Len(t, warnings, 1)
	require.Contains(t, warnings[0], "maximum number of series (1) reached for a single query; returning partial results")

	// The second response should be the original response since drilldown returns early.
	lokiResp, ok := resp2.(*LokiPromResponse)
	require.True(t, ok)
	require.Len(t, lokiResp.Response.Data.Result, 1)

	// A request without the drilldown header should produce an error.
	_, err = wrappedHandler.Do(tenantCtx, &LokiRequest{})
	require.Error(t, err)
}

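// TestSeriesLimiter_PerVariantLimits checks that the series limit is enforced per
// variant: a variant exceeding the limit is truncated and a warning recorded,
// while other variants are returned untouched.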
func TestSeriesLimiter_PerVariantLimits(t *testing.T) {
	for _, test := range []struct {
		name             string
		req              *LokiRequest
		resp             *LokiPromResponse
		expectedResponse *LokiPromResponse
		expectedWarnings []string
	}{
		{
			name: "single variant under limit should not exceed limit",
			req: &LokiRequest{
				Query: "sum by (job) (count_over_time({job=~\"app.*\"}[1m]))",
			},
			resp: createPromResponse([][]seriesLabels{
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "0"},
				},
			}),
			expectedResponse: createPromResponse([][]seriesLabels{
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "0"},
				},
			}),
		},
		{
			name: "multiple variants under limit should not exceed limit",
			req: &LokiRequest{
				Query: "sum by (job) (count_over_time({job=~\"app.*\"}[1m]))",
			},
			resp: createPromResponse([][]seriesLabels{
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "1"},
				},
			}),
			expectedResponse: createPromResponse([][]seriesLabels{
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "1"},
				},
			}),
		},
		{
			name: "single variant over the limit should exceed limit",
			req: &LokiRequest{
				Query: "sum by (job) (count_over_time({job=~\"app.*\"}[1m]))",
			},
			resp: createPromResponse([][]seriesLabels{
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app4"},
					{Name: constants.VariantLabel, Value: "1"},
				},
			}),
			expectedResponse: createPromResponse([][]seriesLabels{
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "1"},
				},
			}),
			expectedWarnings: []string{"maximum of series (3) reached for variant (1)"},
		},
		{
			name: "multiple variants over the limit should exceed limit",
			req: &LokiRequest{
				Query: "sum by (job) (count_over_time({job=~\"app.*\"}[1m]))",
			},
			resp: createPromResponse([][]seriesLabels{
				{
					{Name: "app", Value: "foo"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "app", Value: "bar"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app4"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app5"},
					{Name: constants.VariantLabel, Value: "1"},
				},
			}),
			expectedResponse: createPromResponse([][]seriesLabels{
				{
					{Name: "app", Value: "foo"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "app", Value: "bar"},
					{Name: constants.VariantLabel, Value: "0"},
				},
				{
					{Name: "job", Value: "app1"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app2"},
					{Name: constants.VariantLabel, Value: "1"},
				},
				{
					{Name: "job", Value: "app3"},
					{Name: constants.VariantLabel, Value: "1"},
				},
			}),
			expectedWarnings: []string{"maximum of series (3) reached for variant (1)"},
		},
	} {
		t.Run(test.name, func(t *testing.T) {
			middleware := newSeriesLimiter(3)
			mock := variantMockHandler{
				response: test.resp,
			}
			handler := middleware.Wrap(mock)

			metadata, ctx := metadata.NewContext(context.Background())

			resp, err := handler.Do(ctx, &LokiRequest{})
			require.NoError(t, err)
			require.EqualValues(t, test.expectedResponse.Response.Data, resp.(*LokiPromResponse).Response.Data)

			if test.expectedWarnings != nil {
				require.Equal(t, test.expectedWarnings, metadata.Warnings())
			}
		})
	}
}

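// seriesLabels is a convenience alias for building label sets in these tests.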
type seriesLabels = logproto.LabelAdapter

// variantMockHandler is a mock implementation of queryrangebase.Handler.
type variantMockHandler struct {
	response *LokiPromResponse
}

func (m variantMockHandler) Do(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
	// Return the predefined response; a real handler would vary its
	// response based on the request.
	if m.response != nil {
		return m.response, nil
	}
	// Return an empty response by default.
	return createPromResponse(nil), nil
}

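// createPromResponse builds a matrix-typed LokiPromResponse with one
// single-sample stream per provided label set.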
func createPromResponse(series [][]seriesLabels) *LokiPromResponse {
	result := make([]queryrangebase.SampleStream, len(series))
	for i, labels := range series {
		result[i] = queryrangebase.SampleStream{
			Labels:  labels,
			Samples: []logproto.LegacySample{{Value: 1.0}},
		}
	}

	return &LokiPromResponse{
		Response: &queryrangebase.PrometheusResponse{
			Data: queryrangebase.PrometheusData{
				ResultType: "matrix",
				Result:     result,
			},
		},
	}
}

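// Test_MaxQueryParallelism checks that the limited round tripper never runs more
// sub-requests concurrently than the configured max query parallelism.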
func Test_MaxQueryParallelism(t *testing.T) {
	maxQueryParallelism := 2

	var count atomic.Int32
	var maxVal atomic.Int32
	h := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		cur := count.Inc()
		if cur > maxVal.Load() {
			maxVal.Store(cur)
		}
		defer count.Dec()
		// simulate some work
		time.Sleep(20 * time.Millisecond)
		return queryrangebase.NewEmptyPrometheusResponse(model.ValMatrix), nil
	})
	ctx := user.InjectOrgID(context.Background(), "foo")

	_, _ = NewLimitedRoundTripper(h, fakeLimits{maxQueryParallelism: maxQueryParallelism},
		testSchemas,
		queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
			return queryrangebase.HandlerFunc(func(c context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				var wg sync.WaitGroup
				for i := 0; i < 10; i++ {
					wg.Add(1)
					go func() {
						defer wg.Done()
						_, _ = next.Do(c, &LokiRequest{})
					}()
				}
				wg.Wait()
				return nil, nil
			})
		}),
	).Do(ctx, &LokiRequest{})
	maxFound := int(maxVal.Load())
	require.LessOrEqual(t, maxFound, maxQueryParallelism, "max query parallelism: ", maxFound, " went over the configured one:", maxQueryParallelism)
}

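// Test_MaxQueryParallelismLateScheduling checks that sub-requests scheduled after
// the parent request has already returned do not cause an error.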
func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
	maxQueryParallelism := 2

	h := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		// simulate some work
		time.Sleep(20 * time.Millisecond)
		return queryrangebase.NewEmptyPrometheusResponse(model.ValMatrix), nil
	})
	ctx := user.InjectOrgID(context.Background(), "foo")

	_, err := NewLimitedRoundTripper(h, fakeLimits{maxQueryParallelism: maxQueryParallelism},
		testSchemas,
		queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
			return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
				for i := 0; i < 10; i++ {
					go func() {
						_, _ = next.Do(c, r)
					}()
				}
				return nil, nil
			})
		}),
	).Do(ctx, &LokiRequest{})

	require.NoError(t, err)
}

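// Test_MaxQueryParallelismDisable checks that a parallelism limit of 0 rejects the
// query with an error.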
func Test_MaxQueryParallelismDisable(t *testing.T) {
	maxQueryParallelism := 0

	h := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		// simulate some work
		time.Sleep(20 * time.Millisecond)
		return queryrangebase.NewEmptyPrometheusResponse(model.ValMatrix), nil
	})
	ctx := user.InjectOrgID(context.Background(), "foo")

	_, err := NewLimitedRoundTripper(h, fakeLimits{maxQueryParallelism: maxQueryParallelism},
		testSchemas,
		queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
			return queryrangebase.HandlerFunc(func(c context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				for i := 0; i < 10; i++ {
					go func() {
						_, _ = next.Do(c, &LokiRequest{})
					}()
				}
				return nil, nil
			})
		}),
	).Do(ctx, &LokiRequest{})
	require.Error(t, err)
}

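// Test_MaxQueryLookBack checks that a query reaching beyond the configured lookback
// window returns a successful response without invoking the downstream handler.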
func Test_MaxQueryLookBack(t *testing.T) {
	tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, RouterConfig{}, nil, util_log.Logger, fakeLimits{
		maxQueryLookback:    1 * time.Hour,
		maxQueryParallelism: 1,
	}, config.SchemaConfig{
		Configs: testSchemas,
	}, nil, false, nil, constants.Loki)
	if stopper != nil {
		defer stopper.Stop()
	}
	require.NoError(t, err)

	lreq := &LokiRequest{
		Query:     `{app="foo"} |= "foo"`,
		Limit:     10000,
		StartTs:   testTime.Add(-6 * time.Hour),
		EndTs:     testTime,
		Direction: logproto.FORWARD,
		Path:      "/loki/api/v1/query_range",
		Plan: &plan.QueryPlan{
			AST: syntax.MustParseExpr(`{app="foo"} |= "foo"`),
		},
	}

	ctx := user.InjectOrgID(context.Background(), "1")

	called := false
	h := queryrangebase.HandlerFunc(func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) {
		called = true
		return nil, nil
	})

	resp, err := tpw.Wrap(h).Do(ctx, lreq)
	require.NoError(t, err)
	require.False(t, called)
	require.Equal(t, resp.(*LokiResponse).Status, "success")
}

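// Test_MaxQueryLookBack_Types checks that the limits middleware returns a response
// of the matching concrete type for index-stats and volume requests whose range
// lies outside the lookback window.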
func Test_MaxQueryLookBack_Types(t *testing.T) {
	m := NewLimitsMiddleware(fakeLimits{
		maxQueryLookback:    1 * time.Hour,
		maxQueryParallelism: 1,
	})

	now := time.Now()
	type tcase struct {
		request          queryrangebase.Request
		expectedResponse queryrangebase.Response
	}
	cases := []tcase{
		{
			request: &logproto.IndexStatsRequest{
				From:    model.Time(now.UnixMilli()),
				Through: model.Time(now.Add(-90 * time.Minute).UnixMilli()),
			},
			expectedResponse: &IndexStatsResponse{},
		},
		{
			request: &logproto.VolumeRequest{
				From:    model.Time(now.UnixMilli()),
				Through: model.Time(now.Add(-90 * time.Minute).UnixMilli()),
			},
			expectedResponse: &VolumeResponse{},
		},
	}

	ctx := user.InjectOrgID(context.Background(), "1")

	h := queryrangebase.HandlerFunc(func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) {
		return nil, nil
	})

	for _, tcase := range cases {
		resp, err := m.Wrap(h).Do(ctx, tcase.request)
		require.NoError(t, err)

		require.Equal(t, reflect.TypeOf(tcase.expectedResponse), reflect.TypeOf(resp))
	}
}

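// Test_GenerateCacheKey_NoDivideZero checks that generating a cache key with a
// zero split duration does not divide by zero.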
func Test_GenerateCacheKey_NoDivideZero(t *testing.T) {
	l := cacheKeyLimits{WithSplitByLimits(nil, 0), nil, nil}
	start := time.Now()
	r := &LokiRequest{
		Query:   "qry",
		StartTs: start,
		Step:    int64(time.Minute / time.Millisecond),
	}

	require.Equal(
		t,
		fmt.Sprintf("foo:qry:%d:0:0", r.GetStep()),
		l.GenerateCacheKey(context.Background(), "foo", r),
	)
}

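// Test_WeightedParallelism checks that parallelism is weighted between the TSDB
// and non-TSDB portions of the query range according to how much of the range
// falls into each period config.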
func Test_WeightedParallelism(t *testing.T) {
	limits := &fakeLimits{
		tsdbMaxQueryParallelism: 2048,
		maxQueryParallelism:     10,
	}

	for _, cfgs := range []struct {
		desc    string
		periods string
	}{
		{
			desc: "end configs",
			periods: `
- from: "2022-01-01"
  store: boltdb-shipper
  object_store: gcs
  schema: v12
- from: "2022-01-02"
  store: tsdb
  object_store: gcs
  schema: v12
`,
		},
		{
			// Add another test that wraps the tested period configs with other unused configs
			// to ensure we bounds-test properly.
			desc: "middle configs",
			periods: `
- from: "2021-01-01"
  store: boltdb-shipper
  object_store: gcs
  schema: v12
- from: "2022-01-01"
  store: boltdb-shipper
  object_store: gcs
  schema: v12
- from: "2022-01-02"
  store: tsdb
  object_store: gcs
  schema: v12
- from: "2023-01-02"
  store: tsdb
  object_store: gcs
  schema: v12
`,
		},
	} {
		var confs []config.PeriodConfig
		require.Nil(t, yaml.Unmarshal([]byte(cfgs.periods), &confs))
		parsed, err := time.Parse("2006-01-02", "2022-01-02")
		require.Nil(t, err)
		borderTime := model.TimeFromUnix(parsed.Unix())

		for _, tc := range []struct {
			desc       string
			start, end model.Time
			exp        int
		}{
			{
				desc:  "50% each",
				start: borderTime.Add(-time.Hour),
				end:   borderTime.Add(time.Hour),
				exp:   1029,
			},
			{
				desc:  "75/25 split",
				start: borderTime.Add(-3 * time.Hour),
				end:   borderTime.Add(time.Hour),
				exp:   519,
			},
			{
				desc:  "start==end",
				start: borderTime.Add(time.Hour),
				end:   borderTime.Add(time.Hour),
				exp:   2048,
			},
			{
				desc:  "huge range to make sure we don't int overflow in the calculations",
				start: borderTime,
				end:   borderTime.Add(24 * time.Hour * 365 * 20),
				exp:   2048,
			},
		} {
			t.Run(cfgs.desc+tc.desc, func(t *testing.T) {
				require.Equal(t, tc.exp, WeightedParallelism(context.Background(), confs, "fake", limits, tc.start, tc.end))
			})
		}
	}
}

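// Test_WeightedParallelism_DivideByZeroError checks that degenerate time ranges
// (end before start, negative times, ranges before any period config) fall back
// to a parallelism of 1 instead of dividing by zero.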
func Test_WeightedParallelism_DivideByZeroError(t *testing.T) {
	t.Run("query end before start", func(t *testing.T) {
		parsed, err := time.Parse("2006-01-02", "2022-01-02")
		require.NoError(t, err)
		borderTime := model.TimeFromUnix(parsed.Unix())

		confs := []config.PeriodConfig{
			{
				From: config.DayTime{
					Time: borderTime.Add(-1 * time.Hour),
				},
				IndexType: types.TSDBType,
			},
		}

		result := WeightedParallelism(context.Background(), confs, "fake", &fakeLimits{tsdbMaxQueryParallelism: 50}, borderTime, borderTime.Add(-1*time.Hour))
		require.Equal(t, 1, result)
	})

	t.Run("negative start and end time", func(t *testing.T) {
		parsed, err := time.Parse("2006-01-02", "2022-01-02")
		require.NoError(t, err)
		borderTime := model.TimeFromUnix(parsed.Unix())

		confs := []config.PeriodConfig{
			{
				From: config.DayTime{
					Time: borderTime.Add(-1 * time.Hour),
				},
				IndexType: types.TSDBType,
			},
		}

		result := WeightedParallelism(context.Background(), confs, "fake", &fakeLimits{maxQueryParallelism: 50}, -100, -50)
		require.Equal(t, 1, result)
	})

	t.Run("query start and end time before config start", func(t *testing.T) {
		parsed, err := time.Parse("2006-01-02", "2022-01-02")
		require.NoError(t, err)
		borderTime := model.TimeFromUnix(parsed.Unix())

		confs := []config.PeriodConfig{
			{
				From: config.DayTime{
					Time: borderTime.Add(-1 * time.Hour),
				},
				IndexType: types.TSDBType,
			},
		}

		result := WeightedParallelism(context.Background(), confs, "fake", &fakeLimits{maxQueryParallelism: 50}, confs[0].From.Add(-24*time.Hour), confs[0].From.Add(-12*time.Hour))
		require.Equal(t, 1, result)
	})
}

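// Test_MaxQuerySize checks that the query- and querier-size limiters only request
// index stats for TSDB periods and reject queries whose estimated bytes exceed
// the configured limits.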
func Test_MaxQuerySize(t *testing.T) {
	const statsBytes = 1000

	schemas := []config.PeriodConfig{
		{
			// BoltDB -> Time -4 days
			From:      config.DayTime{Time: model.TimeFromUnix(testTime.Add(-96 * time.Hour).Unix())},
			IndexType: types.BoltDBShipperType,
		},
		{
			// TSDB -> Time -2 days
			From:      config.DayTime{Time: model.TimeFromUnix(testTime.Add(-48 * time.Hour).Unix())},
			IndexType: types.TSDBType,
		},
	}

	for _, tc := range []struct {
		desc       string
		schema     string
		query      string
		queryRange time.Duration
		queryStart time.Time
		queryEnd   time.Time
		limits     Limits

		shouldErr                bool
		expectedQueryStatsHits   int
		expectedQuerierStatsHits int
	}{
		{
			desc:       "No TSDB",
			schema:     types.BoltDBShipperType,
			query:      `{app="foo"} |= "foo"`,
			queryRange: 1 * time.Hour,

			queryStart: testTime.Add(-96 * time.Hour),
			queryEnd:   testTime.Add(-90 * time.Hour),
			limits: fakeLimits{
				maxQueryBytesRead:   1,
				maxQuerierBytesRead: 1,
			},

			shouldErr:                false,
			expectedQueryStatsHits:   0,
			expectedQuerierStatsHits: 0,
		},
		{
			desc:       "Unlimited",
			query:      `{app="foo"} |= "foo"`,
			queryStart: testTime.Add(-48 * time.Hour),
			queryEnd:   testTime,
			limits: fakeLimits{
				maxQueryBytesRead:   0,
				maxQuerierBytesRead: 0,
			},

			shouldErr:                false,
			expectedQueryStatsHits:   0,
			expectedQuerierStatsHits: 0,
		},
		{
			desc:       "1 hour range",
			query:      `{app="foo"} |= "foo"`,
			queryStart: testTime.Add(-1 * time.Hour),
			queryEnd:   testTime,
			limits: fakeLimits{
				maxQueryBytesRead:   statsBytes,
				maxQuerierBytesRead: statsBytes,
			},

			shouldErr: false,
			// [testTime-1h, testTime)
			expectedQueryStatsHits:   1,
			expectedQuerierStatsHits: 1,
		},
		{
			desc:       "Query size too big",
			query:      `{app="foo"} |= "foo"`,
			queryStart: testTime.Add(-1 * time.Hour),
			queryEnd:   testTime,
			limits: fakeLimits{
				maxQueryBytesRead:   statsBytes - 1,
				maxQuerierBytesRead: statsBytes,
			},

			shouldErr:                true,
			expectedQueryStatsHits:   1,
			expectedQuerierStatsHits: 0,
		},
		{
			desc:       "Querier size too big",
			query:      `{app="foo"} |= "foo"`,
			queryStart: testTime.Add(-1 * time.Hour),
			queryEnd:   testTime,
			limits: fakeLimits{
				maxQueryBytesRead:   statsBytes,
				maxQuerierBytesRead: statsBytes - 1,
			},

			shouldErr:                true,
			expectedQueryStatsHits:   1,
			expectedQuerierStatsHits: 1,
		},
		{
			desc:       "Multi-matchers with offset",
			query:      `sum_over_time ({app="foo"} |= "foo" | unwrap foo [5m] ) / sum_over_time ({app="bar"} |= "bar" | unwrap bar [5m] offset 1h)`,
			queryStart: testTime.Add(-1 * time.Hour),
			queryEnd:   testTime,
			limits: fakeLimits{
				maxQueryBytesRead:   statsBytes,
				maxQuerierBytesRead: statsBytes,
			},

			shouldErr: false,
			// *2 since we have two matcher groups
			expectedQueryStatsHits:   1 * 2,
			expectedQuerierStatsHits: 1 * 2,
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			queryStatsHits, queryStatsHandler := indexStatsResult(logproto.IndexStatsResponse{Bytes: uint64(statsBytes / max(tc.expectedQueryStatsHits, 1))})

			querierStatsHits, querierStatsHandler := indexStatsResult(logproto.IndexStatsResponse{Bytes: uint64(statsBytes / max(tc.expectedQuerierStatsHits, 1))})

			_, promHandler := promqlResult(matrix)

			lokiReq := &LokiRequest{
				Query:     tc.query,
				Limit:     1000,
				StartTs:   tc.queryStart,
				EndTs:     tc.queryEnd,
				Direction: logproto.FORWARD,
				Path:      "/query_range",
				Plan: &plan.QueryPlan{
					AST: syntax.MustParseExpr(tc.query),
				},
			}

			ctx := user.InjectOrgID(context.Background(), "foo")

			middlewares := []queryrangebase.Middleware{
				NewQuerySizeLimiterMiddleware(schemas, testEngineOpts, util_log.Logger, tc.limits, queryStatsHandler),
				NewQuerierSizeLimiterMiddleware(schemas, testEngineOpts, util_log.Logger, tc.limits, querierStatsHandler),
			}

			_, err := queryrangebase.MergeMiddlewares(middlewares...).Wrap(promHandler).Do(ctx, lokiReq)

			if tc.shouldErr {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}

			require.Equal(t, tc.expectedQueryStatsHits, *queryStatsHits)
			require.Equal(t, tc.expectedQuerierStatsHits, *querierStatsHits)
		})
	}
}

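// Test_MaxQuerySize_WithQueryLimitsContext checks that, when query limits are
// injected into the request context, the size limiter also fetches stats for the
// context's time range and enforces the limit on both results.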
func Test_MaxQuerySize_WithQueryLimitsContext(t *testing.T) {
	// A sentinel query value to control when our mock stats handler returns the context stats.
	ctxSentinal := `{context="true"}`
	schemas := []config.PeriodConfig{
		{
			From:      config.DayTime{Time: model.TimeFromUnix(testTime.Add(-48 * time.Hour).Unix())},
			IndexType: types.TSDBType,
		},
	}

	for _, tc := range []struct {
		desc              string
		query             string
		queryStart        time.Time
		queryEnd          time.Time
		queryBytes        uint64
		contextStart      time.Time
		contextEnd        time.Time
		contextBytes      uint64
		limit             int
		shouldErr         bool
		expectedStatsHits int
	}{
		{
			desc:              "No context, query under limit",
			query:             `{app="foo"} |= "foo"`,
			queryStart:        testTime.Add(-1 * time.Hour),
			queryEnd:          testTime,
			queryBytes:        500,
			limit:             1000,
			shouldErr:         false,
			expectedStatsHits: 1,
		},
		{
			desc:              "Context range larger, both under limit",
			query:             `{app="foo"} |= "foo"`,
			queryStart:        testTime.Add(-1 * time.Hour),
			queryEnd:          testTime,
			queryBytes:        200,
			contextStart:      testTime.Add(-24 * time.Hour),
			contextEnd:        testTime,
			contextBytes:      800,
			limit:             1000,
			shouldErr:         false,
			expectedStatsHits: 2,
		},
		{
			desc:              "Context range larger, context exceeds limit",
			query:             `{app="foo"} |= "foo"`,
			queryStart:        testTime.Add(-1 * time.Hour),
			queryEnd:          testTime,
			queryBytes:        200,
			contextStart:      testTime.Add(-24 * time.Hour),
			contextEnd:        testTime,
			contextBytes:      1200,
			limit:             1000,
			shouldErr:         true,
			expectedStatsHits: 2,
		},
		{
			desc:              "Query range larger, query exceeds limit",
			query:             `{app="foo"} |= "foo"`,
			queryStart:        testTime.Add(-24 * time.Hour),
			queryEnd:          testTime,
			queryBytes:        1200,
			contextStart:      testTime.Add(-1 * time.Hour),
			contextEnd:        testTime,
			contextBytes:      200,
			limit:             1000,
			shouldErr:         true,
			expectedStatsHits: 2,
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			statsHits := atomic.NewInt32(0)

			statsHandler := queryrangebase.HandlerFunc(func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
				statsHits.Inc()

				bytes := tc.queryBytes
				if req.GetQuery() == ctxSentinal {
					bytes = tc.contextBytes
				}

				return &IndexStatsResponse{
					Response: &logproto.IndexStatsResponse{
						Bytes: bytes,
					},
				}, nil
			})

			promHandler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
				return &LokiPromResponse{
					Response: &queryrangebase.PrometheusResponse{
						Status: "success",
					},
				}, nil
			})

			lokiReq := &LokiRequest{
				Query:     tc.query,
				StartTs:   tc.queryStart,
				EndTs:     tc.queryEnd,
				Direction: logproto.FORWARD,
				Path:      "/query_range",
				Plan: &plan.QueryPlan{
					AST: syntax.MustParseExpr(tc.query),
				},
			}

			ctx := user.InjectOrgID(context.Background(), "foo")

			if !tc.contextStart.IsZero() && !tc.contextEnd.IsZero() {
				ctx = querylimits.InjectQueryLimitsContextIntoContext(ctx, querylimits.Context{
					Expr: ctxSentinal, // a hack to make mocking the stats handler easier; in a real request this would be the same query as in the request
					From: tc.contextStart,
					To:   tc.contextEnd,
				})
			}

			middlewares := []queryrangebase.Middleware{
				NewQuerySizeLimiterMiddleware(schemas, testEngineOpts, util_log.Logger, fakeLimits{
					maxQueryBytesRead: tc.limit,
				}, statsHandler),
			}

			_, err := queryrangebase.MergeMiddlewares(middlewares...).Wrap(promHandler).Do(ctx, lokiReq)

			if tc.shouldErr {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}

			require.Equal(t, tc.expectedStatsHits, int(statsHits.Load()))
		})
	}
}

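// Test_MaxQuerySize_MaxLookBackPeriod checks that the size limiters clamp the
// stats request for an instant query to the engine's MaxLookBackPeriod.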
func Test_MaxQuerySize_MaxLookBackPeriod(t *testing.T) {
	engineOpts := testEngineOpts
	engineOpts.MaxLookBackPeriod = 1 * time.Hour

	lim := fakeLimits{
		maxQueryBytesRead:   1 << 10,
		maxQuerierBytesRead: 1 << 10,
	}

	statsHandler := queryrangebase.HandlerFunc(func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
		// This is the actual check that we're testing.
		require.Equal(t, testTime.Add(-engineOpts.MaxLookBackPeriod).UnixMilli(), req.GetStart().UnixMilli())

		return &IndexStatsResponse{
			Response: &logproto.IndexStatsResponse{
				Bytes: 1 << 10,
			},
		}, nil
	})

	for _, tc := range []struct {
		desc       string
		middleware queryrangebase.Middleware
	}{
		{
			desc:       "QuerySizeLimiter",
			middleware: NewQuerySizeLimiterMiddleware(testSchemasTSDB, engineOpts, util_log.Logger, lim, statsHandler),
		},
		{
			desc:       "QuerierSizeLimiter",
			middleware: NewQuerierSizeLimiterMiddleware(testSchemasTSDB, engineOpts, util_log.Logger, lim, statsHandler),
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			lokiReq := &LokiInstantRequest{
				Query:     `{cluster="dev-us-central-0"}`,
				Limit:     1000,
				TimeTs:    testTime,
				Direction: logproto.FORWARD,
				Path:      "/loki/api/v1/query",
			}

			handler := tc.middleware.Wrap(
				queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
					return &LokiResponse{}, nil
				}),
			)

			ctx := user.InjectOrgID(context.Background(), "foo")
			_, err := handler.Do(ctx, lokiReq)
			require.NoError(t, err)
		})
	}
}

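// TestAcquireWithTiming checks that SemaphoreWithTiming reports a non-zero waiting
// time for an acquirer that has to wait, bounded by how long the current holders
// sleep before releasing.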
func TestAcquireWithTiming(t *testing.T) {
	ctx := context.Background()
	sem := NewSemaphoreWithTiming(2)

	// Channel to collect waiting times.
	waitingTimes := make(chan struct {
		GoroutineID int
		WaitingTime time.Duration
	}, 3)

	tryAcquire := func(n int64, goroutineID int) {
		elapsed, err := sem.Acquire(ctx, n)
		if err != nil {
			t.Errorf("Expected no error, got %v", err)
		}
		waitingTimes <- struct {
			GoroutineID int
			WaitingTime time.Duration
		}{goroutineID, elapsed}

		defer sem.sem.Release(n)

		time.Sleep(10 * time.Millisecond)
	}

	go tryAcquire(1, 1)
	go tryAcquire(1, 2)

	// Sleep briefly to allow the first two goroutines to start running.
	time.Sleep(5 * time.Millisecond)

	go tryAcquire(1, 3)

	// Collect the waiting times reported by all three goroutines.
	var waitingDurations []struct {
		GoroutineID int
		WaitingTime time.Duration
	}
	for i := 0; i < 3; i++ {
		waitingDurations = append(waitingDurations, <-waitingTimes)
	}
	// Find the waiting time reported by the third goroutine.
	var waiting3 time.Duration
	for _, waiting := range waitingDurations {
		if waiting.GoroutineID == 3 {
			waiting3 = waiting.WaitingTime
			break
		}
	}

	// The third request should have waited for more than 0 and less than 10 milliseconds.
	require.Greater(t, waiting3, 0*time.Nanosecond)
	require.Less(t, waiting3, 10*time.Millisecond)
}