Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/prefetch_range_reader.go

123 lines
3.1 KiB

package dataobj
import (
"bytes"
"context"
"errors"
"fmt"
"io"
)
// prefetchedRangeReader wraps a rangeReader with a prefetched byte window.
// Overlapping ranges are served partially or wholly from prefetched bytes.
type prefetchedRangeReader struct {
inner rangeReader
prefetchOffset int64
prefetched []byte
}
var _ rangeReader = (*prefetchedRangeReader)(nil)
func (rr *prefetchedRangeReader) Size(ctx context.Context) (int64, error) {
return rr.inner.Size(ctx)
}
func (rr *prefetchedRangeReader) Read(ctx context.Context) (io.ReadCloser, error) {
return rr.inner.Read(ctx)
}
func (rr *prefetchedRangeReader) ReadRange(ctx context.Context, offset int64, length int64) (io.ReadCloser, error) {
if length < 0 {
return nil, fmt.Errorf("length must not be negative: %d", length)
} else if length == 0 {
return io.NopCloser(bytes.NewReader(nil)), nil
}
requestedEnd := offset + length
if requestedEnd < offset {
return nil, fmt.Errorf("read range overflows int64: offset=%d length=%d", offset, length)
}
// intersectStart/intersectEnd describe the intersection of:
// - requested range [offset, requestedEnd)
// - prefetched range [prefetchedStart, prefetchedEnd)
// The intersection can be empty.
var (
prefetchedStart = rr.prefetchOffset
prefetchedEnd = rr.prefetchOffset + int64(len(rr.prefetched))
intersectStart = max(offset, prefetchedStart)
intersectEnd = min(requestedEnd, prefetchedEnd)
)
if intersectStart >= intersectEnd {
return rr.inner.ReadRange(ctx, offset, length)
}
var (
readers []io.Reader
closers []io.Closer
)
// Callers only set a prefetched byte range that is at the absolute head or
// absolute tail of the object (never in the middle), so there will only be
// up to 2 readers.
//
// However, we handle the middle case anyway to be defensive.
if offset < intersectStart {
prefix, err := rr.inner.ReadRange(ctx, offset, intersectStart-offset)
if err != nil {
return nil, err
}
readers = append(readers, prefix)
closers = append(closers, prefix)
}
prefetchChunk := rr.prefetched[intersectStart-prefetchedStart : intersectEnd-prefetchedStart]
if len(prefetchChunk) > 0 {
readers = append(readers, bytes.NewReader(prefetchChunk))
}
if intersectEnd < requestedEnd {
suffix, err := rr.inner.ReadRange(ctx, intersectEnd, requestedEnd-intersectEnd)
if err != nil {
closeErr := closeClosers(closers)
if closeErr != nil {
return nil, errors.Join(err, closeErr)
}
return nil, err
}
readers = append(readers, suffix)
closers = append(closers, suffix)
}
return &multiReadCloser{
reader: io.MultiReader(readers...),
closers: closers,
}, nil
}
type multiReadCloser struct {
reader io.Reader
closers []io.Closer
}
func (rc *multiReadCloser) Read(p []byte) (int, error) {
return rc.reader.Read(p)
}
func (rc *multiReadCloser) Close() error {
return closeClosers(rc.closers)
}
func closeClosers(closers []io.Closer) error {
var errs []error
for i := len(closers) - 1; i >= 0; i-- {
if err := closers[i].Close(); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}