|
|
|
|
@ -249,33 +249,64 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) |
|
|
|
|
for _, matchers := range groups { |
|
|
|
|
err = i.forMatchingStreams(matchers, func(stream *stream) error { |
|
|
|
|
// exit early when this stream was added by an earlier group
|
|
|
|
|
key := stream.labels.Hash() |
|
|
|
|
if _, found := dedupedSeries[key]; found { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
var series []logproto.SeriesIdentifier |
|
|
|
|
|
|
|
|
|
dedupedSeries[key] = logproto.SeriesIdentifier{ |
|
|
|
|
// If no matchers were supplied we include all streams.
|
|
|
|
|
if len(groups) == 0 { |
|
|
|
|
series = make([]logproto.SeriesIdentifier, 0, len(i.streams)) |
|
|
|
|
err = i.forAllStreams(func(stream *stream) error { |
|
|
|
|
series = append(series, logproto.SeriesIdentifier{ |
|
|
|
|
Labels: stream.labels.Map(), |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries)) |
|
|
|
|
for _, v := range dedupedSeries { |
|
|
|
|
series = append(series, v) |
|
|
|
|
} else { |
|
|
|
|
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) |
|
|
|
|
for _, matchers := range groups { |
|
|
|
|
err = i.forMatchingStreams(matchers, func(stream *stream) error { |
|
|
|
|
// exit early when this stream was added by an earlier group
|
|
|
|
|
key := stream.labels.Hash() |
|
|
|
|
if _, found := dedupedSeries[key]; found { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
dedupedSeries[key] = logproto.SeriesIdentifier{ |
|
|
|
|
Labels: stream.labels.Map(), |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
series = make([]logproto.SeriesIdentifier, 0, len(dedupedSeries)) |
|
|
|
|
for _, v := range dedupedSeries { |
|
|
|
|
series = append(series, v) |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &logproto.SeriesResponse{Series: series}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// forAllStreams will execute a function for all streams in the instance.
|
|
|
|
|
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
|
|
|
|
|
func (i *instance) forAllStreams(fn func(*stream) error) error { |
|
|
|
|
i.streamsMtx.RLock() |
|
|
|
|
defer i.streamsMtx.RUnlock() |
|
|
|
|
|
|
|
|
|
for _, stream := range i.streams { |
|
|
|
|
err := fn(stream) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
|
|
|
|
|
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
|
|
|
|
|
func (i *instance) forMatchingStreams( |
|
|
|
|
|