From 078a040794a0345144920b5ffa57d9c28a2fe5af Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 25 Jan 2023 16:47:18 -0700 Subject: [PATCH] Retry getting logs from pubsub forever (#8283) --- .../promtail/targets/gcplog/pull_target.go | 25 ++++------------ .../targets/gcplog/pull_target_test.go | 29 +------------------ 2 files changed, 7 insertions(+), 47 deletions(-) diff --git a/clients/pkg/promtail/targets/gcplog/pull_target.go b/clients/pkg/promtail/targets/gcplog/pull_target.go index 81cee0e5c1..b1f1f15b50 100644 --- a/clients/pkg/promtail/targets/gcplog/pull_target.go +++ b/clients/pkg/promtail/targets/gcplog/pull_target.go @@ -3,7 +3,6 @@ package gcplog import ( "cloud.google.com/go/pubsub" "context" - "fmt" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" @@ -22,7 +21,7 @@ import ( var defaultBackoff = backoff.Config{ MinBackoff: 1 * time.Second, MaxBackoff: 10 * time.Second, - MaxRetries: 5, + MaxRetries: 0, // Retry forever } // pubsubSubscription allows us to mock pubsub for testing @@ -101,15 +100,12 @@ func (t *pullTarget) run() error { t.wg.Add(1) defer t.wg.Done() - subscriptionErr := make(chan error) - go t.consumeSubscription(subscriptionErr) + go t.consumeSubscription() for { select { case <-t.ctx.Done(): return t.ctx.Err() - case e := <-subscriptionErr: - return e case m := <-t.msgs: entry, err := parseGCPLogsEntry(m.Data, t.config.Labels, nil, t.config.UseIncomingTimestamp, t.relabelConfig) if err != nil { @@ -124,32 +120,23 @@ func (t *pullTarget) run() error { } } -func (t *pullTarget) consumeSubscription(subscriptionErr chan error) { +func (t *pullTarget) consumeSubscription() { // NOTE(kavi): `cancel` the context as exiting from this goroutine should stop main `run` loop // It makesense as no more messages will be received. defer t.cancel() - var lastError error for t.backoff.Ongoing() { - lastError = t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) { + err := t.sub.Receive(t.ctx, func(ctx context.Context, m *pubsub.Message) { t.msgs <- m - - // When the subscription works properly, it doesn't return - // Reset relevant state here - lastError = nil t.backoff.Reset() }) - if lastError != nil { - level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", lastError) + if err != nil { + level.Error(t.logger).Log("msg", "failed to receive pubsub messages", "error", err) t.metrics.gcplogErrors.WithLabelValues(t.config.ProjectID).Inc() t.metrics.gcplogTargetLastSuccessScrape.WithLabelValues(t.config.ProjectID, t.config.Subscription).SetToCurrentTime() t.backoff.Wait() } } - - if t.ctx.Err() == nil && t.backoff.Err() != nil { - subscriptionErr <- fmt.Errorf("%w: %s", t.backoff.Err(), lastError.Error()) - } } func (t *pullTarget) Type() target.TargetType { diff --git a/clients/pkg/promtail/targets/gcplog/pull_target_test.go b/clients/pkg/promtail/targets/gcplog/pull_target_test.go index fb248b964a..bd6b72c8c9 100644 --- a/clients/pkg/promtail/targets/gcplog/pull_target_test.go +++ b/clients/pkg/promtail/targets/gcplog/pull_target_test.go @@ -63,33 +63,7 @@ func TestPullTarget_RunStop(t *testing.T) { }, time.Second, 50*time.Millisecond) }) - t.Run("it gives up after MaxRetries of errors", func(t *testing.T) { - tc := testPullTarget(t) - - runErr := make(chan error) - go func() { - runErr <- tc.target.run() - }() - - tc.sub.errors <- errors.New("something bad") - tc.sub.errors <- errors.New("something bad") - tc.sub.errors <- errors.New("something bad") - tc.sub.errors <- errors.New("something bad") - tc.sub.errors <- errors.New("something bad") - - require.NoError(t, tc.target.Stop()) - - require.Eventually(t, func() bool { - select { - case e := <-runErr: - return e.Error() == "terminated after 5 retries: something bad" - default: - return false - } - }, time.Second, 50*time.Millisecond) - }) - - t.Run("a successful message resets retrues", func(t *testing.T) { + t.Run("a successful message resets retries", func(t *testing.T) { tc := testPullTarget(t) runErr := make(chan error) @@ -273,5 +247,4 @@ func (s *fakeSubscription) Receive(ctx context.Context, f func(context.Context, var testBackoff = backoff.Config{ MinBackoff: 1 * time.Millisecond, MaxBackoff: 10 * time.Millisecond, - MaxRetries: 5, }