Ensure trace propagation in our logs. (#1977)

* Ensure trace propagation in our logs.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add missing import.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix function usage.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Branch: pull/2000/head
Commit: 577d8eb168 (parent: c7a1a1fb2e)
Author: Cyril Tovena, committed via GitHub
13 changed files (lines changed in parentheses):

  1. pkg/helpers/logerror.go (9)
  2. pkg/ingester/ingester.go (2)
  3. pkg/ingester/stream.go (4)
  4. pkg/ingester/tailer.go (2)
  5. pkg/ingester/transfer.go (26)
  6. pkg/logql/engine.go (4)
  7. pkg/logql/metrics.go (8)
  8. pkg/logql/stats/grpc.go (13)
  9. pkg/querier/http.go (25)
  10. pkg/querier/querier_mock_test.go (4)
  11. pkg/querier/queryrange/stats.go (3)
  12. pkg/querier/tail.go (6)
  13. pkg/storage/iterator.go (2)

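The change applies one pattern throughout: wherever a log line was emitted through the global util.Logger, the logger is first wrapped with util.WithContext so that the trace ID (and tenant ID) carried in the request's context.Context is stamped onto every line. A minimal sketch of the pattern, assuming Cortex's util.WithContext decorates a go-kit logger with the IDs it finds in ctx (exactly which fields it adds is an assumption here):

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
)

func handle(ctx context.Context) {
	// Wrap the process-wide logger once per request; every line logged
	// through the wrapped logger now carries the trace ID from ctx and
	// can be correlated with the corresponding trace.
	logger := util.WithContext(ctx, util.Logger)
	level.Info(logger).Log("msg", "handling request")
}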
pkg/helpers/logerror.go
@@ -1,6 +1,8 @@
package helpers
import (
+ "context"
+
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
)
@@ -11,3 +13,10 @@ func LogError(message string, f func() error) {
level.Error(util.Logger).Log("message", message, "error", err)
}
}
+ // LogErrorWithContext logs any error returned by f; useful when deferring Close etc.
+ func LogErrorWithContext(ctx context.Context, message string, f func() error) {
+ if err := f(); err != nil {
+ level.Error(util.WithContext(ctx, util.Logger)).Log("message", message, "error", err)
+ }
+ }

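For callers, only the deferred close changes, as the next hunks show. A usage sketch, assuming the repository-local import path github.com/grafana/loki/pkg/helpers and any closer whose Close matches the func() error signature the helper expects:

package example

import (
	"context"
	"io"

	"github.com/grafana/loki/pkg/helpers"
)

func consume(ctx context.Context, rc io.ReadCloser) {
	// Any error returned by Close is logged with the trace ID from ctx.
	defer helpers.LogErrorWithContext(ctx, "closing reader", rc.Close)
	// ... read from rc ...
}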
pkg/ingester/ingester.go
@@ -298,7 +298,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction)
- defer helpers.LogError("closing iterator", heapItr.Close)
+ defer helpers.LogErrorWithContext(ctx, "closing iterator", heapItr.Close)
return sendBatches(queryServer.Context(), heapItr, queryServer, req.Limit)
}

pkg/ingester/stream.go
@@ -109,7 +109,7 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
return nil
}
- func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
+ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
var lastChunkTimestamp time.Time
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{
@@ -145,7 +145,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry, synchronizePe
if err != nil {
// This should be an unlikely situation, returning an error up the stack doesn't help much here
// so instead log this to help debug the issue if it ever arises.
- level.Error(util.Logger).Log("msg", "failed to Close chunk", "err", err)
+ level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to Close chunk", "err", err)
}
chunk.closed = true

pkg/ingester/tailer.go
@@ -94,7 +94,7 @@ func (t *tailer) loop() {
if err != nil {
// Don't log any error due to tail client closing the connection
if !util.IsConnCanceled(err) {
- level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err)
+ level.Error(cortex_util.WithContext(t.conn.Context(), cortex_util.Logger)).Log("msg", "Error writing to tail client", "err", err)
}
t.close()
return

pkg/ingester/transfer.go
@@ -36,6 +36,7 @@ var (
// TransferChunks receives all chunks from another ingester. The Ingester
// must be in PENDING state or else the call will fail.
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
+ logger := util.WithContext(stream.Context(), util.Logger)
// Prevent a shutdown from happening until we've completely finished a handoff
// from a leaving ingester.
i.shutdownMtx.Lock()
@@ -54,12 +55,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
return
}
- level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)
+ level.Error(logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)
// Enter PENDING state (only valid from JOINING)
if i.lifecycler.GetState() == ring.JOINING {
if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil {
- level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
+ level.Error(logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
os.Exit(1)
}
}
@@ -81,7 +82,7 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
// this loop.
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
- level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
+ level.Info(logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
@@ -109,10 +110,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
}
if seriesReceived == 0 {
- level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
+ level.Error(logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
return fmt.Errorf("no series")
} else if fromIngesterID == "" {
- level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
+ level.Error(logger).Log("msg", "received TransferChunks request with no ID from ingester")
return fmt.Errorf("no ingester id")
}
@@ -127,10 +128,10 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
// Close the stream last, as this is what tells the "from" ingester that
// it's OK to shut down.
if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil {
- level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
+ level.Error(logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
return err
}
- level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
+ level.Info(logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
return nil
}
@@ -188,7 +189,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
return nil
}
- level.Error(util.Logger).Log("msg", "transfer failed", "err", err)
+ level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "transfer failed", "err", err)
backoff.Wait()
}
@@ -196,18 +197,19 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
}
func (i *Ingester) transferOut(ctx context.Context) error {
+ logger := util.WithContext(ctx, util.Logger)
targetIngester, err := i.findTransferTarget(ctx)
if err != nil {
return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err)
}
- level.Info(util.Logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
+ level.Info(logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr)
if err != nil {
return err
}
if c, ok := c.(io.Closer); ok {
- defer helpers.LogError("closing client", c.Close)
+ defer helpers.LogErrorWithContext(ctx, "closing client", c.Close)
}
ic := c.(logproto.IngesterClient)
@@ -244,7 +246,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
FromIngesterId: i.lifecycler.ID,
})
if err != nil {
- level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
+ level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
return err
}
@@ -262,7 +264,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
}
i.flushQueuesDone.Wait()
- level.Info(util.Logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
+ level.Info(logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
return nil
}

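In functions that log many times, such as TransferChunks and transferOut above, the diff builds the context-scoped logger once at the top and reuses it, rather than re-wrapping util.Logger at every call site. A sketch of that shape (the body is elided to the logging skeleton; transfer is an illustrative name):

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
)

func transfer(ctx context.Context) error {
	// Derive the trace-aware logger once; all later log lines reuse it.
	logger := util.WithContext(ctx, util.Logger)
	level.Info(logger).Log("msg", "starting transfer")
	// ... perform the transfer, logging progress and errors via logger ...
	level.Info(logger).Log("msg", "transfer complete")
	return nil
}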
pkg/logql/engine.go
@@ -201,7 +201,7 @@ func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
if err != nil {
return nil, err
}
- defer helpers.LogError("closing iterator", iter.Close)
+ defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
return streams, err
}
@@ -219,7 +219,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
if err != nil {
return nil, err
}
- defer helpers.LogError("closing SampleExpr", stepEvaluator.Close)
+ defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)
seriesIndex := map[uint64]*promql.Series{}

pkg/logql/metrics.go
@@ -63,9 +63,10 @@ var (
)
func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) {
+ logger := util.WithContext(ctx, util.Logger)
queryType, err := QueryType(p.Query())
if err != nil {
- level.Warn(util.Logger).Log("msg", "error parsing query type", "err", err)
+ level.Warn(logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(GetRangeType(p))
@@ -77,10 +78,7 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res
}
// we also log queries, useful for troubleshooting slow queries.
- level.Info(
- // ensure we have traceID & orgId
- util.WithContext(ctx, util.Logger),
- ).Log(
+ level.Info(logger).Log(
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_type", queryType,

pkg/logql/stats/grpc.go
@@ -47,7 +47,7 @@ func CollectTrailer(ctx context.Context) grpc.CallOption {
func SendAsTrailer(ctx context.Context, stream grpc.ServerStream) {
trailer, err := encodeTrailer(ctx)
if err != nil {
- level.Warn(util.Logger).Log("msg", "failed to encode trailer", "err", err)
+ level.Warn(util.WithContext(ctx, util.Logger)).Log("msg", "failed to encode trailer", "err", err)
return
}
stream.SetTrailer(trailer)
@@ -92,7 +92,7 @@ func decodeTrailers(ctx context.Context) Result {
}
res.Ingester.TotalReached = int32(len(collector.trailers))
for _, meta := range collector.trailers {
- ing := decodeTrailer(meta)
+ ing := decodeTrailer(ctx, meta)
res.Ingester.TotalChunksMatched += ing.Ingester.TotalChunksMatched
res.Ingester.TotalBatches += ing.Ingester.TotalBatches
res.Ingester.TotalLinesSent += ing.Ingester.TotalLinesSent
@@ -109,26 +109,27 @@ func decodeTrailers(ctx context.Context) Result {
return res
}
- func decodeTrailer(meta *metadata.MD) Result {
+ func decodeTrailer(ctx context.Context, meta *metadata.MD) Result {
+ logger := util.WithContext(ctx, util.Logger)
var ingData IngesterData
values := meta.Get(ingesterDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &ingData); err != nil {
- level.Warn(util.Logger).Log("msg", "could not unmarshal ingester data", "err", err)
+ level.Warn(logger).Log("msg", "could not unmarshal ingester data", "err", err)
}
}
var chunkData ChunkData
values = meta.Get(chunkDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &chunkData); err != nil {
- level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
+ level.Warn(logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
var storeData StoreData
values = meta.Get(storeDataKey)
if len(values) == 1 {
if err := jsoniter.UnmarshalFromString(values[0], &storeData); err != nil {
- level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err)
+ level.Warn(logger).Log("msg", "could not unmarshal chunk data", "err", err)
}
}
return Result{

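Where a helper previously had no access to a context, such as decodeTrailer above, the context is threaded in as a new first parameter, per the usual Go convention, so the helper can derive its own trace-aware logger. A self-contained sketch of the same shape (payload and decodePayload are illustrative names, not Loki APIs):

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
	jsoniter "github.com/json-iterator/go"
)

// payload stands in for the ingester/chunk/store trailer data above.
type payload struct {
	Total int64 `json:"total"`
}

// decodePayload mirrors the decodeTrailer change: ctx travels first,
// purely so decode failures are logged with the trace attached.
func decodePayload(ctx context.Context, raw string) payload {
	logger := util.WithContext(ctx, util.Logger)
	var p payload
	if err := jsoniter.UnmarshalFromString(raw, &p); err != nil {
		level.Warn(logger).Log("msg", "could not unmarshal payload", "err", err)
	}
	return p
}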
pkg/querier/http.go
@@ -170,6 +170,7 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
+ logger := util.WithContext(r.Context(), util.Logger)
req, err := loghttp.ParseTailQuery(r)
if err != nil {
@@ -185,26 +186,26 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
- level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
+ level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}
defer func() {
if err := conn.Close(); err != nil {
- level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err)
+ level.Error(logger).Log("msg", "Error closing websocket", "err", err)
}
}()
tailer, err := q.Tail(r.Context(), req)
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
- level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
+ level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
}
return
}
defer func() {
if err := tailer.close(); err != nil {
- level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
+ level.Error(logger).Log("msg", "Error closing Tailer", "err", err)
}
}()
@@ -224,12 +225,12 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
if closeErr.Code == websocket.CloseNormalClosure {
break
}
- level.Error(util.Logger).Log("msg", "Error from client", "err", err)
+ level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped {
return
} else {
- level.Error(util.Logger).Log("msg", "Unexpected error from client", "err", err)
+ level.Error(logger).Log("msg", "Unexpected error from client", "err", err)
break
}
}
@@ -247,25 +248,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}
if err != nil {
- level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err)
+ level.Error(logger).Log("msg", "Error writing to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
- level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
+ level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}
case err := <-closeErrChan:
- level.Error(util.Logger).Log("msg", "Error from iterator", "err", err)
+ level.Error(logger).Log("msg", "Error from iterator", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
- level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
+ level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
case <-ticker.C:
// This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
- level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err)
+ level.Error(logger).Log("msg", "Error writing ping message to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
- level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
+ level.Error(logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}

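The same pattern reaches HTTP handlers, where the request context is the source of the trace: TailHandler above derives its logger from r.Context() once, before the websocket upgrade, so every subsequent error path shares one trace-correlated logger. A minimal sketch (tailHandler is an illustrative name):

package example

import (
	"net/http"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
)

func tailHandler(w http.ResponseWriter, r *http.Request) {
	// r.Context() carries the trace for this request; wrap the logger
	// once and reuse it on every error path in the handler.
	logger := util.WithContext(r.Context(), util.Logger)
	level.Info(logger).Log("msg", "tail request", "path", r.URL.Path)
}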
pkg/querier/querier_mock_test.go
@@ -171,6 +171,7 @@ func (c *tailClientMock) CloseSend() error {
return nil
}
+ func (c *tailClientMock) Context() context.Context {
+ return context.Background()
+ }
func (c *tailClientMock) SendMsg(m interface{}) error {
return nil
}

pkg/querier/queryrange/stats.go
@@ -10,7 +10,6 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
- "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/middleware"
@@ -93,7 +92,7 @@ func StatsCollectorMiddleware() queryrange.Middleware {
case *LokiPromResponse:
statistics = &r.Statistics
default:
- level.Warn(util.Logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp))
+ level.Warn(logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp))
}
}
if statistics != nil {

pkg/querier/tail.go
@@ -197,10 +197,12 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
var resp *logproto.TailResponse
var err error
defer t.dropTailClient(addr)
+ logger := util.WithContext(querierTailClient.Context(), util.Logger)
for {
if t.stopped {
if err := querierTailClient.CloseSend(); err != nil {
- level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err)
+ level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err)
}
break
}
@@ -208,7 +210,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
if err != nil {
// We don't want to log error when its due to stopping the tail request
if !t.stopped {
- level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
+ level.Error(logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
}
break
}

pkg/storage/iterator.go
@@ -127,7 +127,7 @@ func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, ba
}
err = next.Close()
if err != nil {
- level.Error(util.Logger).Log("msg", "Failed to close the pre-fetched iterator when pre-fetching was canceled", "err", err)
+ level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "Failed to close the pre-fetched iterator when pre-fetching was canceled", "err", err)
}
return
case res.next <- &struct {
