chore(dataobj): Download pages in 16MB batches (#16689)

Branch: pull/16709/head
Author: Robert Fratto (committed 2 months ago via GitHub)
Parent: bc2111e8f1
Commit: 8c4eb428c3
Changed files (lines changed):

   48   pkg/dataobj/internal/encoding/dataset_logs.go
   48   pkg/dataobj/internal/encoding/dataset_streams.go
  264   pkg/dataobj/internal/encoding/decoder_range.go
  109   pkg/dataobj/internal/encoding/windowing.go
  147   pkg/dataobj/internal/encoding/windowing_test.go

pkg/dataobj/internal/encoding/dataset_logs.go
@@ -40,30 +40,52 @@ func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
 }

 func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
-    // TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
+    // We unwrap columns to get the underlying metadata and rewrap to
+    // dataset.Page to allow the underlying decoder to read multiple column
+    // metadatas in a single call.
     return result.Iter(func(yield func(dataset.Pages) bool) error {
-        for _, column := range columns {
-            pages, err := result.Collect(column.ListPages(ctx))
-            if err != nil {
-                return err
-            } else if !yield(pages) {
-                return nil
+        descs := make([]*logsmd.ColumnDesc, len(columns))
+        for i, column := range columns {
+            column, ok := column.(*logsDatasetColumn)
+            if !ok {
+                return fmt.Errorf("unexpected column type: got=%T want=*logsDatasetColumn", column)
+            }
+            descs[i] = column.desc
+        }
+
+        for result := range ds.dec.Pages(ctx, descs) {
+            pagesDescs, err := result.Value()
+
+            pages := make([]dataset.Page, len(pagesDescs))
+            for i, pageDesc := range pagesDescs {
+                pages[i] = &logsDatasetPage{dec: ds.dec, desc: pageDesc}
+            }
+
+            if err != nil || !yield(pages) {
+                return err
             }
         }
         return nil
     })
 }

 func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
-    // TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
+    // We unwrap pages to get the underlying metadata and rewrap to
+    // dataset.Page to allow the underlying decoder to read multiple pages in
+    // a single call.
     return result.Iter(func(yield func(dataset.PageData) bool) error {
-        for _, page := range pages {
-            data, err := page.ReadPage(ctx)
-            if err != nil {
-                return err
-            } else if !yield(data) {
-                return nil
+        descs := make([]*logsmd.PageDesc, len(pages))
+        for i, page := range pages {
+            page, ok := page.(*logsDatasetPage)
+            if !ok {
+                return fmt.Errorf("unexpected page type: got=%T want=*logsDatasetPage", page)
+            }
+            descs[i] = page.desc
+        }
+
+        for result := range ds.dec.ReadPages(ctx, descs) {
+            data, err := result.Value()
+            if err != nil || !yield(data) {
+                return err
             }
         }

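Both methods produce a result.Seq, the error-carrying range-over-func iterator used throughout the dataobj package: result.Iter builds one from a yield callback, and each element is unwrapped with Value(), as the diff above shows. As a reading aid, here is a minimal sketch of how a caller might drain such a sequence. Only result.Collect, Value(), and the range-over-func style are taken from the diff; collectPages itself is a hypothetical helper:

```go
// Hypothetical caller showing two equivalent ways to consume the
// result.Seq[dataset.Pages] returned by ListPages.
func collectPages(ctx context.Context, ds *logsDataset, columns []dataset.Column) ([]dataset.Pages, error) {
    var all []dataset.Pages
    for res := range ds.ListPages(ctx, columns) {
        pages, err := res.Value() // unwrap the element or its error
        if err != nil {
            return nil, err
        }
        all = append(all, pages)
    }
    return all, nil

    // Equivalent shortcut, as the pre-change code used per column:
    // return result.Collect(ds.ListPages(ctx, columns))
}
```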
pkg/dataobj/internal/encoding/dataset_streams.go
@@ -40,30 +40,52 @@ func (ds *streamsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
 }

 func (ds *streamsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
-    // TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
+    // We unwrap columns to get the underlying metadata and rewrap to
+    // dataset.Page to allow the underlying decoder to read multiple column
+    // metadatas in a single call.
     return result.Iter(func(yield func(dataset.Pages) bool) error {
-        for _, column := range columns {
-            pages, err := result.Collect(column.ListPages(ctx))
-            if err != nil {
-                return err
-            } else if !yield(pages) {
-                return nil
+        descs := make([]*streamsmd.ColumnDesc, len(columns))
+        for i, column := range columns {
+            column, ok := column.(*streamsDatasetColumn)
+            if !ok {
+                return fmt.Errorf("unexpected column type: got=%T want=*streamsDatasetColumn", column)
+            }
+            descs[i] = column.desc
+        }
+
+        for result := range ds.dec.Pages(ctx, descs) {
+            pagesDescs, err := result.Value()
+
+            pages := make([]dataset.Page, len(pagesDescs))
+            for i, pageDesc := range pagesDescs {
+                pages[i] = &streamsDatasetPage{dec: ds.dec, desc: pageDesc}
+            }
+
+            if err != nil || !yield(pages) {
+                return err
             }
         }
         return nil
     })
 }

 func (ds *streamsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
-    // TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
+    // We unwrap pages to get the underlying metadata and rewrap to
+    // dataset.Page to allow the underlying decoder to read multiple pages in
+    // a single call.
     return result.Iter(func(yield func(dataset.PageData) bool) error {
-        for _, page := range pages {
-            data, err := page.ReadPage(ctx)
-            if err != nil {
-                return err
-            } else if !yield(data) {
-                return nil
+        descs := make([]*streamsmd.PageDesc, len(pages))
+        for i, page := range pages {
+            page, ok := page.(*streamsDatasetPage)
+            if !ok {
+                return fmt.Errorf("unexpected page type: got=%T want=*streamsDatasetPage", page)
+            }
+            descs[i] = page.desc
+        }
+
+        for result := range ds.dec.ReadPages(ctx, descs) {
+            data, err := result.Value()
+            if err != nil || !yield(data) {
+                return err
             }
         }

pkg/dataobj/internal/encoding/decoder_range.go
@@ -1,6 +1,7 @@
 package encoding

 import (
+    "bytes"
     "context"
     "fmt"
     "io"
@@ -12,6 +13,16 @@ import (
     "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
 )

+// windowSize specifies the maximum amount of data to download at once from
+// object storage. 16MB is chosen based on S3's [recommendations] for
+// Byte-Range fetches, which recommend either 8MB or 16MB.
+//
+// As windowing is designed to reduce the number of requests made to object
+// storage, 16MB is chosen over 8MB, as it leads to fewer requests.
+//
+// [recommendations]: https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html
+const windowSize = 16_000_000
+
 // rangeReader is an interface that can read a range of bytes from an object.
 type rangeReader interface {
     // Size returns the full size of the object.
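To make the trade-off concrete: at 16MB per window, fetching 160MB of adjacent pages costs about 10 range requests, versus roughly 20 at 8MB. The decoder only needs a rangeReader to issue those reads. As a rough illustration, here is a minimal file-backed implementation; the interface is only partially visible in the diff (the Size doc comment above plus the rd.rr.ReadRange calls below), so treat the exact signatures as assumptions:

```go
package main

import (
    "context"
    "io"
    "os"
)

// fileRangeReader is a hypothetical rangeReader over a local file, for
// illustration only; object-storage implementations would issue ranged GETs.
type fileRangeReader struct {
    f *os.File
}

// Size returns the full size of the object (assumed signature).
func (r *fileRangeReader) Size(_ context.Context) (int64, error) {
    fi, err := r.f.Stat()
    if err != nil {
        return 0, err
    }
    return fi.Size(), nil
}

// ReadRange returns a reader over length bytes starting at offset
// (signature inferred from the ReadRange call sites in this file).
func (r *fileRangeReader) ReadRange(_ context.Context, offset, length int64) (io.ReadCloser, error) {
    // A SectionReader bounds the read; NopCloser adapts it to the
    // io.ReadCloser the decoder expects.
    return io.NopCloser(io.NewSectionReader(r.f, offset, length)), nil
}
```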
@@ -114,31 +125,54 @@ func (rd *rangeStreamsDecoder) Columns(ctx context.Context, section *filemd.Sect
 }

 func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc] {
-    getPages := func(ctx context.Context, column *streamsmd.ColumnDesc) ([]*streamsmd.PageDesc, error) {
-        rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize))
-        if err != nil {
-            return nil, fmt.Errorf("reading column metadata: %w", err)
-        }
-        defer rc.Close()
-
-        br, release := getBufioReader(rc)
-        defer release()
-
-        md, err := decodeStreamsColumnMetadata(br)
-        if err != nil {
-            return nil, err
-        }
-        return md.Pages, nil
-    }
-
-    // TODO(rfratto): this retrieves all pages for all columns individually; we
-    // may be able to batch requests to minimize roundtrips.
     return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error {
-        for _, column := range columns {
-            pages, err := getPages(ctx, column)
-            if err != nil {
-                return err
-            } else if !yield(pages) {
-                return nil
+        results := make([][]*streamsmd.PageDesc, len(columns))
+
+        columnInfo := func(c *streamsmd.ColumnDesc) (uint64, uint64) {
+            return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize
+        }
+
+        for window := range iterWindows(columns, columnInfo, windowSize) {
+            if len(window) == 0 {
+                continue
+            }
+
+            var (
+                windowOffset = window.Start().GetInfo().MetadataOffset
+                windowSize   = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset
+            )
+
+            rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
+            if err != nil {
+                return fmt.Errorf("reading column data: %w", err)
+            }
+
+            data, err := readAndClose(rc, windowSize)
+            if err != nil {
+                return fmt.Errorf("read column data: %w", err)
+            }
+
+            for _, wp := range window {
+                // Find the slice in the data for this column.
+                var (
+                    columnOffset = wp.Data.GetInfo().MetadataOffset
+                    dataOffset   = columnOffset - windowOffset
+                )
+                r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize])
+
+                md, err := decodeStreamsColumnMetadata(r)
+                if err != nil {
+                    return err
+                }
+
+                // wp.Position is the position of the column in the original columns
+                // slice; this retains the proper order of data in results.
+                results[wp.Position] = md.Pages
+            }
+        }
+
+        for _, data := range results {
+            if !yield(data) {
+                return nil
             }
         }
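This pattern repeats for every descriptor type below: one ranged read per window, then each element's bytes are recovered by rebasing its absolute offset against the window start. A self-contained sketch of that arithmetic (all names here are illustrative, not from the diff):

```go
package main

import "fmt"

// elem stands in for the (offset, size) pair that getElementInfo extracts
// from a column or page descriptor.
type elem struct{ offset, size uint64 }

func main() {
    window := []elem{{1000, 200}, {1200, 300}, {1600, 100}}

    // One ranged request covers the whole window, from the first byte of the
    // first element to the last byte of the last element.
    windowOffset := window[0].offset
    windowSize := (window[len(window)-1].offset + window[len(window)-1].size) - windowOffset

    data := make([]byte, windowSize) // stands in for readAndClose(rc, windowSize)

    // Each element's bytes are a sub-slice of the downloaded buffer.
    for _, e := range window {
        dataOffset := e.offset - windowOffset
        sub := data[dataOffset : dataOffset+e.size]
        fmt.Printf("element at %d -> data[%d:%d] (%d bytes)\n",
            e.offset, dataOffset, dataOffset+e.size, len(sub))
    }
}
```

Note that a window may span gaps between elements (bytes 1500–1600 here are downloaded but unused); the design trades a few extra bytes per request for fewer round trips.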
@@ -147,32 +181,61 @@ func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc] {
     })
 }

+// readAndClose reads exactly size bytes from rc and then closes it.
+func readAndClose(rc io.ReadCloser, size uint64) ([]byte, error) {
+    defer rc.Close()
+
+    data := make([]byte, size)
+    if _, err := io.ReadFull(rc, data); err != nil {
+        return nil, fmt.Errorf("read column data: %w", err)
+    }
+    return data, nil
+}
+
 func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] {
-    getPageData := func(ctx context.Context, page *streamsmd.PageDesc) (dataset.PageData, error) {
-        rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize))
-        if err != nil {
-            return nil, fmt.Errorf("reading page data: %w", err)
-        }
-        defer rc.Close()
-
-        br, release := getBufioReader(rc)
-        defer release()
-
-        data := make([]byte, page.Info.DataSize)
-        if _, err := io.ReadFull(br, data); err != nil {
-            return nil, fmt.Errorf("read page data: %w", err)
-        }
-        return dataset.PageData(data), nil
-    }
-
-    // TODO(rfratto): this retrieves all pages for all columns individually; we
-    // may be able to batch requests to minimize roundtrips.
     return result.Iter(func(yield func(dataset.PageData) bool) error {
-        for _, page := range pages {
-            data, err := getPageData(ctx, page)
-            if err != nil {
-                return err
-            } else if !yield(data) {
-                return nil
+        results := make([]dataset.PageData, len(pages))
+
+        pageInfo := func(p *streamsmd.PageDesc) (uint64, uint64) {
+            return p.GetInfo().DataOffset, p.GetInfo().DataSize
+        }
+
+        // TODO(rfratto): If there are many windows, it may make sense to read them
+        // in parallel.
+        for window := range iterWindows(pages, pageInfo, windowSize) {
+            if len(window) == 0 {
+                continue
+            }
+
+            var (
+                windowOffset = window.Start().GetInfo().DataOffset
+                windowSize   = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset
+            )
+
+            rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
+            if err != nil {
+                return fmt.Errorf("reading page data: %w", err)
+            }
+
+            data, err := readAndClose(rc, windowSize)
+            if err != nil {
+                return fmt.Errorf("read page data: %w", err)
+            }
+
+            for _, wp := range window {
+                // Find the slice in the data for this page.
+                var (
+                    pageOffset = wp.Data.GetInfo().DataOffset
+                    dataOffset = pageOffset - windowOffset
+                )
+
+                // wp.Position is the position of the page in the original pages slice;
+                // this retains the proper order of data in results.
+                results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize])
+            }
+        }
+
+        for _, data := range results {
+            if !yield(data) {
+                return nil
             }
         }
@@ -206,31 +269,54 @@ func (rd *rangeLogsDecoder) Columns(ctx context.Context, section *filemd.Section
 }

 func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] {
-    getPages := func(ctx context.Context, column *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) {
-        rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize))
-        if err != nil {
-            return nil, fmt.Errorf("reading column metadata: %w", err)
-        }
-        defer rc.Close()
-
-        br, release := getBufioReader(rc)
-        defer release()
-
-        md, err := decodeLogsColumnMetadata(br)
-        if err != nil {
-            return nil, err
-        }
-        return md.Pages, nil
-    }
-
-    // TODO(rfratto): this retrieves all pages for all columns individually; we
-    // may be able to batch requests to minimize roundtrips.
     return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
-        for _, column := range columns {
-            pages, err := getPages(ctx, column)
-            if err != nil {
-                return err
-            } else if !yield(pages) {
-                return nil
+        results := make([][]*logsmd.PageDesc, len(columns))
+
+        columnInfo := func(c *logsmd.ColumnDesc) (uint64, uint64) {
+            return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize
+        }
+
+        for window := range iterWindows(columns, columnInfo, windowSize) {
+            if len(window) == 0 {
+                continue
+            }
+
+            var (
+                windowOffset = window.Start().GetInfo().MetadataOffset
+                windowSize   = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset
+            )
+
+            rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
+            if err != nil {
+                return fmt.Errorf("reading column data: %w", err)
+            }
+
+            data, err := readAndClose(rc, windowSize)
+            if err != nil {
+                return fmt.Errorf("read column data: %w", err)
+            }
+
+            for _, wp := range window {
+                // Find the slice in the data for this column.
+                var (
+                    columnOffset = wp.Data.GetInfo().MetadataOffset
+                    dataOffset   = columnOffset - windowOffset
+                )
+                r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize])
+
+                md, err := decodeLogsColumnMetadata(r)
+                if err != nil {
+                    return err
+                }
+
+                // wp.Position is the position of the column in the original columns
+                // slice; this retains the proper order of data in results.
+                results[wp.Position] = md.Pages
+            }
+        }
+
+        for _, data := range results {
+            if !yield(data) {
+                return nil
             }
         }
@@ -240,31 +326,49 @@ func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] {
     })
 }

 func (rd *rangeLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] {
-    getPageData := func(ctx context.Context, page *logsmd.PageDesc) (dataset.PageData, error) {
-        rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize))
-        if err != nil {
-            return nil, fmt.Errorf("reading page data: %w", err)
-        }
-        defer rc.Close()
-
-        br, release := getBufioReader(rc)
-        defer release()
-
-        data := make([]byte, page.Info.DataSize)
-        if _, err := io.ReadFull(br, data); err != nil {
-            return nil, fmt.Errorf("read page data: %w", err)
-        }
-        return dataset.PageData(data), nil
-    }
-
-    // TODO(rfratto): this retrieves all pages for all columns individually; we
-    // may be able to batch requests to minimize roundtrips.
     return result.Iter(func(yield func(dataset.PageData) bool) error {
-        for _, page := range pages {
-            data, err := getPageData(ctx, page)
-            if err != nil {
-                return err
-            } else if !yield(data) {
-                return nil
+        results := make([]dataset.PageData, len(pages))
+
+        pageInfo := func(p *logsmd.PageDesc) (uint64, uint64) {
+            return p.GetInfo().DataOffset, p.GetInfo().DataSize
+        }
+
+        // TODO(rfratto): If there are many windows, it may make sense to read them
+        // in parallel.
+        for window := range iterWindows(pages, pageInfo, windowSize) {
+            if len(window) == 0 {
+                continue
+            }
+
+            var (
+                windowOffset = window.Start().GetInfo().DataOffset
+                windowSize   = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset
+            )
+
+            rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
+            if err != nil {
+                return fmt.Errorf("reading page data: %w", err)
+            }
+
+            data, err := readAndClose(rc, windowSize)
+            if err != nil {
+                return fmt.Errorf("read page data: %w", err)
+            }
+
+            for _, wp := range window {
+                // Find the slice in the data for this page.
+                var (
+                    pageOffset = wp.Data.GetInfo().DataOffset
+                    dataOffset = pageOffset - windowOffset
+                )
+
+                // wp.Position is the position of the page in the original pages slice;
+                // this retains the proper order of data in results.
+                results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize])
+            }
+        }
+
+        for _, data := range results {
+            if !yield(data) {
+                return nil
             }
         }

pkg/dataobj/internal/encoding/windowing.go (new file)
@@ -0,0 +1,109 @@
package encoding

import (
    "cmp"
    "iter"
    "slices"
)

// The windowing utilities allow for grouping subsections of a file into
// windows of a specified size; for example, given a slice of pages to
// download and a window size of 16MB, pages will be grouped such that the
// first byte of the first page and the last byte of the last page are no more
// than 16MB apart.

// window represents a window of file subsections.
type window[T any] []windowedElement[T]

// Start returns the first element in the window.
func (w window[T]) Start() T {
    var zero T
    if len(w) == 0 {
        return zero
    }
    return w[0].Data
}

// End returns the last element in the window.
func (w window[T]) End() T {
    var zero T
    if len(w) == 0 {
        return zero
    }
    return w[len(w)-1].Data
}

type windowedElement[T any] struct {
    Data     T   // Windowed data.
    Position int // Position of the element in the original slice pre-windowing.
}

type getElementInfo[T any] func(v T) (offset, size uint64)

// iterWindows groups elements into windows of a specified size, returning an
// iterator over the windows. The input slice is not modified.
func iterWindows[T any](elements []T, getInfo getElementInfo[T], windowSize int64) iter.Seq[window[T]] {
    // Sort elements by their start position.
    sortedElements := make(window[T], len(elements))
    for i, element := range elements {
        sortedElements[i] = windowedElement[T]{Data: element, Position: i}
    }
    slices.SortFunc(sortedElements, func(a, b windowedElement[T]) int {
        aOffset, _ := getInfo(a.Data)
        bOffset, _ := getInfo(b.Data)
        return cmp.Compare(aOffset, bOffset)
    })

    return func(yield func(window[T]) bool) {
        var start, end int

        for end < len(sortedElements) {
            startElement := sortedElements[start]
            currentElement := sortedElements[end]

            var (
                startOffset, _     = getInfo(startElement.Data)
                endOffset, endSize = getInfo(currentElement.Data)
            )
            var (
                startByte = startOffset
                endByte   = endOffset + endSize
            )

            switch {
            case endByte-startByte > uint64(windowSize) && start == end:
                // We have an empty window and the element is larger than the
                // window size. We want to immediately add the element into the
                // window and yield what we have.
                end++
                if !yield(sortedElements[start:end]) {
                    return
                }
                start = end

            case endByte-startByte > uint64(windowSize) && start < end:
                // Including end in the window would exceed the window size; we yield
                // everything up to end and start a new window from end.
                //
                // We *do not* increment end here; if we did, we would start with two
                // elements in the next window.
                if !yield(sortedElements[start:end]) {
                    return
                }
                start = end

            default:
                // The element fits within the window size; move end forward so it gets
                // included.
                end++
            }
        }

        // Yield all remaining elements.
        if start < len(sortedElements) {
            yield(sortedElements[start:])
        }
    }
}

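iterWindows is the only entry point the decoders use, always via a small getElementInfo closure per descriptor type. As a quick orientation before the tests, here is a condensed in-package usage sketch; it reuses the fakePageDesc helpers defined in the test file below and assumes fmt is imported:

```go
// Sketch of in-package usage: window three pages (given out of order) with a
// 1000-byte budget, then compute the single ranged read each window implies.
func sketchIterWindows() {
    pages := []*fakePageDesc{
        newFakePage(1600, 100),
        newFakePage(0, 100),
        newFakePage(100, 100),
    }

    getInfo := func(p *fakePageDesc) (uint64, uint64) {
        return p.Info.DataOffset, p.Info.DataSize
    }

    for window := range iterWindows(pages, getInfo, 1000) {
        startOffset, _ := getInfo(window.Start())
        endOffset, endSize := getInfo(window.End())
        // Prints two windows: offset=0 size=200, then offset=1600 size=100.
        fmt.Printf("read at offset=%d size=%d covering %d pages\n",
            startOffset, endOffset+endSize-startOffset, len(window))
    }
}
```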
pkg/dataobj/internal/encoding/windowing_test.go (new file)
@@ -0,0 +1,147 @@
package encoding

import (
    "fmt"
    "slices"
    "testing"

    "github.com/stretchr/testify/require"

    "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

func Test_windowPages(t *testing.T) {
    tt := []struct {
        name       string
        pages      []*fakePageDesc
        windowSize int64
        expect     []window[*fakePageDesc]
    }{
        {
            name:       "empty pages",
            pages:      nil,
            windowSize: 1_000_000,
            expect:     nil,
        },
        {
            name:       "single page smaller than window",
            pages:      []*fakePageDesc{newFakePage(0, 100)},
            windowSize: 1_000_000,
            expect: []window[*fakePageDesc]{
                {{Data: newFakePage(0, 100), Position: 0}},
            },
        },
        {
            name:       "single page larger than window",
            pages:      []*fakePageDesc{newFakePage(0, 5_000_000)},
            windowSize: 5_000_000,
            expect: []window[*fakePageDesc]{
                {{Data: newFakePage(0, 5_000_000), Position: 0}},
            },
        },
        {
            name: "basic grouping",
            pages: []*fakePageDesc{
                newFakePage(0, 100),
                newFakePage(100, 100),
                newFakePage(200, 100),
                newFakePage(1500, 100),
                newFakePage(1600, 100),
            },
            windowSize: 1000,
            expect: []window[*fakePageDesc]{
                {
                    {Data: newFakePage(0, 100), Position: 0},
                    {Data: newFakePage(100, 100), Position: 1},
                    {Data: newFakePage(200, 100), Position: 2},
                },
                {
                    {Data: newFakePage(1500, 100), Position: 3},
                    {Data: newFakePage(1600, 100), Position: 4},
                },
            },
        },
        {
            name: "basic grouping (unordered)",
            pages: []*fakePageDesc{
                newFakePage(1500, 100),
                newFakePage(200, 100),
                newFakePage(100, 100),
                newFakePage(1600, 100),
                newFakePage(0, 100),
            },
            windowSize: 1000,
            expect: []window[*fakePageDesc]{
                {
                    {Data: newFakePage(0, 100), Position: 4},
                    {Data: newFakePage(100, 100), Position: 2},
                    {Data: newFakePage(200, 100), Position: 1},
                },
                {
                    {Data: newFakePage(1500, 100), Position: 0},
                    {Data: newFakePage(1600, 100), Position: 3},
                },
            },
        },
        {
            name: "grouping with large page",
            pages: []*fakePageDesc{
                newFakePage(0, 100),
                newFakePage(100, 100),
                newFakePage(200, 1000),
                newFakePage(300, 100),
                newFakePage(400, 100),
            },
            windowSize: 500,
            expect: []window[*fakePageDesc]{
                {
                    {Data: newFakePage(0, 100), Position: 0},
                    {Data: newFakePage(100, 100), Position: 1},
                },
                {
                    {Data: newFakePage(200, 1000), Position: 2},
                },
                {
                    {Data: newFakePage(300, 100), Position: 3},
                    {Data: newFakePage(400, 100), Position: 4},
                },
            },
        },
    }

    for _, tc := range tt {
        t.Run(tc.name, func(t *testing.T) {
            getInfo := func(p *fakePageDesc) (uint64, uint64) {
                return p.Info.DataOffset, p.Info.DataSize
            }

            actual := slices.Collect(iterWindows(tc.pages, getInfo, tc.windowSize))
            for wi, w := range actual {
                for pi, p := range w {
                    t.Logf("window %d page %d: %#v\n", wi, pi, p.Data)
                }
            }
            require.Equal(t, tc.expect, actual)
        })
    }
}

type fakePageDesc struct{ Info *datasetmd.PageInfo }

func (f *fakePageDesc) GetInfo() *datasetmd.PageInfo { return f.Info }

func (f *fakePageDesc) GoString() string {
    return fmt.Sprintf("(start: %d, size: %d)", f.Info.DataOffset, f.Info.DataSize)
}

func newFakePage(offset, size uint64) *fakePageDesc {
    return &fakePageDesc{
        Info: &datasetmd.PageInfo{
            DataOffset: offset,
            DataSize:   size,
        },
    }
}