Re-stablish tailing websocket connection if it's closed unexpectedly. (#8374)

pull/7624/head
Salva Corts 3 years ago committed by GitHub
parent 2d11ae2071
commit 7ad413e87d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 45
      pkg/logcli/query/tail.go

@ -1,6 +1,7 @@
package query
import (
"context"
"log"
"os"
"os/signal"
@ -10,6 +11,7 @@ import (
"github.com/fatih/color"
"github.com/gorilla/websocket"
"github.com/grafana/dskit/backoff"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
@ -34,8 +36,6 @@ func (q *Query) TailQuery(delayFor time.Duration, c client.Client, out output.Lo
os.Exit(0)
}()
tailResponse := new(loghttp.TailResponse)
if len(q.IgnoreLabelsKey) > 0 && !q.Quiet {
log.Println("Ignoring labels key:", color.RedString(strings.Join(q.IgnoreLabelsKey, ",")))
}
@ -44,9 +44,49 @@ func (q *Query) TailQuery(delayFor time.Duration, c client.Client, out output.Lo
log.Println("Print only labels key:", color.RedString(strings.Join(q.ShowLabelsKey, ",")))
}
tailResponse := new(loghttp.TailResponse)
lastReceivedTimestamp := q.Start
for {
err := unmarshal.ReadTailResponseJSON(tailResponse, conn)
if err != nil {
// Check if the websocket connection closed unexpectedly. If so, retry.
// The connection might close unexpectedly if the querier handling the tail request
// in Loki stops running. The following error would be printed:
// "websocket: close 1006 (abnormal closure): unexpected EOF"
if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
log.Printf("Remote websocket connection closed unexpectedly (%+v). Connecting again.", err)
// Close previous connection. If it fails to close the connection it should be fine as it is already broken.
if err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
log.Printf("Error closing websocket: %+v", err)
}
// Try to re-establish the connection up to 5 times.
backoff := backoff.New(context.Background(), backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
})
for backoff.Ongoing() {
conn, err = c.LiveTailQueryConn(q.QueryString, delayFor, q.Limit, lastReceivedTimestamp, q.Quiet)
if err == nil {
break
}
log.Println("Error recreating tailing connection after unexpected close, will retry:", err)
backoff.Wait()
}
if err = backoff.Err(); err != nil {
log.Println("Error recreating tailing connection:", err)
return
}
continue
}
log.Println("Error reading stream:", err)
return
}
@ -75,6 +115,7 @@ func (q *Query) TailQuery(delayFor time.Duration, c client.Client, out output.Lo
for _, entry := range stream.Entries {
out.FormatAndPrintln(entry.Timestamp, labels, 0, entry.Line)
lastReceivedTimestamp = entry.Timestamp
}
}

Loading…
Cancel
Save