diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index 4a668766f2..3d118eb0c7 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -13,11 +13,13 @@ import ( ) const ( - ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n" - ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n" - ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n" - ErrDuplicateEntry = "received a duplicate entry for ts %v\n" - ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" + ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n" + ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n" + ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n" + ErrDuplicateEntry = "received a duplicate entry for ts %v\n" + ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" + DebugWebsocketMissingEntry = "websocket missing entry: %v\n" + DebugQueryResult = "confirmation query result: %v\n" ) var ( @@ -125,7 +127,7 @@ func (c *Comparator) entryReceived(ts time.Time) { // If this isn't the first item in the list we received it out of order if i != 0 { outOfOrderEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i]) + fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i]) } responseLatency.Observe(time.Since(ts).Seconds()) // Put this element in the acknowledged entries list so we can use it to check for duplicates @@ -145,12 +147,12 @@ func (c *Comparator) entryReceived(ts time.Time) { if ts.Equal(*e) { duplicate = true duplicateEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano()) + fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano()) break } } if !duplicate { - _, _ = fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano()) + fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano()) unexpectedEntries.Inc() } } @@ -199,7 +201,7 @@ func (c *Comparator) pruneEntries() { if e.Before(time.Now().Add(-c.maxWait)) { missing = append(missing, e) wsMissingEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds()) + fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds()) } else { if i != k { c.entries[k] = c.entries[i] @@ -249,9 +251,18 @@ func (c *Comparator) confirmMissing(missing []*time.Time) { end = end.Add(10 * time.Second) recvd, err := c.rdr.Query(start, end) if err != nil { - _, _ = fmt.Fprintf(c.w, "error querying loki: %s", err) + fmt.Fprintf(c.w, "error querying loki: %s\n", err) return } + // This is to help debug some missing log entries when queried, + // let's print exactly what we are missing and what Loki sent back + for _, r := range missing { + fmt.Fprintf(c.w, DebugWebsocketMissingEntry, r.UnixNano()) + } + for _, r := range recvd { + fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano()) + } + k := 0 for i, m := range missing { found := false @@ -277,6 +288,6 @@ func (c *Comparator) confirmMissing(missing []*time.Time) { missing = missing[:k] for _, e := range missing { missingEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds()) + fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds()) } } diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index 80d9c7d31f..0af0656644 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -179,11 +179,21 @@ func TestEntryNeverReceived(t *testing.T) { c.pruneEntries() - expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ErrEntryNotReceived, + expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ // Out of order because we missed entries + ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ // Complain about missed entries + DebugWebsocketMissingEntry+DebugWebsocketMissingEntry+ // List entries we are missing + DebugQueryResult+DebugQueryResult+DebugQueryResult+DebugQueryResult+ // List entries we got back from Loki + ErrEntryNotReceived, // List entry not received from Loki t3, []time.Time{t2}, t5, []time.Time{t2, t4}, t2.UnixNano(), maxWait.Seconds(), t4.UnixNano(), maxWait.Seconds(), + t2.UnixNano(), + t4.UnixNano(), + t1.UnixNano(), + t3.UnixNano(), + t4.UnixNano(), + t5.UnixNano(), t2.UnixNano(), maxWait.Seconds()) assert.Equal(t, expected, actual.String()) diff --git a/pkg/canary/reader/reader.go b/pkg/canary/reader/reader.go index 1b1824c428..f139bb2d2d 100644 --- a/pkg/canary/reader/reader.go +++ b/pkg/canary/reader/reader.go @@ -77,7 +77,7 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool, go func() { <-rd.quit if rd.conn != nil { - _, _ = fmt.Fprintf(rd.w, "shutting down reader\n") + fmt.Fprintf(rd.w, "shutting down reader\n") rd.shuttingDown = true _ = rd.conn.Close() } @@ -107,7 +107,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { "&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) + "&limit=1000", } - _, _ = fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String()) + fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String()) req, err := http.NewRequest("GET", u.String(), nil) if err != nil { @@ -142,7 +142,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { for _, entry := range stream.Entries { ts, err := parseResponse(&entry) if err != nil { - _, _ = fmt.Fprint(r.w, err) + fmt.Fprint(r.w, err) continue } tss = append(tss, *ts) @@ -166,7 +166,7 @@ func (r *Reader) run() { close(r.done) return } - _, _ = fmt.Fprintf(r.w, "error reading websocket: %s\n", err) + fmt.Fprintf(r.w, "error reading websocket: %s\n", err) r.closeAndReconnect() continue } @@ -174,7 +174,7 @@ func (r *Reader) run() { for _, entry := range stream.Entries { ts, err := parseResponse(&entry) if err != nil { - _, _ = fmt.Fprint(r.w, err) + fmt.Fprint(r.w, err) continue } r.recv <- *ts @@ -203,11 +203,11 @@ func (r *Reader) closeAndReconnect() { RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)), } - _, _ = fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal) + fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal) c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header) if err != nil { - _, _ = fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err) + fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err) <-time.After(5 * time.Second) continue } diff --git a/pkg/canary/writer/writer.go b/pkg/canary/writer/writer.go index 8d9b163a80..2bf918b03f 100644 --- a/pkg/canary/writer/writer.go +++ b/pkg/canary/writer/writer.go @@ -72,7 +72,7 @@ func (w *Writer) run() { w.prevTsLen = tsLen } - _, _ = fmt.Fprintf(w.w, LogEntry, ts, w.pad) + fmt.Fprintf(w.w, LogEntry, ts, w.pad) w.sent <- t case <-w.quit: return