diff --git a/pkg/dataobj/metastore/metastore.go b/pkg/dataobj/metastore/metastore.go index ffa1467280..be92d636ae 100644 --- a/pkg/dataobj/metastore/metastore.go +++ b/pkg/dataobj/metastore/metastore.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "iter" "sync" "time" @@ -97,10 +98,7 @@ func (m *Manager) UpdateMetastore(ctx context.Context, dataobjPath string, flush // Work our way through the metastore objects window by window, updating & creating them as needed. // Each one handles its own retries in order to keep making progress in the event of a failure. - minMetastoreWindow := minTimestamp.Truncate(metastoreWindowSize) - maxMetastoreWindow := maxTimestamp.Truncate(metastoreWindowSize) - for metastoreWindow := minMetastoreWindow; !metastoreWindow.After(maxMetastoreWindow); metastoreWindow = metastoreWindow.Add(metastoreWindowSize) { - metastorePath := fmt.Sprintf("tenant-%s/metastore/%s.store", m.tenantID, metastoreWindow.Format(time.RFC3339)) + for metastorePath := range Iter(m.tenantID, minTimestamp, maxTimestamp) { m.backoff.Reset() for m.backoff.Ongoing() { err = m.bucket.GetAndReplace(ctx, metastorePath, func(existing io.Reader) (io.Reader, error) { @@ -183,3 +181,20 @@ func (m *Manager) readFromExisting(ctx context.Context, object *dataobj.Object) } return nil } + +func metastorePath(tenantID string, window time.Time) string { + return fmt.Sprintf("tenant-%s/metastore/%s.store", tenantID, window.Format(time.RFC3339)) +} + +func Iter(tenantID string, start, end time.Time) iter.Seq[string] { + minMetastoreWindow := start.Truncate(metastoreWindowSize) + maxMetastoreWindow := end.Truncate(metastoreWindowSize) + + return func(yield func(t string) bool) { + for metastoreWindow := minMetastoreWindow; !metastoreWindow.After(maxMetastoreWindow); metastoreWindow = metastoreWindow.Add(metastoreWindowSize) { + if !yield(metastorePath(tenantID, metastoreWindow)) { + return + } + } + } +} diff --git a/pkg/dataobj/metastore/metastore_test.go b/pkg/dataobj/metastore/metastore_test.go index 363e99a6b8..e3ffc924cf 100644 --- a/pkg/dataobj/metastore/metastore_test.go +++ b/pkg/dataobj/metastore/metastore_test.go @@ -97,3 +97,85 @@ func TestWriteMetastores(t *testing.T) { require.Greater(t, len(obj), originalSize) } } + +func TestIter(t *testing.T) { + tenantID := "TEST" + now := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC) + + for _, tc := range []struct { + name string + start time.Time + end time.Time + expected []string + }{ + { + name: "within single window", + start: now, + end: now.Add(1 * time.Hour), + expected: []string{"tenant-TEST/metastore/2025-01-01T12:00:00Z.store"}, + }, + { + name: "same start and end", + start: now, + end: now, + expected: []string{"tenant-TEST/metastore/2025-01-01T12:00:00Z.store"}, + }, + { + name: "begin at start of window", + start: now.Add(-3 * time.Hour), + end: now, + expected: []string{ + "tenant-TEST/metastore/2025-01-01T12:00:00Z.store", + }, + }, + { + name: "end at start of next window", + start: now.Add(-4 * time.Hour), + end: now.Add(-3 * time.Hour), + expected: []string{ + "tenant-TEST/metastore/2025-01-01T00:00:00Z.store", + "tenant-TEST/metastore/2025-01-01T12:00:00Z.store", + }, + }, + { + name: "start and end in different windows", + start: now.Add(-12 * time.Hour), + end: now, + expected: []string{ + "tenant-TEST/metastore/2025-01-01T00:00:00Z.store", + "tenant-TEST/metastore/2025-01-01T12:00:00Z.store", + }, + }, + { + name: "span several windows", + start: now, + end: now.Add(48 * time.Hour), + expected: []string{ + "tenant-TEST/metastore/2025-01-01T12:00:00Z.store", + "tenant-TEST/metastore/2025-01-02T00:00:00Z.store", + "tenant-TEST/metastore/2025-01-02T12:00:00Z.store", + "tenant-TEST/metastore/2025-01-03T00:00:00Z.store", + "tenant-TEST/metastore/2025-01-03T12:00:00Z.store", + }, + }, + { + name: "start and end in different years", + start: time.Date(2024, 12, 31, 3, 0, 0, 0, time.UTC), + end: time.Date(2025, 1, 1, 9, 0, 0, 0, time.UTC), + expected: []string{ + "tenant-TEST/metastore/2024-12-31T00:00:00Z.store", + "tenant-TEST/metastore/2024-12-31T12:00:00Z.store", + "tenant-TEST/metastore/2025-01-01T00:00:00Z.store", + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + iter := Iter(tenantID, tc.start, tc.end) + actual := []string{} + for store := range iter { + actual = append(actual, store) + } + require.Equal(t, tc.expected, actual) + }) + } +}