@ -13,11 +13,13 @@ import (
)
const (
ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n"
ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n"
ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n"
ErrDuplicateEntry = "received a duplicate entry for ts %v\n"
ErrUnexpectedEntry = "received an unexpected entry with ts %v\n"
ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n"
ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n"
ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n"
ErrDuplicateEntry = "received a duplicate entry for ts %v\n"
ErrUnexpectedEntry = "received an unexpected entry with ts %v\n"
DebugWebsocketMissingEntry = "websocket missing entry: %v\n"
DebugQueryResult = "confirmation query result: %v\n"
)
var (
@ -125,7 +127,7 @@ func (c *Comparator) entryReceived(ts time.Time) {
// If this isn't the first item in the list we received it out of order
if i != 0 {
outOfOrderEntries . Inc ( )
_ , _ = fmt . Fprintf ( c . w , ErrOutOfOrderEntry , e , c . entries [ : i ] )
fmt . Fprintf ( c . w , ErrOutOfOrderEntry , e , c . entries [ : i ] )
}
responseLatency . Observe ( time . Since ( ts ) . Seconds ( ) )
// Put this element in the acknowledged entries list so we can use it to check for duplicates
@ -145,12 +147,12 @@ func (c *Comparator) entryReceived(ts time.Time) {
if ts . Equal ( * e ) {
duplicate = true
duplicateEntries . Inc ( )
_ , _ = fmt . Fprintf ( c . w , ErrDuplicateEntry , ts . UnixNano ( ) )
fmt . Fprintf ( c . w , ErrDuplicateEntry , ts . UnixNano ( ) )
break
}
}
if ! duplicate {
_ , _ = fmt . Fprintf ( c . w , ErrUnexpectedEntry , ts . UnixNano ( ) )
fmt . Fprintf ( c . w , ErrUnexpectedEntry , ts . UnixNano ( ) )
unexpectedEntries . Inc ( )
}
}
@ -199,7 +201,7 @@ func (c *Comparator) pruneEntries() {
if e . Before ( time . Now ( ) . Add ( - c . maxWait ) ) {
missing = append ( missing , e )
wsMissingEntries . Inc ( )
_ , _ = fmt . Fprintf ( c . w , ErrEntryNotReceivedWs , e . UnixNano ( ) , c . maxWait . Seconds ( ) )
fmt . Fprintf ( c . w , ErrEntryNotReceivedWs , e . UnixNano ( ) , c . maxWait . Seconds ( ) )
} else {
if i != k {
c . entries [ k ] = c . entries [ i ]
@ -249,9 +251,18 @@ func (c *Comparator) confirmMissing(missing []*time.Time) {
end = end . Add ( 10 * time . Second )
recvd , err := c . rdr . Query ( start , end )
if err != nil {
_ , _ = fmt . Fprintf ( c . w , "error querying loki: %s" , err )
fmt . Fprintf ( c . w , "error querying loki: %s\n " , err )
return
}
// This is to help debug some missing log entries when queried,
// let's print exactly what we are missing and what Loki sent back
for _ , r := range missing {
fmt . Fprintf ( c . w , DebugWebsocketMissingEntry , r . UnixNano ( ) )
}
for _ , r := range recvd {
fmt . Fprintf ( c . w , DebugQueryResult , r . UnixNano ( ) )
}
k := 0
for i , m := range missing {
found := false
@ -277,6 +288,6 @@ func (c *Comparator) confirmMissing(missing []*time.Time) {
missing = missing [ : k ]
for _ , e := range missing {
missingEntries . Inc ( )
_ , _ = fmt . Fprintf ( c . w , ErrEntryNotReceived , e . UnixNano ( ) , c . maxWait . Seconds ( ) )
fmt . Fprintf ( c . w , ErrEntryNotReceived , e . UnixNano ( ) , c . maxWait . Seconds ( ) )
}
}