diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 5f831f08ae..10cd97c1cf 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -8,9 +8,7 @@ import ( cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" - token_util "github.com/grafana/loki/pkg/util" "github.com/prometheus/common/model" - "github.com/weaveworks/common/user" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/helpers" @@ -287,9 +285,6 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, time.Duration(req.DelayFor)*time.Second, tailClients, reversedIterator, - func(from, to time.Time, labels string) (iterator iter.EntryIterator, e error) { - return q.queryDroppedStreams(queryCtx, req, from, to, labels) - }, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr) }, @@ -297,49 +292,6 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, ), nil } -// passed to tailer for querying dropped streams -func (q *Querier) queryDroppedStreams(ctx context.Context, req *logproto.TailRequest, start, end time.Time, labels string) (iter.EntryIterator, error) { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, err - } - - key := token_util.TokenFor(userID, labels) - replicationSet, err := q.ring.Get(key, ring.Read) - if err != nil { - return nil, err - } - - query := logproto.QueryRequest{ - Direction: logproto.FORWARD, - Start: start, - End: end, - Limit: 10000, - Query: req.Query, - Regex: req.Regex, - } - - clients, err := q.forGivenIngesters(replicationSet, func(client logproto.QuerierClient) (interface{}, error) { - return client.Query(ctx, &query) - }) - if err != nil { - return nil, err - } - - ingesterIterators := make([]iter.EntryIterator, len(clients)) - for i := range clients { - ingesterIterators[i] = iter.NewQueryClientIterator(clients[i].response.(logproto.Querier_QueryClient), query.Direction) - } - - chunkStoreIterators, err := q.store.LazyQuery(ctx, &query) - if err != nil { - return nil, err - } - - iterators := append(ingesterIterators, chunkStoreIterators) - return iter.NewHeapIterator(iterators, query.Direction), nil -} - // passed to tailer for (re)connecting to new or disconnected ingesters func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.TailRequest, connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { tailClients := make(map[string]logproto.Querier_TailClient) diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 3a102206ed..52076b37cd 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -33,51 +33,15 @@ type TailResponse struct { DroppedEntries []droppedEntry `json:"dropped_entries"` } -/*// dropped streams are collected into a heap to quickly find dropped stream which has oldest timestamp -type droppedStreamsIterator []logproto.DroppedStream - -func (h droppedStreamsIterator) Len() int { return len(h) } -func (h droppedStreamsIterator) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} -func (h droppedStreamsIterator) Peek() time.Time { - return h[0].From -} -func (h *droppedStreamsIterator) Push(x interface{}) { - *h = append(*h, x.(logproto.DroppedStream)) -} - -func (h *droppedStreamsIterator) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -func (h droppedStreamsIterator) Less(i, j int) bool { - t1, t2 := h[i].From, h[j].From - if !t1.Equal(t2) { - return t1.Before(t2) - } - return h[i].Labels < h[j].Labels -}*/ - // Tailer manages complete lifecycle of a tail request type Tailer struct { - // openStreamIterator is for streams already open which can be complete streams returned by ingester or - // dropped streams queried from ingester and store + // openStreamIterator is for streams already open openStreamIterator iter.HeapIterator - /*droppedStreamsIterator interface { // for holding dropped stream metadata - heap.Interface - Peek() time.Time - }*/ - streamMtx sync.Mutex // for synchronizing access to openStreamIterator and droppedStreamsIterator + streamMtx sync.Mutex // for synchronizing access to openStreamIterator currEntry logproto.Entry currLabels string - queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error) tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error) querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters @@ -164,7 +128,6 @@ func (t *Tailer) loop() { tailResponse.DroppedEntries = t.popDroppedEntries() } - //response := []tailResponse{{Stream: logproto.Stream{Labels: t.currLabels, Entries: responses[t.currLabels]}, DroppedEntries: t.popDroppedEntries()}} select { case t.responseChan <- tailResponse: default: @@ -231,17 +194,10 @@ func (t *Tailer) pushTailResponseFromIngester(resp *logproto.TailResponse) { defer t.streamMtx.Unlock() t.openStreamIterator.Push(iter.NewStreamIterator(resp.Stream)) - /*if resp.DroppedStreams != nil { - for idx := range resp.DroppedStreams { - heap.Push(t.droppedStreamsIterator, *resp.DroppedStreams[idx]) - } - }*/ } -// finds oldest entry by peeking at open stream iterator and dropped stream iterator. -// if open stream iterator has oldest entry then pop it for sending it to tail client -// else pop dropped stream details, to query from ingester and store. -// Response from ingester and store is pushed to open stream for further processing +// finds oldest entry by peeking at open stream iterator. +// Response from ingester is pushed to open stream for further processing func (t *Tailer) next() bool { t.streamMtx.Lock() defer t.streamMtx.Unlock() @@ -249,30 +205,6 @@ func (t *Tailer) next() bool { if t.openStreamIterator.Len() == 0 || !time.Now().After(t.openStreamIterator.Peek().Add(t.delayFor)) || !t.openStreamIterator.Next() { return false } - /*// if we don't have any entries or any of the entries are not older than now()-delay then return false - if !((t.openStreamIterator.Len() != 0 && time.Now().After(t.openStreamIterator.Peek().Add(t.delayFor))) || (t.droppedStreamsIterator.Len() != 0 && time.Now().After(t.droppedStreamsIterator.Peek().Add(t.delayFor)))) { - return false - } - - // If any of the dropped streams are older than open streams, pop dropped stream details for querying them - if t.droppedStreamsIterator.Len() != 0 { - oldestTsFromDroppedStreams := t.droppedStreamsIterator.Peek() - if t.droppedStreamsIterator.Len() != 0 && (t.openStreamIterator.Len() == 0 || t.openStreamIterator.Peek().After(t.droppedStreamsIterator.Peek())) { - for t.droppedStreamsIterator.Len() != 0 && t.droppedStreamsIterator.Peek().Equal(oldestTsFromDroppedStreams) { - droppedStream := heap.Pop(t.droppedStreamsIterator).(logproto.DroppedStream) - iterator, err := t.queryDroppedStreams(droppedStream.From, droppedStream.To.Add(1), droppedStream.Labels) - if err != nil { - level.Error(util.Logger).Log("Error querying dropped streams", fmt.Sprintf("%v", err)) - continue - } - t.openStreamIterator.Push(iterator) - } - } - } - - if !t.openStreamIterator.Next() { - return false - }*/ t.currEntry = t.openStreamIterator.Entry() t.currLabels = t.openStreamIterator.Labels() @@ -328,15 +260,12 @@ func newTailer( delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient, historicEntries iter.EntryIterator, - queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error), tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration, ) *Tailer { t := Tailer{ - openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD), - //droppedStreamsIterator: &droppedStreamsIterator{}, + openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{historicEntries}, logproto.FORWARD), querierTailClients: querierTailClients, - queryDroppedStreams: queryDroppedStreams, delayFor: delayFor, responseChan: make(chan *TailResponse, bufferSizeForTailResponse), closeErrChan: make(chan error), @@ -344,8 +273,6 @@ func newTailer( tailMaxDuration: tailMaxDuration, } - t.openStreamIterator.Push(historicEntries) - t.readTailClients() go t.loop() return &t