|
|
|
|
@ -61,7 +61,7 @@ func NewReader(opts ReaderOptions) *Reader { |
|
|
|
|
// Read reads up to the next len(s) rows from r and stores them into s. It
|
|
|
|
|
// returns the number of rows read and any error encountered. At the end of the
|
|
|
|
|
// Dataset, Read returns 0, [io.EOF].
|
|
|
|
|
func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { |
|
|
|
|
func (r *Reader) Read(ctx context.Context, s []Row) (int, error) { |
|
|
|
|
stats := StatsFromContext(ctx) |
|
|
|
|
stats.AddReadCalls(1) |
|
|
|
|
|
|
|
|
|
@ -105,16 +105,16 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { |
|
|
|
|
|
|
|
|
|
row, err := r.alignRow() |
|
|
|
|
if err != nil { |
|
|
|
|
return n, err |
|
|
|
|
return 0, err |
|
|
|
|
} else if _, err := r.inner.Seek(int64(row), io.SeekStart); err != nil { |
|
|
|
|
return n, fmt.Errorf("failed to seek to row %d: %w", row, err) |
|
|
|
|
return 0, fmt.Errorf("failed to seek to row %d: %w", row, err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
currentRange, ok := r.ranges.Range(row) |
|
|
|
|
if !ok { |
|
|
|
|
// This should be unreachable; alignToRange already ensures that we're in a
|
|
|
|
|
// range, or it returns io.EOF.
|
|
|
|
|
return n, fmt.Errorf("failed to find range for row %d", row) |
|
|
|
|
return 0, fmt.Errorf("failed to find range for row %d", row) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
readSize := min(len(s), int(currentRange.End-row+1)) |
|
|
|
|
@ -134,7 +134,7 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { |
|
|
|
|
if len(r.opts.Predicates) == 0 { |
|
|
|
|
count, err := r.inner.ReadColumns(ctx, r.primaryColumns(), s[:readSize]) |
|
|
|
|
if err != nil && !errors.Is(err, io.EOF) { |
|
|
|
|
return n, err |
|
|
|
|
return count, err |
|
|
|
|
} else if count == 0 && errors.Is(err, io.EOF) { |
|
|
|
|
return 0, io.EOF |
|
|
|
|
} |
|
|
|
|
@ -151,7 +151,7 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { |
|
|
|
|
} else { |
|
|
|
|
rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize], stats) |
|
|
|
|
if err != nil { |
|
|
|
|
return n, err |
|
|
|
|
return passCount, err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -169,9 +169,9 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { |
|
|
|
|
|
|
|
|
|
count, err := r.inner.Fill(ctx, secondary, s[:passCount]) |
|
|
|
|
if err != nil && !errors.Is(err, io.EOF) { |
|
|
|
|
return n, err |
|
|
|
|
return count, err |
|
|
|
|
} else if count != passCount { |
|
|
|
|
return n, fmt.Errorf("failed to fill rows: expected %d, got %d", n, count) |
|
|
|
|
return count, fmt.Errorf("failed to fill rows: expected %d, got %d", passCount, count) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var totalBytesFilled int64 |
|
|
|
|
@ -183,12 +183,10 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { |
|
|
|
|
stats.AddSecondaryRowBytes(uint64(totalBytesFilled)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
n += passCount |
|
|
|
|
|
|
|
|
|
// We only advance r.row after we successfully read and filled rows. This
|
|
|
|
|
// allows the caller to retry reading rows if a sporadic error occurs.
|
|
|
|
|
r.row += int64(rowsRead) |
|
|
|
|
return n, nil |
|
|
|
|
return passCount, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// readAndFilterPrimaryColumns reads the primary columns from the dataset
|
|
|
|
|
@ -234,7 +232,7 @@ func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, |
|
|
|
|
if err != nil && !errors.Is(err, io.EOF) { |
|
|
|
|
return rowsRead, 0, err |
|
|
|
|
} else if count != readSize { |
|
|
|
|
return rowsRead, 0, fmt.Errorf("failed to fill rows: expected %d, got %d", len(s), count) |
|
|
|
|
return rowsRead, 0, fmt.Errorf("failed to fill rows: expected %d, got %d", readSize, count) |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
count = readSize // required columns are already filled
|
|
|
|
|
|