diff --git a/pkg/dataobj/internal/encoding/dataset_logs.go b/pkg/dataobj/internal/encoding/dataset_logs.go
index 35c6f46936..ce04cf7e1e 100644
--- a/pkg/dataobj/internal/encoding/dataset_logs.go
+++ b/pkg/dataobj/internal/encoding/dataset_logs.go
@@ -40,30 +40,52 @@ func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Colum
 }
 
 func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
-	// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
+	// We unwrap columns to get the underlying metadata, allowing the
+	// underlying decoder to read multiple column metadatas in a single call,
+	// and then rewrap the results as dataset.Page values.
 	return result.Iter(func(yield func(dataset.Pages) bool) error {
-		for _, column := range columns {
-			pages, err := result.Collect(column.ListPages(ctx))
-			if err != nil {
-				return err
-			} else if !yield(pages) {
-				return nil
+		descs := make([]*logsmd.ColumnDesc, len(columns))
+		for i, column := range columns {
+			column, ok := column.(*logsDatasetColumn)
+			if !ok {
+				return fmt.Errorf("unexpected column type: got=%T want=*logsDatasetColumn", column)
 			}
+			descs[i] = column.desc
 		}
 
+		for result := range ds.dec.Pages(ctx, descs) {
+			pagesDescs, err := result.Value()
+
+			pages := make([]dataset.Page, len(pagesDescs))
+			for i, pageDesc := range pagesDescs {
+				pages[i] = &logsDatasetPage{dec: ds.dec, desc: pageDesc}
+			}
+			if err != nil || !yield(pages) {
+				return err
+			}
+		}
 		return nil
 	})
 }
 
 func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
-	// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
+	// We unwrap pages to get the underlying metadata, allowing the underlying
+	// decoder to read multiple pages in a single call.
 	return result.Iter(func(yield func(dataset.PageData) bool) error {
-		for _, page := range pages {
-			data, err := page.ReadPage(ctx)
-			if err != nil {
+		descs := make([]*logsmd.PageDesc, len(pages))
+		for i, page := range pages {
+			page, ok := page.(*logsDatasetPage)
+			if !ok {
+				return fmt.Errorf("unexpected page type: got=%T want=*logsDatasetPage", page)
+			}
+			descs[i] = page.desc
+		}
+
+		for result := range ds.dec.ReadPages(ctx, descs) {
+			data, err := result.Value()
+			if err != nil || !yield(data) {
 				return err
-			} else if !yield(data) {
-				return nil
 			}
 		}
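A note on the merged condition used in the loops above: when err is non-nil the iterator stops with that error, and when the consumer stops early (yield returns false) err is nil, so returning err is equivalent to the old explicit "return nil" branch. A minimal, self-contained sketch of the same idiom, with hypothetical resultOf and iterate names standing in for the package's result types:

package main

import (
	"errors"
	"fmt"
)

// resultOf mimics one element of a push-style result sequence: a value or an
// error. Hypothetical stand-in for the package's result types.
type resultOf[T any] struct {
	value T
	err   error
}

// iterate demonstrates the merged condition: a single branch handles both an
// element error and early termination by the consumer.
func iterate[T any](elems []resultOf[T], yield func(T) bool) error {
	for _, e := range elems {
		// On error, return it; if the consumer stops early, e.err is nil and
		// we return nil, matching the old explicit else-if branch.
		if e.err != nil || !yield(e.value) {
			return e.err
		}
	}
	return nil
}

func main() {
	elems := []resultOf[int]{{value: 1}, {err: errors.New("boom")}}
	err := iterate(elems, func(v int) bool { fmt.Println(v); return true })
	fmt.Println(err) // boom
}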
diff --git a/pkg/dataobj/internal/encoding/dataset_streams.go b/pkg/dataobj/internal/encoding/dataset_streams.go
index 9442a50686..d7592d3cf2 100644
--- a/pkg/dataobj/internal/encoding/dataset_streams.go
+++ b/pkg/dataobj/internal/encoding/dataset_streams.go
@@ -40,30 +40,52 @@ func (ds *streamsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Co
 }
 
 func (ds *streamsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
-	// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
+	// We unwrap columns to get the underlying metadata, allowing the
+	// underlying decoder to read multiple column metadatas in a single call,
+	// and then rewrap the results as dataset.Page values.
 	return result.Iter(func(yield func(dataset.Pages) bool) error {
-		for _, column := range columns {
-			pages, err := result.Collect(column.ListPages(ctx))
-			if err != nil {
-				return err
-			} else if !yield(pages) {
-				return nil
+		descs := make([]*streamsmd.ColumnDesc, len(columns))
+		for i, column := range columns {
+			column, ok := column.(*streamsDatasetColumn)
+			if !ok {
+				return fmt.Errorf("unexpected column type: got=%T want=*streamsDatasetColumn", column)
 			}
+			descs[i] = column.desc
 		}
 
+		for result := range ds.dec.Pages(ctx, descs) {
+			pagesDescs, err := result.Value()
+
+			pages := make([]dataset.Page, len(pagesDescs))
+			for i, pageDesc := range pagesDescs {
+				pages[i] = &streamsDatasetPage{dec: ds.dec, desc: pageDesc}
+			}
+			if err != nil || !yield(pages) {
+				return err
+			}
+		}
 		return nil
 	})
 }
 
 func (ds *streamsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
-	// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
+	// We unwrap pages to get the underlying metadata, allowing the underlying
+	// decoder to read multiple pages in a single call.
 	return result.Iter(func(yield func(dataset.PageData) bool) error {
-		for _, page := range pages {
-			data, err := page.ReadPage(ctx)
-			if err != nil {
+		descs := make([]*streamsmd.PageDesc, len(pages))
+		for i, page := range pages {
+			page, ok := page.(*streamsDatasetPage)
+			if !ok {
+				return fmt.Errorf("unexpected page type: got=%T want=*streamsDatasetPage", page)
+			}
+			descs[i] = page.desc
+		}
+
+		for result := range ds.dec.ReadPages(ctx, descs) {
+			data, err := result.Value()
+			if err != nil || !yield(data) {
 				return err
-			} else if !yield(data) {
-				return nil
 			}
 		}
diff --git a/pkg/dataobj/internal/encoding/decoder_range.go b/pkg/dataobj/internal/encoding/decoder_range.go
index 19d5847d2f..22b9083868 100644
--- a/pkg/dataobj/internal/encoding/decoder_range.go
+++ b/pkg/dataobj/internal/encoding/decoder_range.go
@@ -1,6 +1,7 @@
 package encoding
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -12,6 +13,16 @@ import (
 	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
 )
 
+// windowSize specifies the maximum amount of data to download at once from
+// object storage. 16MB is chosen based on S3's [recommendations] for
+// Byte-Range fetches, which suggest either 8MB or 16MB.
+//
+// As windowing is designed to reduce the number of requests made to object
+// storage, 16MB is chosen over 8MB, as it leads to fewer requests.
+//
+// [recommendations]: https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html
+const windowSize = 16_000_000
+
 // rangeReader is an interface that can read a range of bytes from an object.
 type rangeReader interface {
 	// Size returns the full size of the object.
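To put the constant in concrete terms, here is the arithmetic the windowing is after (illustrative numbers only, not taken from this change): reading 200 contiguous 150KB pages individually costs 200 GET requests, while fetching them through 16MB windows costs 2.

package main

import "fmt"

func main() {
	const (
		pageSize   = 150_000    // hypothetical page size in bytes
		pageCount  = 200        // hypothetical count of contiguous pages
		windowSize = 16_000_000 // the constant defined above
	)

	total := pageSize * pageCount // 30MB of contiguous page data

	// One request per page without windowing.
	fmt.Println("per-page requests:", pageCount) // 200

	// ceil(total / windowSize) requests with windowing.
	fmt.Println("windowed requests:", (total+windowSize-1)/windowSize) // 2
}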
@@ -114,31 +125,54 @@ func (rd *rangeStreamsDecoder) Columns(ctx context.Context, section *filemd.Sect
 }
 
 func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc] {
-	getPages := func(ctx context.Context, column *streamsmd.ColumnDesc) ([]*streamsmd.PageDesc, error) {
-		rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize))
-		if err != nil {
-			return nil, fmt.Errorf("reading column metadata: %w", err)
+	return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error {
+		results := make([][]*streamsmd.PageDesc, len(columns))
+
+		columnInfo := func(c *streamsmd.ColumnDesc) (uint64, uint64) {
+			return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize
 		}
-		defer rc.Close()
 
-		br, release := getBufioReader(rc)
-		defer release()
+		for window := range iterWindows(columns, columnInfo, windowSize) {
+			if len(window) == 0 {
+				continue
+			}
 
-		md, err := decodeStreamsColumnMetadata(br)
-		if err != nil {
-			return nil, err
-		}
-		return md.Pages, nil
-	}
+			var (
+				windowOffset = window.Start().GetInfo().MetadataOffset
+				windowSize   = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset
+			)
 
-	// TODO(rfratto): this retrieves all pages for all columns individually; we
-	// may be able to batch requests to minimize roundtrips.
-	return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error {
-		for _, column := range columns {
-			pages, err := getPages(ctx, column)
+			rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
 			if err != nil {
-				return err
-			} else if !yield(pages) {
+				return fmt.Errorf("reading column data: %w", err)
+			}
+			data, err := readAndClose(rc, windowSize)
+			if err != nil {
+				return fmt.Errorf("read column data: %w", err)
+			}
+
+			for _, wp := range window {
+				// Find the slice in the data for this column.
+				var (
+					columnOffset = wp.Data.GetInfo().MetadataOffset
+					dataOffset   = columnOffset - windowOffset
+				)
+
+				r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize])
+
+				md, err := decodeStreamsColumnMetadata(r)
+				if err != nil {
+					return err
+				}
+
+				// wp.Position is the position of the column in the original columns
+				// slice; this retains the proper order of data in results.
+				results[wp.Position] = md.Pages
+			}
+		}
+
+		for _, data := range results {
+			if !yield(data) {
 				return nil
 			}
 		}
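The offset arithmetic in the loop above is worth calling out: each window is fetched as one contiguous buffer starting at windowOffset, so locating an element's bytes is just a re-basing of its absolute file offset. A sketch of only that step, with hypothetical names:

// sliceElement returns the bytes for an element located at (offset, size)
// in the file, given a buffer that was fetched starting at windowOffset.
// Hypothetical helper for illustration; not part of this change.
func sliceElement(buf []byte, windowOffset, offset, size uint64) []byte {
	rel := offset - windowOffset // element position relative to the buffer
	return buf[rel : rel+size]
}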
@@ -147,32 +181,61 @@ func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.C
 	})
 }
 
+// readAndClose reads exactly size bytes from rc and then closes it. Callers
+// are expected to add context to any returned error.
+func readAndClose(rc io.ReadCloser, size uint64) ([]byte, error) {
+	defer rc.Close()
+
+	data := make([]byte, size)
+	if _, err := io.ReadFull(rc, data); err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
 func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] {
-	getPageData := func(ctx context.Context, page *streamsmd.PageDesc) (dataset.PageData, error) {
-		rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize))
-		if err != nil {
-			return nil, fmt.Errorf("reading page data: %w", err)
+	return result.Iter(func(yield func(dataset.PageData) bool) error {
+		results := make([]dataset.PageData, len(pages))
+
+		pageInfo := func(p *streamsmd.PageDesc) (uint64, uint64) {
+			return p.GetInfo().DataOffset, p.GetInfo().DataSize
 		}
-		defer rc.Close()
 
-		br, release := getBufioReader(rc)
-		defer release()
+		// TODO(rfratto): If there are many windows, it may make sense to read them
+		// in parallel.
+		for window := range iterWindows(pages, pageInfo, windowSize) {
+			if len(window) == 0 {
+				continue
+			}
 
-		data := make([]byte, page.Info.DataSize)
-		if _, err := io.ReadFull(br, data); err != nil {
-			return nil, fmt.Errorf("read page data: %w", err)
-		}
-		return dataset.PageData(data), nil
-	}
+			var (
+				windowOffset = window.Start().GetInfo().DataOffset
+				windowSize   = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset
+			)
 
-	// TODO(rfratto): this retrieves all pages for all columns individually; we
-	// may be able to batch requests to minimize roundtrips.
-	return result.Iter(func(yield func(dataset.PageData) bool) error {
-		for _, page := range pages {
-			data, err := getPageData(ctx, page)
+			rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
 			if err != nil {
-				return err
-			} else if !yield(data) {
+				return fmt.Errorf("reading page data: %w", err)
+			}
+			data, err := readAndClose(rc, windowSize)
+			if err != nil {
+				return fmt.Errorf("read page data: %w", err)
+			}
+
+			for _, wp := range window {
+				// Find the slice in the data for this page.
+				var (
+					pageOffset = wp.Data.GetInfo().DataOffset
+					dataOffset = pageOffset - windowOffset
+				)
+
+				// wp.Position is the position of the page in the original pages slice;
+				// this retains the proper order of data in results.
+				results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize])
+			}
+		}
+
+		for _, data := range results {
+			if !yield(data) {
 				return nil
 			}
 		}
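readAndClose leans on io.ReadFull, which keeps reading until the buffer is full and reports a truncated body as an error rather than silently returning partial data. That matters here because fixed-size regions are sliced out of the buffer afterwards. A quick standalone demonstration:

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// Pretend this reader is a range-request body that was cut short.
	r := strings.NewReader("abc")

	buf := make([]byte, 5)
	if _, err := io.ReadFull(r, buf); err != nil {
		fmt.Println(err) // unexpected EOF
	}
}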
@@ -206,31 +269,54 @@ func (rd *rangeLogsDecoder) Columns(ctx context.Context, section *filemd.Section
 }
 
 func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] {
-	getPages := func(ctx context.Context, column *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) {
-		rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize))
-		if err != nil {
-			return nil, fmt.Errorf("reading column metadata: %w", err)
+	return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
+		results := make([][]*logsmd.PageDesc, len(columns))
+
+		columnInfo := func(c *logsmd.ColumnDesc) (uint64, uint64) {
+			return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize
 		}
-		defer rc.Close()
 
-		br, release := getBufioReader(rc)
-		defer release()
+		for window := range iterWindows(columns, columnInfo, windowSize) {
+			if len(window) == 0 {
+				continue
+			}
 
-		md, err := decodeLogsColumnMetadata(br)
-		if err != nil {
-			return nil, err
-		}
-		return md.Pages, nil
-	}
+			var (
+				windowOffset = window.Start().GetInfo().MetadataOffset
+				windowSize   = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset
+			)
 
-	// TODO(rfratto): this retrieves all pages for all columns individually; we
-	// may be able to batch requests to minimize roundtrips.
-	return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
-		for _, column := range columns {
-			pages, err := getPages(ctx, column)
+			rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
 			if err != nil {
-				return err
-			} else if !yield(pages) {
+				return fmt.Errorf("reading column data: %w", err)
+			}
+			data, err := readAndClose(rc, windowSize)
+			if err != nil {
+				return fmt.Errorf("read column data: %w", err)
+			}
+
+			for _, wp := range window {
+				// Find the slice in the data for this column.
+				var (
+					columnOffset = wp.Data.GetInfo().MetadataOffset
+					dataOffset   = columnOffset - windowOffset
+				)
+
+				r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize])
+
+				md, err := decodeLogsColumnMetadata(r)
+				if err != nil {
+					return err
+				}
+
+				// wp.Position is the position of the column in the original columns
+				// slice; this retains the proper order of data in results.
+				results[wp.Position] = md.Pages
+			}
+		}
+
+		for _, data := range results {
+			if !yield(data) {
 				return nil
 			}
 		}
@@ -240,31 +326,49 @@ func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnD
 	})
 }
 
 func (rd *rangeLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] {
-	getPageData := func(ctx context.Context, page *logsmd.PageDesc) (dataset.PageData, error) {
-		rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize))
-		if err != nil {
-			return nil, fmt.Errorf("reading page data: %w", err)
+	return result.Iter(func(yield func(dataset.PageData) bool) error {
+		results := make([]dataset.PageData, len(pages))
+
+		pageInfo := func(p *logsmd.PageDesc) (uint64, uint64) {
+			return p.GetInfo().DataOffset, p.GetInfo().DataSize
 		}
-		defer rc.Close()
 
-		br, release := getBufioReader(rc)
-		defer release()
+		// TODO(rfratto): If there are many windows, it may make sense to read them
+		// in parallel.
+		for window := range iterWindows(pages, pageInfo, windowSize) {
+			if len(window) == 0 {
+				continue
+			}
 
-		data := make([]byte, page.Info.DataSize)
-		if _, err := io.ReadFull(br, data); err != nil {
-			return nil, fmt.Errorf("read page data: %w", err)
-		}
-		return dataset.PageData(data), nil
-	}
+			var (
+				windowOffset = window.Start().GetInfo().DataOffset
+				windowSize   = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset
+			)
 
-	// TODO(rfratto): this retrieves all pages for all columns individually; we
-	// may be able to batch requests to minimize roundtrips.
-	return result.Iter(func(yield func(dataset.PageData) bool) error {
-		for _, page := range pages {
-			data, err := getPageData(ctx, page)
+			rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
 			if err != nil {
-				return err
-			} else if !yield(data) {
+				return fmt.Errorf("reading page data: %w", err)
+			}
+			data, err := readAndClose(rc, windowSize)
+			if err != nil {
+				return fmt.Errorf("read page data: %w", err)
+			}
+
+			for _, wp := range window {
+				// Find the slice in the data for this page.
+				var (
+					pageOffset = wp.Data.GetInfo().DataOffset
+					dataOffset = pageOffset - windowOffset
+				)
+
+				// wp.Position is the position of the page in the original pages slice;
+				// this retains the proper order of data in results.
+				results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize])
+			}
+		}
+
+		for _, data := range results {
+			if !yield(data) {
 				return nil
 			}
 		}
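On the TODO about reading windows in parallel: one possible shape, sketched with golang.org/x/sync/errgroup, and not part of this change. The fetch callback and the concurrency limit are assumptions; output order would still be preserved because each element carries its original Position.

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// readWindowsParallel fetches each window concurrently. fetch is a
// hypothetical callback that downloads one window's byte range and stores
// each element's data at its Position in a shared results slice.
func readWindowsParallel[T any](ctx context.Context, windows []window[T], fetch func(context.Context, window[T]) error) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(4) // cap on concurrent range requests; the value is a guess

	for _, w := range windows {
		g.Go(func() error { return fetch(gctx, w) })
	}
	return g.Wait()
}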
diff --git a/pkg/dataobj/internal/encoding/windowing.go b/pkg/dataobj/internal/encoding/windowing.go
new file mode 100644
index 0000000000..6237d0ea37
--- /dev/null
+++ b/pkg/dataobj/internal/encoding/windowing.go
@@ -0,0 +1,109 @@
+package encoding
+
+import (
+	"cmp"
+	"iter"
+	"slices"
+)
+
+// The windowing utilities allow for grouping subsections of a file into
+// windows of a specified size; for example, given a slice of pages to
+// download and a window size of 16MB, pages will be grouped such that the
+// first byte of the first page and the last byte of the last page are no more
+// than 16MB apart.
+
+// window represents a window of file subsections.
+type window[T any] []windowedElement[T]
+
+// Start returns the first element in the window.
+func (w window[T]) Start() T {
+	var zero T
+	if len(w) == 0 {
+		return zero
+	}
+	return w[0].Data
+}
+
+// End returns the last element in the window.
+func (w window[T]) End() T {
+	var zero T
+	if len(w) == 0 {
+		return zero
+	}
+	return w[len(w)-1].Data
+}
+
+type windowedElement[T any] struct {
+	Data     T   // Windowed data.
+	Position int // Position of the element in the original slice pre-windowing.
+}
+
+type getElementInfo[T any] func(v T) (offset, size uint64)
+
+// iterWindows groups elements into windows of a specified size, returning an
+// iterator over the windows. The input slice is not modified.
+func iterWindows[T any](elements []T, getInfo getElementInfo[T], windowSize int64) iter.Seq[window[T]] {
+	// Sort elements by their start position.
+	sortedElements := make(window[T], len(elements))
+	for i, element := range elements {
+		sortedElements[i] = windowedElement[T]{Data: element, Position: i}
+	}
+	slices.SortFunc(sortedElements, func(a, b windowedElement[T]) int {
+		aOffset, _ := getInfo(a.Data)
+		bOffset, _ := getInfo(b.Data)
+		return cmp.Compare(aOffset, bOffset)
+	})
+
+	return func(yield func(window[T]) bool) {
+		var start, end int
+
+		for end < len(sortedElements) {
+			startElement := sortedElements[start]
+			currentElement := sortedElements[end]
+
+			var (
+				startOffset, _     = getInfo(startElement.Data)
+				endOffset, endSize = getInfo(currentElement.Data)
+			)
+
+			var (
+				startByte = startOffset
+				endByte   = endOffset + endSize
+			)
+
+			switch {
+			case endByte-startByte > uint64(windowSize) && start == end:
+				// We have an empty window and the element alone is larger than the
+				// window size. We immediately add the element into the window and
+				// yield what we have.
+				end++
+
+				if !yield(sortedElements[start:end]) {
+					return
+				}
+				start = end
+
+			case endByte-startByte > uint64(windowSize) && start < end:
+				// Including end in the window would exceed the window size; we yield
+				// everything up to end and start a new window from end.
+				//
+				// We *do not* increment end here; if we did, we would start with two
+				// elements in the next window.
+				if !yield(sortedElements[start:end]) {
+					return
+				}
+				start = end
+
+			default:
+				// The element fits within the window size; move end forward so it
+				// gets included.
+				end++
+			}
+		}
+
+		// Yield all remaining elements.
+		if start < len(sortedElements) {
+			yield(sortedElements[start:])
+		}
+	}
+}
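For a feel of the API, a small usage sketch (hypothetical values, written as if inside this package with fmt imported): elements report their (offset, size), and ranges that fit within the window size are grouped together.

// exampleWindows is a hypothetical snippet demonstrating iterWindows.
func exampleWindows() {
	// Each element is an (offset, size) pair; iterWindows only needs getInfo.
	ranges := [][2]uint64{{0, 100}, {100, 100}, {4000, 100}}

	getInfo := func(r [2]uint64) (offset, size uint64) {
		return r[0], r[1]
	}

	// With a window size of 1000 bytes, the first two ranges share a window
	// and the third lands in a window of its own.
	for w := range iterWindows(ranges, getInfo, 1000) {
		fmt.Println("window:", w.Start(), "to", w.End())
	}
}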
diff --git a/pkg/dataobj/internal/encoding/windowing_test.go b/pkg/dataobj/internal/encoding/windowing_test.go
new file mode 100644
index 0000000000..4482ca53a5
--- /dev/null
+++ b/pkg/dataobj/internal/encoding/windowing_test.go
@@ -0,0 +1,147 @@
+package encoding
+
+import (
+	"fmt"
+	"slices"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
+)
+
+func Test_windowPages(t *testing.T) {
+	tt := []struct {
+		name       string
+		pages      []*fakePageDesc
+		windowSize int64
+		expect     []window[*fakePageDesc]
+	}{
+		{
+			name:       "empty pages",
+			pages:      nil,
+			windowSize: 1_000_000,
+			expect:     nil,
+		},
+		{
+			name:       "single page smaller than window",
+			pages:      []*fakePageDesc{newFakePage(0, 100)},
+			windowSize: 1_000_000,
+			expect: []window[*fakePageDesc]{
+				{{Data: newFakePage(0, 100), Position: 0}},
+			},
+		},
+		{
+			name:       "single page larger than window",
+			pages:      []*fakePageDesc{newFakePage(0, 5_000_000)},
+			windowSize: 5_000_000,
+			expect: []window[*fakePageDesc]{
+				{{Data: newFakePage(0, 5_000_000), Position: 0}},
+			},
+		},
+		{
+			name: "basic grouping",
+			pages: []*fakePageDesc{
+				newFakePage(0, 100),
+				newFakePage(100, 100),
+				newFakePage(200, 100),
+
+				newFakePage(1500, 100),
+				newFakePage(1600, 100),
+			},
+			windowSize: 1000,
+			expect: []window[*fakePageDesc]{
+				{
+					{Data: newFakePage(0, 100), Position: 0},
+					{Data: newFakePage(100, 100), Position: 1},
+					{Data: newFakePage(200, 100), Position: 2},
+				},
+				{
+					{Data: newFakePage(1500, 100), Position: 3},
+					{Data: newFakePage(1600, 100), Position: 4},
+				},
+			},
+		},
+		{
+			name: "basic grouping (unordered)",
+			pages: []*fakePageDesc{
+				newFakePage(1500, 100),
+				newFakePage(200, 100),
+				newFakePage(100, 100),
+
+				newFakePage(1600, 100),
+				newFakePage(0, 100),
+			},
+			windowSize: 1000,
+			expect: []window[*fakePageDesc]{
+				{
+					{Data: newFakePage(0, 100), Position: 4},
+					{Data: newFakePage(100, 100), Position: 2},
+					{Data: newFakePage(200, 100), Position: 1},
+				},
+				{
+					{Data: newFakePage(1500, 100), Position: 0},
+					{Data: newFakePage(1600, 100), Position: 3},
+				},
+			},
+		},
+		{
+			name: "grouping with large page",
+			pages: []*fakePageDesc{
+				newFakePage(0, 100),
+				newFakePage(100, 100),
+				newFakePage(200, 1000),
+				newFakePage(300, 100),
+				newFakePage(400, 100),
+			},
+			windowSize: 500,
+			expect: []window[*fakePageDesc]{
+				{
+					{Data: newFakePage(0, 100), Position: 0},
+					{Data: newFakePage(100, 100), Position: 1},
+				},
+				{
+					{Data: newFakePage(200, 1000), Position: 2},
+				},
+				{
+					{Data: newFakePage(300, 100), Position: 3},
+					{Data: newFakePage(400, 100), Position: 4},
+				},
+			},
+		},
+	}
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			getInfo := func(p *fakePageDesc) (uint64, uint64) {
+				return p.Info.DataOffset, p.Info.DataSize
+			}
+			actual := slices.Collect(iterWindows(tc.pages, getInfo, tc.windowSize))
+
+			for wi, w := range actual {
+				for pi, p := range w {
+					t.Logf("window %d page %d: %#v\n", wi, pi, p.Data)
+				}
+			}
+
+			require.Equal(t, tc.expect, actual)
+		})
+	}
+}
+
+type fakePageDesc struct{ Info *datasetmd.PageInfo }
+
+func (f *fakePageDesc) GetInfo() *datasetmd.PageInfo { return f.Info }
+
+func (f *fakePageDesc) GoString() string {
+	return fmt.Sprintf("(start: %d, size: %d)", f.Info.DataOffset, f.Info.DataSize)
+}
+
+func newFakePage(offset, size uint64) *fakePageDesc {
+	return &fakePageDesc{
+		Info: &datasetmd.PageInfo{
+			DataOffset: offset,
+			DataSize:   size,
+		},
+	}
+}
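The "grouping with large page" case above is the one that exercises all three branches of the switch in iterWindows: the first two pages accumulate normally (default case); including the 1000-byte page at offset 200 would overflow the 500-byte window, so the first window is yielded without it (second case); the oversized page then starts a new window alone and is yielded immediately (first case); and the remaining two pages fall into a final window flushed after the loop.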