canary: Adds locking to prevent multiple concurrent invocations of `confirmMissing` from clobbering each other (#5568)

k89
afayngelerindbx 3 years ago committed by GitHub
parent f6d15fba48
commit e49c360c36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CHANGELOG.md
  2. 18
      pkg/canary/comparator/comparator.go
  3. 40
      pkg/canary/comparator/comparator_test.go

@ -1,10 +1,10 @@
## Main
* [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: Fix canary panics due to concurrent execution of `confirmMissing`
* [5552](https://github.com/grafana/loki/pull/5552) **jiachengxu**: Loki mixin: add `DiskSpaceUtilizationPanel`
* [5541](https://github.com/grafana/loki/pull/5541) **bboreham**: Queries: reject very deeply nested regexps which could crash Loki.
* [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable
* [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5356](https://github.com/grafana/loki/pull/5356) **jbschami**: Enhance lambda-promtail to support adding extra labels from an environment variable value
* [5409](https://github.com/grafana/loki/pull/5409) **ldb**: Enable best effort parsing for Syslog messages
* [5392](https://github.com/grafana/loki/pull/5392) **MichelHollands**: Etcd credentials are parsed as secrets instead of plain text now.

@ -431,15 +431,28 @@ func (c *Comparator) pruneEntries(currentTime time.Time) {
func (c *Comparator) confirmMissing(currentTime time.Time) {
// Because we are querying loki timestamps vs the timestamp in the log,
// make the range +/- 10 seconds to allow for clock inaccuracies
c.missingMtx.Lock()
if len(c.missingEntries) == 0 {
c.missingMtx.Unlock()
return
}
start := *c.missingEntries[0]
start = start.Add(-10 * time.Second)
end := *c.missingEntries[len(c.missingEntries)-1]
end = end.Add(10 * time.Second)
c.missingMtx.Unlock()
recvd, err := c.rdr.Query(start, end)
if err != nil {
fmt.Fprintf(c.w, "error querying loki: %s\n", err)
return
}
// Now that query has returned, take out the lock on the missingEntries list so we can modify it
// It's possible more entries were added to this list but that's ok, if they match something in the
// query result we will remove them, if they don't they won't be old enough yet to remove.
c.missingMtx.Lock()
defer c.missingMtx.Unlock()
// This is to help debug some missing log entries when queried,
// let's print exactly what we are missing and what Loki sent back
for _, r := range c.missingEntries {
@ -449,11 +462,6 @@ func (c *Comparator) confirmMissing(currentTime time.Time) {
fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano())
}
// Now that query has returned, take out the lock on the missingEntries list so we can modify it
// It's possible more entries were added to this list but that's ok, if they match something in the
// query result we will remove them, if they don't they won't be old enough yet to remove.
c.missingMtx.Lock()
defer c.missingMtx.Unlock()
k := 0
for i, m := range c.missingEntries {
found := false

@ -218,6 +218,46 @@ func TestEntryNeverReceived(t *testing.T) {
}
// TestConcurrentConfirmMissing ensures that if confirmMissing calls pile up and
// run concurrently, this doesn't cause a panic.
func TestConcurrentConfirmMissing(t *testing.T) {
	found := []time.Time{
		time.Unix(0, 0),
		time.UnixMilli(1),
		time.UnixMilli(2),
	}
	mr := &mockReader{resp: found}

	output := &bytes.Buffer{}

	wait := 30 * time.Millisecond
	maxWait := 30 * time.Millisecond

	c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

	// Seed the missing list with the same timestamps the mock reader is
	// configured with. The loop variable is deliberately not named `t` so it
	// does not shadow the *testing.T used by the goroutines below. Each value
	// is copied before taking its address so every entry points at its own
	// time.Time.
	for _, ts := range found {
		tsCopy := ts
		c.missingEntries = append(c.missingEntries, &tsCopy)
	}

	// Fire several confirmMissing calls at once against the shared comparator.
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			assert.NotPanics(t, func() { c.confirmMissing(time.UnixMilli(3)) })
		}()
	}

	// The condition blocks on wg.Wait, so Eventually is used here as an upper
	// bound (one second) on how long the test waits for all calls to return.
	assert.Eventually(
		t,
		func() bool {
			wg.Wait()
			return true
		},
		time.Second,
		10*time.Millisecond,
	)
}
func TestPruneAckdEntires(t *testing.T) {
actual := &bytes.Buffer{}
wait := 30 * time.Millisecond

Loading…
Cancel
Save