logging: removed some noise in logs from live-tailing (#1100)

* Removed some noise in logs by detecting cases where the connection errors because the tail request was stopped or closed (see the sketch after the list of changed files below)

* Fixed log formats to use structured key/value pairs ("msg", "err")
pull/1114/head
Sandeep Sukhani authored 6 years ago, committed by Cyril Tovena
parent 05f8e564db
commit 569748ff18
  1. pkg/ingester/tailer.go (9 lines changed)
  2. pkg/querier/http.go (22 lines changed)
  3. pkg/querier/tail.go (12 lines changed)
  4. pkg/util/errors.go (20 lines changed)
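To make the change described above concrete, here is a minimal, self-contained sketch (not the committed code) of the suppression pattern: before logging a failed send to a tail client, check whether the error only means the client cancelled or the connection was closed. The isConnCanceled helper mirrors the one added in pkg/util/errors.go below; the sample errors are hypothetical.

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isConnCanceled mirrors the helper added in pkg/util/errors.go: it reports
// whether err comes from a canceled or already-closed gRPC connection.
func isConnCanceled(err error) bool {
	if err == nil {
		return false
	}
	if s, ok := status.FromError(err); ok {
		return s.Code() == codes.Canceled || s.Message() == "transport is closing"
	}
	return false
}

func main() {
	// Hypothetical errors a stream Send might return.
	sendErrors := []error{
		status.Error(codes.Canceled, "context canceled"), // client stopped the tail
		errors.New("write: broken pipe"),                 // a genuine failure
	}
	for _, err := range sendErrors {
		if isConnCanceled(err) {
			continue // the client went away on purpose; stay quiet
		}
		fmt.Println("error writing to tail client:", err)
	}
}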

@@ -2,15 +2,15 @@ package ingester
import (
"encoding/binary"
"fmt"
"hash/fnv"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/util"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
@@ -91,7 +91,10 @@ func (t *tailer) loop() {
tailResponse := logproto.TailResponse{Stream: stream, DroppedStreams: t.popDroppedStreams()}
err = t.conn.Send(&tailResponse)
if err != nil {
level.Error(util.Logger).Log("Error writing to tail client", fmt.Sprintf("%v", err))
// Don't log any error due to tail client closing the connection
if !util.IsConnCanceled(err) {
level.Error(cortex_util.Logger).Log("msg", "Error writing to tail client", "err", err)
}
t.close()
return
}

@@ -362,19 +362,19 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
if tailRequestPtr.DelayFor > maxDelayForInTailing {
server.WriteError(w, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing))
level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(util.Logger).Log("Error in upgrading websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}
defer func() {
if err := conn.Close(); err != nil {
level.Error(util.Logger).Log("Error closing websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing websocket", "err", err)
}
}()
@@ -385,13 +385,13 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
tailer, err := q.Tail(r.Context(), &tailRequest)
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error connecting to ingesters for tailing", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
}
return
}
defer func() {
if err := tailer.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
}
}()
@@ -412,25 +412,25 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}
if err != nil {
level.Error(util.Logger).Log("Error writing to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}
case err := <-closeErrChan:
level.Error(util.Logger).Log("Error from iterator", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error from iterator", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
case <-ticker.C:
// This is to periodically check whether connection is active, useful to clean up dead connections when there are no entries to send
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
level.Error(util.Logger).Log("Error writing ping message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing ping message to websocket", "err", err)
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("Error writing close message to websocket", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error writing close message to websocket", "err", err)
}
return
}

@@ -1,7 +1,6 @@
package querier
import (
"fmt"
"sync"
"time"
@@ -126,12 +125,12 @@ func (t *Tailer) loop() {
if numClients == 0 {
// All the connections to ingesters are dropped, try reconnecting or return error
if err := t.checkIngesterConnections(); err != nil {
level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error reconnecting to ingesters", "err", err)
} else {
continue
}
if err := t.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err)
}
t.closeErrChan <- errors.New("all ingesters closed the connection")
return
@@ -199,13 +198,16 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
for {
if t.stopped {
if err := querierTailClient.CloseSend(); err != nil {
level.Error(util.Logger).Log("Error closing grpc tail client", fmt.Sprintf("%v", err))
level.Error(util.Logger).Log("msg", "Error closing grpc tail client", "err", err)
}
break
}
resp, err = querierTailClient.Recv()
if err != nil {
level.Error(util.Logger).Log("Error receiving response from grpc tail client", fmt.Sprintf("%v", err))
// We don't want to log an error when it's due to stopping the tail request
if !t.stopped {
level.Error(util.Logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
}
break
}
t.pushTailResponseFromIngester(resp)

@@ -3,6 +3,9 @@ package util
import (
"bytes"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// The MultiError type implements the error interface, and contains the
@@ -46,3 +49,20 @@ func (es MultiError) Err() error {
}
return es
}
// IsConnCanceled returns true, if error is from a closed gRPC connection.
// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646
func IsConnCanceled(err error) bool {
if err == nil {
return false
}
// >= gRPC v1.23.x
s, ok := status.FromError(err)
if ok {
// connection is canceled or server has already closed the connection
return s.Code() == codes.Canceled || s.Message() == "transport is closing"
}
return false
}
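As a quick sanity check of the helper above, here is a sketch of a test (an assumption for illustration, not part of this commit) covering the two cases IsConnCanceled distinguishes: a gRPC Canceled status should count as a closed connection, while an ordinary error should not.

package util_test

import (
	"errors"
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/grafana/loki/pkg/util"
)

func TestIsConnCanceled(t *testing.T) {
	// A canceled gRPC status means the tail client closed the connection.
	if !util.IsConnCanceled(status.Error(codes.Canceled, "context canceled")) {
		t.Error("expected canceled gRPC status to be treated as a closed connection")
	}
	// Any other error should still be logged by callers.
	if util.IsConnCanceled(errors.New("some other failure")) {
		t.Error("expected a plain error not to be treated as a closed connection")
	}
}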
