|
|
|
|
@ -4,6 +4,7 @@ import ( |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"math/rand" |
|
|
|
|
"sort" |
|
|
|
|
"sync" |
|
|
|
|
"testing" |
|
|
|
|
"time" |
|
|
|
|
@ -142,6 +143,102 @@ func TestSyncPeriod(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func Test_SeriesQuery(t *testing.T) { |
|
|
|
|
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) |
|
|
|
|
|
|
|
|
|
// just some random values
|
|
|
|
|
syncPeriod := 1 * time.Minute |
|
|
|
|
minUtil := 0.20 |
|
|
|
|
|
|
|
|
|
instance := newInstance(&Config{}, "test", defaultFactory, limiter, syncPeriod, minUtil) |
|
|
|
|
|
|
|
|
|
currentTime := time.Now() |
|
|
|
|
|
|
|
|
|
testStreams := []logproto.Stream{ |
|
|
|
|
{Labels: "{app=\"test\",job=\"varlogs\"}", Entries: entries(5, currentTime)}, |
|
|
|
|
{Labels: "{app=\"test2\",job=\"varlogs\"}", Entries: entries(5, currentTime.Add(6*time.Nanosecond))}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, testStream := range testStreams { |
|
|
|
|
stream, err := instance.getOrCreateStream(testStream) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
chunk := defaultFactory() |
|
|
|
|
for _, entry := range testStream.Entries { |
|
|
|
|
err = chunk.Append(&entry) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
} |
|
|
|
|
stream.chunks = append(stream.chunks, chunkDesc{chunk: chunk}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tests := []struct { |
|
|
|
|
name string |
|
|
|
|
req *logproto.SeriesRequest |
|
|
|
|
expectedResponse []logproto.SeriesIdentifier |
|
|
|
|
}{ |
|
|
|
|
{ |
|
|
|
|
"non overlapping request", |
|
|
|
|
&logproto.SeriesRequest{ |
|
|
|
|
Start: currentTime.Add(11 * time.Nanosecond), |
|
|
|
|
End: currentTime.Add(12 * time.Nanosecond), |
|
|
|
|
Groups: []string{`{job="varlogs"}`}, |
|
|
|
|
}, |
|
|
|
|
[]logproto.SeriesIdentifier{}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
"overlapping request", |
|
|
|
|
&logproto.SeriesRequest{ |
|
|
|
|
Start: currentTime.Add(1 * time.Nanosecond), |
|
|
|
|
End: currentTime.Add(7 * time.Nanosecond), |
|
|
|
|
Groups: []string{`{job="varlogs"}`}, |
|
|
|
|
}, |
|
|
|
|
[]logproto.SeriesIdentifier{ |
|
|
|
|
{Labels: map[string]string{"app": "test", "job": "varlogs"}}, |
|
|
|
|
{Labels: map[string]string{"app": "test2", "job": "varlogs"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
"request end time overlaps stream start time", |
|
|
|
|
&logproto.SeriesRequest{ |
|
|
|
|
Start: currentTime.Add(1 * time.Nanosecond), |
|
|
|
|
End: currentTime.Add(6 * time.Nanosecond), |
|
|
|
|
Groups: []string{`{job="varlogs"}`}, |
|
|
|
|
}, |
|
|
|
|
[]logproto.SeriesIdentifier{ |
|
|
|
|
{Labels: map[string]string{"app": "test", "job": "varlogs"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
"request start time overlaps stream end time", |
|
|
|
|
&logproto.SeriesRequest{ |
|
|
|
|
Start: currentTime.Add(10 * time.Nanosecond), |
|
|
|
|
End: currentTime.Add(11 * time.Nanosecond), |
|
|
|
|
Groups: []string{`{job="varlogs"}`}, |
|
|
|
|
}, |
|
|
|
|
[]logproto.SeriesIdentifier{ |
|
|
|
|
{Labels: map[string]string{"app": "test2", "job": "varlogs"}}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, tc := range tests { |
|
|
|
|
t.Run(tc.name, func(t *testing.T) { |
|
|
|
|
resp, err := instance.Series(context.Background(), tc.req) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
sort.Slice(resp.Series, func(i, j int) bool { |
|
|
|
|
return resp.Series[i].String() < resp.Series[j].String() |
|
|
|
|
}) |
|
|
|
|
sort.Slice(tc.expectedResponse, func(i, j int) bool { |
|
|
|
|
return tc.expectedResponse[i].String() < tc.expectedResponse[j].String() |
|
|
|
|
}) |
|
|
|
|
require.Equal(t, tc.expectedResponse, resp.Series) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func entries(n int, t time.Time) []logproto.Entry { |
|
|
|
|
var result []logproto.Entry |
|
|
|
|
for i := 0; i < n; i++ { |
|
|
|
|
|