chore(dataobj): Add iterator for metastores (#16114)

pull/16115/head
benclive 4 months ago committed by GitHub
parent 398c340b4f
commit b7f6bb359b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 23
      pkg/dataobj/metastore/metastore.go
  2. 82
      pkg/dataobj/metastore/metastore_test.go

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"iter"
"sync"
"time"
@ -97,10 +98,7 @@ func (m *Manager) UpdateMetastore(ctx context.Context, dataobjPath string, flush
// Work our way through the metastore objects window by window, updating & creating them as needed.
// Each one handles its own retries in order to keep making progress in the event of a failure.
minMetastoreWindow := minTimestamp.Truncate(metastoreWindowSize)
maxMetastoreWindow := maxTimestamp.Truncate(metastoreWindowSize)
for metastoreWindow := minMetastoreWindow; !metastoreWindow.After(maxMetastoreWindow); metastoreWindow = metastoreWindow.Add(metastoreWindowSize) {
metastorePath := fmt.Sprintf("tenant-%s/metastore/%s.store", m.tenantID, metastoreWindow.Format(time.RFC3339))
for metastorePath := range Iter(m.tenantID, minTimestamp, maxTimestamp) {
m.backoff.Reset()
for m.backoff.Ongoing() {
err = m.bucket.GetAndReplace(ctx, metastorePath, func(existing io.Reader) (io.Reader, error) {
@ -183,3 +181,20 @@ func (m *Manager) readFromExisting(ctx context.Context, object *dataobj.Object)
}
return nil
}
// metastorePath returns the object-storage key of the metastore object that
// covers the given window for tenantID.
//
// The window is rendered in UTC so that the key depends only on the instant
// and not on the time.Location carried by the caller's value. Formatting a
// non-UTC time with RFC3339 would otherwise embed a zone offset (for example
// "+01:00" instead of "Z") and produce a different key for the same window.
func metastorePath(tenantID string, window time.Time) string {
	return fmt.Sprintf("tenant-%s/metastore/%s.store", tenantID, window.UTC().Format(time.RFC3339))
}
// Iter returns an iterator over the storage paths of the metastore objects
// for tenantID whose windows are touched by the inclusive range [start, end].
//
// Both bounds are truncated to a multiple of metastoreWindowSize and one path
// is yielded per window, in ascending time order. If end's window precedes
// start's window, nothing is yielded. Iteration stops early as soon as the
// consumer's yield function returns false.
func Iter(tenantID string, start, end time.Time) iter.Seq[string] {
	// Truncate operates on absolute time but keeps the input's Location.
	// Normalise to UTC so the formatted path is identical no matter which
	// zone the caller's timestamps were expressed in.
	firstWindow := start.Truncate(metastoreWindowSize).UTC()
	lastWindow := end.Truncate(metastoreWindowSize).UTC()

	return func(yield func(t string) bool) {
		for window := firstWindow; !window.After(lastWindow); window = window.Add(metastoreWindowSize) {
			if !yield(metastorePath(tenantID, window)) {
				return
			}
		}
	}
}

@ -97,3 +97,85 @@ func TestWriteMetastores(t *testing.T) {
require.Greater(t, len(obj), originalSize)
}
}
// TestIter checks that Iter yields, in order, the metastore path of every
// window touched by the inclusive [from, to] range. The expected paths below
// all fall on 00:00 or 12:00 UTC boundaries.
func TestIter(t *testing.T) {
	const tenant = "TEST"

	// Reference instant used to derive most ranges: 2025-01-01 15:00 UTC.
	base := time.Date(2025, 1, 1, 15, 0, 0, 0, time.UTC)

	type testCase struct {
		name     string
		from, to time.Time
		want     []string
	}

	cases := []testCase{
		{
			name: "within single window",
			from: base,
			to:   base.Add(time.Hour),
			want: []string{"tenant-TEST/metastore/2025-01-01T12:00:00Z.store"},
		},
		{
			name: "same start and end",
			from: base,
			to:   base,
			want: []string{"tenant-TEST/metastore/2025-01-01T12:00:00Z.store"},
		},
		{
			name: "begin at start of window",
			from: base.Add(-3 * time.Hour),
			to:   base,
			want: []string{
				"tenant-TEST/metastore/2025-01-01T12:00:00Z.store",
			},
		},
		{
			name: "end at start of next window",
			from: base.Add(-4 * time.Hour),
			to:   base.Add(-3 * time.Hour),
			want: []string{
				"tenant-TEST/metastore/2025-01-01T00:00:00Z.store",
				"tenant-TEST/metastore/2025-01-01T12:00:00Z.store",
			},
		},
		{
			name: "start and end in different windows",
			from: base.Add(-12 * time.Hour),
			to:   base,
			want: []string{
				"tenant-TEST/metastore/2025-01-01T00:00:00Z.store",
				"tenant-TEST/metastore/2025-01-01T12:00:00Z.store",
			},
		},
		{
			name: "span several windows",
			from: base,
			to:   base.Add(48 * time.Hour),
			want: []string{
				"tenant-TEST/metastore/2025-01-01T12:00:00Z.store",
				"tenant-TEST/metastore/2025-01-02T00:00:00Z.store",
				"tenant-TEST/metastore/2025-01-02T12:00:00Z.store",
				"tenant-TEST/metastore/2025-01-03T00:00:00Z.store",
				"tenant-TEST/metastore/2025-01-03T12:00:00Z.store",
			},
		},
		{
			name: "start and end in different years",
			from: time.Date(2024, 12, 31, 3, 0, 0, 0, time.UTC),
			to:   time.Date(2025, 1, 1, 9, 0, 0, 0, time.UTC),
			want: []string{
				"tenant-TEST/metastore/2024-12-31T00:00:00Z.store",
				"tenant-TEST/metastore/2024-12-31T12:00:00Z.store",
				"tenant-TEST/metastore/2025-01-01T00:00:00Z.store",
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Drain the iterator into a slice so the full sequence can be compared.
			got := []string{}
			for path := range Iter(tenant, tc.from, tc.to) {
				got = append(got, path)
			}
			require.Equal(t, tc.want, got)
		})
	}
}

Loading…
Cancel
Save