diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c1d397b56..ca7147950c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,7 @@ Check the history of the branch FIXME. * [7414](https://github.com/grafana/loki/pull/7414) **thepalbi**: Add basic tracing support ##### Fixes +* [7394](https://github.com/grafana/loki/pull/7394) **liguozhong**: Fix issue with the Cloudflare target that caused it to stop working after it received an error in the logpull request as explained in issue https://github.com/grafana/loki/issues/6150 * [6766](https://github.com/grafana/loki/pull/6766) **kavirajk**: fix(logql): Make `LabelSampleExtractor` ignore processing the line if it doesn't contain that specific label. Fixes unwrap behavior explained in the issue https://github.com/grafana/loki/issues/6713 * [7016](https://github.com/grafana/loki/pull/7016) **chodges15**: Fix issue with dropping logs when a file based SD target's labels are updated * [7461](https://github.com/grafana/loki/pull/7461) **MarNicGit**: Promtail: Fix collecting userdata field from Windows Event Log diff --git a/clients/pkg/promtail/targets/cloudflare/target.go b/clients/pkg/promtail/targets/cloudflare/target.go index 91ae593744..2cb4cea974 100644 --- a/clients/pkg/promtail/targets/cloudflare/target.go +++ b/clients/pkg/promtail/targets/cloudflare/target.go @@ -158,6 +158,9 @@ func (t *Target) pull(ctx context.Context, start, end time.Time) error { level.Warn(t.logger).Log("msg", "failed iterating over logs, out of cloudflare range, not retrying", "err", err, "start", start, "end", end, "retries", backoff.NumRetries()) return nil } else if err != nil { + if it != nil { + it.Close() + } errs.Add(err) backoff.Wait() continue diff --git a/clients/pkg/promtail/targets/cloudflare/target_test.go b/clients/pkg/promtail/targets/cloudflare/target_test.go index 89edc7e825..d275a7e845 100644 --- a/clients/pkg/promtail/targets/cloudflare/target_test.go +++ b/clients/pkg/promtail/targets/cloudflare/target_test.go @@ -101,6 +101,37 @@ func Test_CloudflareTarget(t *testing.T) { require.Greater(t, newPos, end.UnixNano()) } +func Test_RetryErrorLogpullReceived(t *testing.T) { + var ( + w = log.NewSyncWriter(os.Stderr) + logger = log.NewLogfmtLogger(w) + end = time.Unix(0, time.Hour.Nanoseconds()) + start = time.Unix(0, end.Add(-30*time.Minute).UnixNano()) + client = fake.New(func() {}) + cfClient = newFakeCloudflareClient() + ) + cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{ + err: ErrorLogpullReceived, + }, nil).Times(2) // just retry once + // replace the client + getClient = func(apiKey, zoneID string, fields []string) (Client, error) { + return cfClient, nil + } + defaultBackoff.MinBackoff = 0 + defaultBackoff.MaxBackoff = 5 + ta := &Target{ + logger: logger, + handler: client, + client: cfClient, + config: &scrapeconfig.CloudflareConfig{ + Labels: make(model.LabelSet), + }, + metrics: NewMetrics(nil), + } + + require.NoError(t, ta.pull(context.Background(), start, end)) +} + func Test_RetryErrorIterating(t *testing.T) { var ( w = log.NewSyncWriter(os.Stderr) @@ -124,6 +155,9 @@ func Test_RetryErrorIterating(t *testing.T) { `{"EdgeStartTimestamp":3, "EdgeRequestHost":"foo.com"}`, }, }, nil).Once() + cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{ + err: ErrorLogpullReceived, + }, nil).Once() // replace the client. getClient = func(apiKey, zoneID string, fields []string) (Client, error) { return cfClient, nil diff --git a/clients/pkg/promtail/targets/cloudflare/util_test.go b/clients/pkg/promtail/targets/cloudflare/util_test.go index 14d241c513..ada3d79108 100644 --- a/clients/pkg/promtail/targets/cloudflare/util_test.go +++ b/clients/pkg/promtail/targets/cloudflare/util_test.go @@ -9,6 +9,8 @@ import ( "github.com/stretchr/testify/mock" ) +var ErrorLogpullReceived = errors.New("error logpull received") + type fakeCloudflareClient struct { mock.Mock } @@ -45,7 +47,12 @@ func (f *fakeLogIterator) Next() bool { func (f *fakeLogIterator) Err() error { return f.err } func (f *fakeLogIterator) Line() []byte { return []byte(f.current) } func (f *fakeLogIterator) Fields() (map[string]string, error) { return nil, nil } -func (f *fakeLogIterator) Close() error { return nil } +func (f *fakeLogIterator) Close() error { + if f.err == ErrorLogpullReceived { + f.err = nil + } + return nil +} func newFakeCloudflareClient() *fakeCloudflareClient { return &fakeCloudflareClient{} @@ -54,7 +61,11 @@ func newFakeCloudflareClient() *fakeCloudflareClient { func (f *fakeCloudflareClient) LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error) { r := f.Called(ctx, start, end) if r.Get(0) != nil { - return r.Get(0).(cloudflare.LogpullReceivedIterator), nil + it := r.Get(0).(cloudflare.LogpullReceivedIterator) + if it.Err() == ErrorLogpullReceived { + return it, it.Err() + } + return it, nil } return nil, r.Error(1) }