|
|
|
|
@ -12,6 +12,7 @@ import ( |
|
|
|
|
|
|
|
|
|
"github.com/cortexproject/cortex/pkg/ingester/client" |
|
|
|
|
"github.com/cortexproject/cortex/pkg/ingester/index" |
|
|
|
|
cutil "github.com/cortexproject/cortex/pkg/util" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/pkg/helpers" |
|
|
|
|
"github.com/grafana/loki/pkg/iter" |
|
|
|
|
@ -149,18 +150,27 @@ func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels. |
|
|
|
|
i.streamsMtx.RLock() |
|
|
|
|
defer i.streamsMtx.RUnlock() |
|
|
|
|
|
|
|
|
|
var err error |
|
|
|
|
filters, matchers := cutil.SplitFiltersAndMatchers(matchers) |
|
|
|
|
ids := i.index.Lookup(matchers) |
|
|
|
|
iterators := make([]iter.EntryIterator, len(ids)) |
|
|
|
|
for j := range ids { |
|
|
|
|
stream, ok := i.streams[ids[j]] |
|
|
|
|
iterators := make([]iter.EntryIterator, 0, len(ids)) |
|
|
|
|
|
|
|
|
|
outer: |
|
|
|
|
for _, streamID := range ids { |
|
|
|
|
stream, ok := i.streams[streamID] |
|
|
|
|
if !ok { |
|
|
|
|
return nil, ErrStreamMissing |
|
|
|
|
} |
|
|
|
|
iterators[j], err = stream.Iterator(req.Start, req.End, req.Direction) |
|
|
|
|
lbs := client.FromLabelAdaptersToLabels(stream.labels) |
|
|
|
|
for _, filter := range filters { |
|
|
|
|
if !filter.Matches(lbs.Get(filter.Name)) { |
|
|
|
|
continue outer |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
iter, err := stream.Iterator(req.Start, req.End, req.Direction) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
iterators = append(iterators, iter) |
|
|
|
|
} |
|
|
|
|
return iterators, nil |
|
|
|
|
} |
|
|
|
|
|