Retry getting logs from pubsub forever (#8283)

pull/8290/head
Travis Patterson 3 years ago committed by GitHub
parent 1728d09cba
commit 078a040794
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      clients/pkg/promtail/targets/gcplog/pull_target.go
  2. 29
      clients/pkg/promtail/targets/gcplog/pull_target_test.go

@ -3,7 +3,6 @@ package gcplog
import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
@ -22,7 +21,7 @@ import (
var defaultBackoff = backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
MaxRetries: 0, // Retry forever
}
// pubsubSubscription allows us to mock pubsub for testing
@ -101,15 +100,12 @@ func (t *pullTarget) run() error {
t.wg.Add(1)
defer t.wg.Done()
subscriptionErr := make(chan error)
go t.consumeSubscription(subscriptionErr)
go t.consumeSubscription()
for {
select {
case <-t.ctx.Done():
return t.ctx.Err()
case e := <-subscriptionErr:
return e
case m := <-t.msgs:
entry, err := parseGCPLogsEntry(m.Data, t.config.Labels, nil, t.config.UseIncomingTimestamp, t.relabelConfig)
if err != nil {
@ -124,32 +120,23 @@ func (t *pullTarget) run() error {
}
}
func (t *pullTarget) consumeSubscription(subscriptionErr chan error) {
func (t *pullTarget) consumeSubscription() {
// NOTE(kavi): `cancel` the context as exiting from this goroutine should stop main `run` loop
// It makesense as no more messages will be received.
defer t.cancel()
var lastError error
for t.backoff.Ongoing() {
lastError = t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) {
err := t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) {
t.msgs <- m
// When the subscription works properly, it doesn't return
// Reset relevant state here
lastError = nil
t.backoff.Reset()
})
if lastError != nil {
level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", lastError)
if err != nil {
level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", err)
t.metrics.gcplogErrors.WithLabelValues(t.config.ProjectID).Inc()
t.metrics.gcplogTargetLastSuccessScrape.WithLabelValues(t.config.ProjectID, t.config.Subscription).SetToCurrentTime()
t.backoff.Wait()
}
}
if t.ctx.Err() == nil && t.backoff.Err() != nil {
subscriptionErr <- fmt.Errorf("%w: %s", t.backoff.Err(), lastError.Error())
}
}
func (t *pullTarget) Type() target.TargetType {

@ -63,33 +63,7 @@ func TestPullTarget_RunStop(t *testing.T) {
}, time.Second, 50*time.Millisecond)
})
t.Run("it gives up after MaxRetries of errors", func(t *testing.T) {
tc := testPullTarget(t)
runErr := make(chan error)
go func() {
runErr <- tc.target.run()
}()
tc.sub.errors <- errors.New("something bad")
tc.sub.errors <- errors.New("something bad")
tc.sub.errors <- errors.New("something bad")
tc.sub.errors <- errors.New("something bad")
tc.sub.errors <- errors.New("something bad")
require.NoError(t, tc.target.Stop())
require.Eventually(t, func() bool {
select {
case e := <-runErr:
return e.Error() == "terminated after 5 retries: something bad"
default:
return false
}
}, time.Second, 50*time.Millisecond)
})
t.Run("a successful message resets retrues", func(t *testing.T) {
t.Run("a successful message resets retries", func(t *testing.T) {
tc := testPullTarget(t)
runErr := make(chan error)
@ -273,5 +247,4 @@ func (s *fakeSubscription) Receive(ctx context.Context, f func(context.Context,
var testBackoff = backoff.Config{
MinBackoff: 1 * time.Millisecond,
MaxBackoff: 10 * time.Millisecond,
MaxRetries: 5,
}

Loading…
Cancel
Save