feat(tsdb): introduce --use-uncached-io feature flag and allow using it for chunks writing (#15365)

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Signed-off-by: Ayoub Mrini <ayoubmrini424@gmail.com>
parent 091e662f4d
commit 2edc3ed6c5
14 changed files (changed line count in parentheses):
.github/workflows/ci.yml (1)
cmd/prometheus/main.go (7)
docs/command-line/prometheus.md (2)
docs/feature_flags.md (14)
tsdb/chunks/chunks.go (83)
tsdb/compact.go (6)
tsdb/db.go (4)
tsdb/db_test.go (2)
tsdb/fileutil/direct_io.go (39)
tsdb/fileutil/direct_io_force.go (28)
tsdb/fileutil/direct_io_linux.go (29)
tsdb/fileutil/direct_io_unsupported.go (29)
tsdb/fileutil/direct_io_writer.go (407)
tsdb/fileutil/direct_io_writer_test.go (197)

@@ -33,6 +33,7 @@ jobs:
- uses: prometheus/promci@443c7fc2397e946bc9f5029e313a9c3441b9b86d # v0.4.7
- uses: ./.github/promci/actions/setup_environment
- run: go test --tags=dedupelabels ./...
- run: go test --tags=forcedirectio,stringlabels -race ./tsdb/
- run: GOARCH=386 go test ./...
- uses: ./.github/promci/actions/check_proto
with:

@@ -293,6 +293,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "type-and-unit-labels":
c.scrape.EnableTypeAndUnitLabels = true
logger.Info("Experimental type and unit labels enabled")
case "use-uncached-io":
c.tsdb.UseUncachedIO = true
logger.Info("Experimental Uncached IO is enabled.")
default:
logger.Warn("Unknown option for --enable-feature", "option", o)
}
@@ -557,7 +560,7 @@ func main() {
a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates.").
Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
a.Flag("agent", "Run Prometheus in 'Agent mode'.").BoolVar(&agentMode)
@@ -1844,6 +1847,7 @@ type tsdbOptions struct {
EnableDelayedCompaction bool
CompactionDelayMaxPercent int
EnableOverlappingCompaction bool
UseUncachedIO bool
}
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@@ -1867,6 +1871,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableDelayedCompaction: opts.EnableDelayedCompaction,
CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
UseUncachedIO: opts.UseUncachedIO,
}
}

@@ -61,7 +61,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--query.timeout</code> | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| <code class="text-nowrap">--query.max-concurrency</code> | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
| <code class="text-nowrap">--query.max-samples</code> | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...</code> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--enable-feature</code> <code class="text-nowrap">...</code> | Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-per-step-stats, promql-experimental-functions, extra-scrape-metrics, auto-gomaxprocs, native-histograms, created-timestamp-zero-ingestion, concurrent-rule-eval, delayed-compaction, old-ui, otlp-deltatocumulative, promql-duration-expr, use-uncached-io. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | |
| <code class="text-nowrap">--agent</code> | Run Prometheus in 'Agent mode'. | |
| <code class="text-nowrap">--log.level</code> | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` |
| <code class="text-nowrap">--log.format</code> | Output format of log messages. One of: [logfmt, json] | `logfmt` |

@@ -272,3 +272,17 @@ It's especially useful for users who:
In the future, more [work is planned](https://github.com/prometheus/prometheus/issues/16610) that will depend on this, e.g. a rich PromQL UX that helps
when wrong types are used with the wrong functions, automatic renames, delta types and more.
## Use Uncached IO
`--enable-feature=use-uncached-io`
Experimental and only available on Linux.
When enabled, chunk writing bypasses the page cache. The primary
goal is to reduce confusion around page-cache behavior and to prevent over-allocation of
memory in response to misleading cache growth.
This is currently implemented using direct I/O.
For more details, see the [proposal](https://github.com/prometheus/proposals/pull/45).
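For programs that embed the TSDB through its Go API rather than running the Prometheus binary, the flag maps to the new `UseUncachedIO` field on `tsdb.Options`, as wired up in `cmd/prometheus/main.go` above. A minimal sketch; the `tsdb.Open` signature is assumed from the current tree and may differ across versions:

```go
package main

import (
	"log"
	"log/slog"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Equivalent of --enable-feature=use-uncached-io: chunk writes during
	// compaction bypass the page cache. On non-Linux builds the direct I/O
	// writer constructor errors out, surfacing at the first segment cut.
	opts.UseUncachedIO = true

	db, err := tsdb.Open("data", slog.Default(), prometheus.NewRegistry(), opts, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```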

@@ -14,7 +14,6 @@
package chunks
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
@@ -281,12 +280,13 @@ func checkCRC32(data, sum []byte) error {
type Writer struct {
dirFile *os.File
files []*os.File
wbuf *bufio.Writer
wbuf fileutil.BufWriter
n int64
crc32 hash.Hash
buf [binary.MaxVarintLen32]byte
segmentSize int64
segmentSize int64
useUncachedIO bool
}
const (
@@ -294,21 +294,34 @@ const (
DefaultChunkSegmentSize = 512 * 1024 * 1024
)
// NewWriterWithSegSize returns a new writer against the given directory
// and allows setting a custom size for the segments.
func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
return newWriter(dir, segmentSize)
type writerOptions struct {
segmentSize int64
useUncachedIO bool
}
// NewWriter returns a new writer against the given directory
// using the default segment size.
func NewWriter(dir string) (*Writer, error) {
return newWriter(dir, DefaultChunkSegmentSize)
type WriterOption func(*writerOptions)
func WithUncachedIO(enabled bool) WriterOption {
return func(o *writerOptions) {
o.useUncachedIO = enabled
}
}
func newWriter(dir string, segmentSize int64) (*Writer, error) {
if segmentSize <= 0 {
segmentSize = DefaultChunkSegmentSize
func WithSegmentSize(segmentSize int64) WriterOption {
return func(o *writerOptions) {
if segmentSize <= 0 {
segmentSize = DefaultChunkSegmentSize
}
o.segmentSize = segmentSize
}
}
// NewWriter returns a new writer against the given directory.
func NewWriter(dir string, opts ...WriterOption) (*Writer, error) {
options := &writerOptions{segmentSize: DefaultChunkSegmentSize}
for _, opt := range opts {
opt(options)
}
if err := os.MkdirAll(dir, 0o777); err != nil {
@@ -319,10 +332,11 @@ func newWriter(dir string, segmentSize int64) (*Writer, error) {
return nil, err
}
return &Writer{
dirFile: dirFile,
n: 0,
crc32: newCRC32(),
segmentSize: segmentSize,
dirFile: dirFile,
n: 0,
crc32: newCRC32(),
segmentSize: options.segmentSize,
useUncachedIO: options.useUncachedIO,
}, nil
}
@@ -333,7 +347,7 @@ func (w *Writer) tail() *os.File {
return w.files[len(w.files)-1]
}
// finalizeTail writes all pending data to the current tail file,
// finalizeTail writes any pending data to the current tail file,
// truncates its size, and closes it.
func (w *Writer) finalizeTail() error {
tf := w.tail()
@@ -341,8 +355,10 @@ func (w *Writer) finalizeTail() error {
return nil
}
if err := w.wbuf.Flush(); err != nil {
return err
if w.wbuf != nil {
if err := w.wbuf.Flush(); err != nil {
return err
}
}
if err := tf.Sync(); err != nil {
return err
@@ -373,9 +389,25 @@ func (w *Writer) cut() error {
w.files = append(w.files, f)
if w.wbuf != nil {
w.wbuf.Reset(f)
if err := w.wbuf.Reset(f); err != nil {
return err
}
} else {
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
var (
wbuf fileutil.BufWriter
err error
)
size := 8 * 1024 * 1024
if w.useUncachedIO {
// Uncached IO is implemented using direct I/O for now.
wbuf, err = fileutil.NewDirectIOWriter(f, size)
} else {
wbuf, err = fileutil.NewBufioWriterWithSeek(f, size)
}
if err != nil {
return err
}
w.wbuf = wbuf
}
return nil
@@ -434,8 +466,9 @@ func cutSegmentFile(dirFile *os.File, magicNumber uint32, chunksFormat byte, all
return 0, nil, 0, fmt.Errorf("open final file: %w", err)
}
// Skip header for further writes.
if _, err := f.Seek(int64(n), 0); err != nil {
return 0, nil, 0, fmt.Errorf("seek in final file: %w", err)
offset := int64(n)
if _, err := f.Seek(offset, 0); err != nil {
return 0, nil, 0, fmt.Errorf("seek to %d in final file: %w", offset, err)
}
return n, f, seq, nil
}
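The removed `NewWriterWithSegSize` constructor is replaced by functional options on `NewWriter`; the updated call sites in `tsdb/compact.go` and `tsdb/db_test.go` below use the new API like this (a usage sketch of the options shown above, not new API surface):

```go
package main

import (
	"log"

	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	// Custom segment size plus page-cache bypass; WithSegmentSize coerces
	// values <= 0 back to DefaultChunkSegmentSize.
	w, err := chunks.NewWriter("chunks_dir",
		chunks.WithSegmentSize(64*1024*1024),
		chunks.WithUncachedIO(true),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Note: on platforms without direct I/O support, the error surfaces on
	// the first segment cut (see cut above), not at construction.
	defer w.Close()
}
```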

@@ -85,6 +85,7 @@ type LeveledCompactor struct {
chunkPool chunkenc.Pool
ctx context.Context
maxBlockChunkSegmentSize int64
useUncachedIO bool
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
postingsDecoderFactory PostingsDecoderFactory
@@ -171,6 +172,8 @@ type LeveledCompactorOptions struct {
EnableOverlappingCompaction bool
// Metrics is set of metrics for Compactor. By default, NewCompactorMetrics would be called to initialize metrics unless it is provided.
Metrics *CompactorMetrics
// UseUncachedIO allows bypassing the page cache when appropriate.
UseUncachedIO bool
}
type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder
@@ -226,6 +229,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
metrics: opts.Metrics,
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
useUncachedIO: opts.UseUncachedIO,
mergeFunc: mergeFunc,
postingsEncoder: pe,
postingsDecoderFactory: opts.PD,
@@ -657,7 +661,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
// data of all blocks.
var chunkw ChunkWriter
chunkw, err = chunks.NewWriterWithSegSize(chunkDir(tmp), c.maxBlockChunkSegmentSize)
chunkw, err = chunks.NewWriter(chunkDir(tmp), chunks.WithSegmentSize(c.maxBlockChunkSegmentSize), chunks.WithUncachedIO(c.useUncachedIO))
if err != nil {
return fmt.Errorf("open chunk writer: %w", err)
}
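Outside of `DB`'s own `open`, the compactor can be constructed directly with the new option; a hedged sketch assuming the `NewLeveledCompactorWithOptions` parameter order in the current tree (context, registerer, logger, ranges, pool, options):

```go
package main

import (
	"context"
	"log/slog"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	compactor, err := tsdb.NewLeveledCompactorWithOptions(
		context.Background(),
		prometheus.NewRegistry(),
		slog.Default(),
		tsdb.ExponentialBlockRanges(2*3600*1000, 3, 5), // 2h base block range
		chunkenc.NewPool(),
		tsdb.LeveledCompactorOptions{
			// Forwarded to chunks.NewWriter via WithUncachedIO in write().
			UseUncachedIO: true,
		},
	)
	_, _ = compactor, err
}
```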

@@ -219,6 +219,9 @@ type Options struct {
// PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta.
// By default, DefaultPostingsDecoderFactory will be used to create raw posting decoder.
PostingsDecoderFactory PostingsDecoderFactory
// UseUncachedIO allows bypassing the page cache when appropriate.
UseUncachedIO bool
}
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@@ -903,6 +906,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
PD: opts.PostingsDecoderFactory,
UseUncachedIO: opts.UseUncachedIO,
})
}
if err != nil {

@@ -2923,7 +2923,7 @@ func TestChunkWriter_ReadAfterWrite(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
tempDir := t.TempDir()
chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
chunkw, err := chunks.NewWriter(tempDir, chunks.WithSegmentSize(chunks.SegmentHeaderSize+int64(test.segmentSize)))
require.NoError(t, err)
for _, chks := range test.chks {

@@ -0,0 +1,39 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"bufio"
"errors"
"os"
)
var errDirectIOUnsupported = errors.New("direct IO is unsupported")
type BufWriter interface {
Write([]byte) (int, error)
Flush() error
Reset(f *os.File) error
}
// writer is a specialized wrapper around bufio.Writer.
// It is used when Direct IO isn't enabled, as using directIOWriter in such cases is impractical.
type writer struct {
*bufio.Writer
}
func (b *writer) Reset(f *os.File) error {
b.Writer.Reset(f)
return nil
}
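`BufWriter` is the seam that lets `chunks.Writer` hold either backend behind one field; code written against it only needs the three methods. A hypothetical helper to illustrate the contract (not part of this change):

```go
package fileutil

import "os"

// drainAndRotate is a hypothetical helper mirroring what chunks.Writer
// does across finalizeTail and cut: flush pending bytes, then point the
// same BufWriter at the next segment file.
func drainAndRotate(w BufWriter, pending []byte, next *os.File) error {
	if _, err := w.Write(pending); err != nil {
		return err
	}
	if err := w.Flush(); err != nil {
		return err
	}
	return w.Reset(next)
}
```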

@@ -0,0 +1,28 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This allows seamless testing of the Direct I/O writer across all tsdb tests.
//go:build linux && forcedirectio
package fileutil
import "os"
func NewDirectIOWriter(f *os.File, size int) (BufWriter, error) {
return newDirectIOWriter(f, size)
}
func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) {
return NewDirectIOWriter(f, size)
}

@@ -0,0 +1,29 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build linux && !forcedirectio
package fileutil
import (
"bufio"
"os"
)
func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) {
return &writer{bufio.NewWriterSize(f, size)}, nil
}
func NewDirectIOWriter(f *os.File, size int) (BufWriter, error) {
return newDirectIOWriter(f, size)
}

@@ -0,0 +1,29 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !linux
package fileutil
import (
"bufio"
"os"
)
func NewBufioWriterWithSeek(f *os.File, size int) (BufWriter, error) {
return &writer{bufio.NewWriterSize(f, size)}, nil
}
func NewDirectIOWriter(_ *os.File, _ int) (BufWriter, error) {
return nil, errDirectIOUnsupported
}
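On non-Linux builds `NewDirectIOWriter` always fails with `errDirectIOUnsupported`, and `chunks.Writer` simply propagates that error. A caller preferring graceful degradation could fall back explicitly; an illustrative sketch, not behavior this change adopts:

```go
package fileutil

import "os"

// newPreferredWriter is a hypothetical helper: try direct I/O first and
// fall back to the plain buffered writer where it is unsupported.
func newPreferredWriter(f *os.File, size int) (BufWriter, error) {
	if w, err := NewDirectIOWriter(f, size); err == nil {
		return w, nil
	}
	return NewBufioWriterWithSeek(f, size)
}
```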

@@ -0,0 +1,407 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build linux
package fileutil
import (
"errors"
"fmt"
"io"
"os"
"unsafe"
"golang.org/x/sys/unix"
)
const (
// The defaults are deliberately set higher to cover most setups.
// On Linux >= 6.1, statx(2) https://man7.org/linux/man-pages/man2/statx.2.html is later
// used to fetch the exact alignment restrictions.
defaultAlignment = 4096
defaultBufSize = 4096
)
var (
errWriterInvalid = errors.New("the last flush resulted in an unaligned offset, the writer can no longer ensure contiguous writes")
errStatxNotSupported = errors.New("the statx syscall with STATX_DIOALIGN is not supported. At least Linux kernel 6.1 is needed")
)
// directIOWriter is a specialized bufio.Writer that supports Direct IO to a file
// by ensuring all alignment restrictions are satisfied.
// The writer can handle files whose initial offsets are not aligned.
// Once Direct IO is in use, if an explicit call to Flush() results in an unaligned offset, the writer
// should no longer be used, as it can no longer support contiguous writes.
type directIOWriter struct {
buf []byte
n int
f *os.File
// offsetAlignmentGap represents the number of bytes needed to reach the nearest
// offset alignment on the file, making Direct IO possible.
offsetAlignmentGap int
alignmentRqmts *directIORqmts
err error
invalid bool
}
func newDirectIOWriter(f *os.File, size int) (*directIOWriter, error) {
alignmentRqmts, err := fileDirectIORqmts(f)
if err != nil {
return nil, err
}
if size <= 0 {
size = defaultBufSize
}
if size%alignmentRqmts.offsetAlign != 0 {
return nil, fmt.Errorf("size %d should be a multiple of %d", size, alignmentRqmts.offsetAlign)
}
gap, err := checkInitialUnalignedOffset(f, alignmentRqmts)
if err != nil {
return nil, err
}
return &directIOWriter{
buf: alignedBlock(size, alignmentRqmts),
f: f,
offsetAlignmentGap: gap,
alignmentRqmts: alignmentRqmts,
}, nil
}
func (b *directIOWriter) Available() int { return len(b.buf) - b.n }
func (b *directIOWriter) Buffered() int { return b.n }
// fillInitialOffsetGap writes the necessary bytes from the buffer without Direct IO
// to fill offsetAlignmentGap and align the file offset, enabling Direct IO usage.
// Once alignment is achieved, Direct IO is enabled.
func (b *directIOWriter) fillInitialOffsetGap() {
if b.n == 0 || b.offsetAlignmentGap == 0 {
return
}
bytesToAlign := min(b.n, b.offsetAlignmentGap)
n, err := b.f.Write(b.buf[:bytesToAlign])
if n < bytesToAlign && err == nil {
err = io.ErrShortWrite
}
if n > 0 {
copy(b.buf[0:b.n-n], b.buf[n:b.n])
b.n -= n
}
// If the file offset was aligned, enable Direct IO.
b.offsetAlignmentGap -= n
if b.offsetAlignmentGap == 0 {
err = errors.Join(err, enableDirectIO(b.f.Fd()))
}
b.err = errors.Join(b.err, err)
}
func (b *directIOWriter) directIOWrite(p []byte, padding int) (int, error) {
relevant := len(p) - padding
n, err := b.f.Write(p)
switch {
case n < relevant:
relevant = n
if err == nil {
err = io.ErrShortWrite
}
case n > relevant:
// Adjust the offset to discard the padding that was written.
writtenPadding := int64(n - relevant)
_, err := b.f.Seek(-writtenPadding, io.SeekCurrent)
if err != nil {
b.err = errors.Join(b.err, fmt.Errorf("seek to discard written padding %d: %w", writtenPadding, err))
}
}
if relevant%b.alignmentRqmts.offsetAlign != 0 {
b.invalid = true
}
return relevant, err
}
// canDirectIOWrite returns true when all Direct IO alignment restrictions
// are met for the p block to be written into the file.
func (b *directIOWriter) canDirectIOWrite(p []byte) bool {
return isAligned(p, b.alignmentRqmts) && b.offsetAlignmentGap == 0
}
func (b *directIOWriter) Write(p []byte) (nn int, err error) {
if b.invalid {
return 0, errWriterInvalid
}
for len(p) > b.Available() && b.err == nil {
var n1, n2 int
if b.Buffered() == 0 && b.canDirectIOWrite(p) {
// Large write, empty buffer.
// To avoid copy, write from p via Direct IO as the block and the file
// offset are aligned.
n1, b.err = b.directIOWrite(p, 0)
} else {
n1 = copy(b.buf[b.n:], p)
b.n += n1
if b.offsetAlignmentGap != 0 {
b.fillInitialOffsetGap()
// Refill the buffer.
n2 = copy(b.buf[b.n:], p[n1:])
b.n += n2
}
if b.Available() == 0 {
// Avoid flushing in case the second refill wasn't complete.
b.err = errors.Join(b.err, b.flush())
}
}
nn += n1 + n2
p = p[n1+n2:]
}
if b.err != nil {
return nn, b.err
}
n := copy(b.buf[b.n:], p)
b.n += n
nn += n
return nn, nil
}
func (b *directIOWriter) flush() error {
if b.invalid {
return errWriterInvalid
}
if b.err != nil {
return b.err
}
if b.n == 0 {
return nil
}
// Ensure the segment length alignment restriction is met.
// If the buffer length isn't a multiple of offsetAlign, round
// it to the nearest upper multiple and add zero padding.
uOffset := b.n
if uOffset%b.alignmentRqmts.offsetAlign != 0 {
uOffset = ((uOffset / b.alignmentRqmts.offsetAlign) + 1) * b.alignmentRqmts.offsetAlign
for i := b.n; i < uOffset; i++ {
b.buf[i] = 0
}
}
n, err := b.directIOWrite(b.buf[:uOffset], uOffset-b.n)
if err != nil {
if n > 0 && n < b.n {
copy(b.buf[0:b.n-n], b.buf[n:b.n])
}
b.n -= n
b.err = errors.Join(b.err, err)
return err
}
b.n = 0
return nil
}
func (b *directIOWriter) Flush() error {
if b.offsetAlignmentGap != 0 {
b.fillInitialOffsetGap()
if b.err != nil {
return b.err
}
}
return b.flush()
}
func (b *directIOWriter) Reset(f *os.File) error {
alignmentRqmts, err := fileDirectIORqmts(f)
if err != nil {
return err
}
b.alignmentRqmts = alignmentRqmts
if b.buf == nil {
b.buf = alignedBlock(defaultBufSize, b.alignmentRqmts)
}
gap, err := checkInitialUnalignedOffset(f, b.alignmentRqmts)
if err != nil {
return err
}
b.offsetAlignmentGap = gap
b.err = nil
b.invalid = false
b.n = 0
b.f = f
return nil
}
func fileDirectIORqmts(f *os.File) (*directIORqmts, error) {
alignmentRqmts, err := fetchDirectIORqmts(f.Fd())
switch {
case errors.Is(err, errStatxNotSupported):
alignmentRqmts = defaultDirectIORqmts()
case err != nil:
return nil, err
}
if alignmentRqmts.memoryAlign == 0 || alignmentRqmts.offsetAlign == 0 {
// This may require some extra testing.
return nil, fmt.Errorf("zero alignment requirement is not supported %+v", alignmentRqmts)
}
return alignmentRqmts, nil
}
func alignmentOffset(block []byte, requiredAlignment int) int {
return computeAlignmentOffset(block, requiredAlignment)
}
func computeAlignmentOffset(block []byte, alignment int) int {
if alignment == 0 {
return 0
}
if len(block) == 0 {
panic("empty block not supported")
}
return int(uintptr(unsafe.Pointer(&block[0])) & uintptr(alignment-1))
}
// isAligned checks if the length of the block is a multiple of offsetAlign
// and if its address is aligned with memoryAlign.
func isAligned(block []byte, alignmentRqmts *directIORqmts) bool {
return alignmentOffset(block, alignmentRqmts.memoryAlign) == 0 && len(block)%alignmentRqmts.offsetAlign == 0
}
// alignedBlock returns a block whose address is alignment aligned.
// The size should be a multiple of offsetAlign.
func alignedBlock(size int, alignmentRqmts *directIORqmts) []byte {
if size == 0 || size%alignmentRqmts.offsetAlign != 0 {
panic(fmt.Errorf("size %d should be > 0 and a multiple of offsetAlign=%d", size, alignmentRqmts.offsetAlign))
}
if alignmentRqmts.memoryAlign == 0 {
return make([]byte, size)
}
block := make([]byte, size+alignmentRqmts.memoryAlign)
a := alignmentOffset(block, alignmentRqmts.memoryAlign)
if a == 0 {
return block[:size]
}
offset := alignmentRqmts.memoryAlign - a
block = block[offset : offset+size]
if !isAligned(block, alignmentRqmts) {
// Assuming this to be rare, if not impossible.
panic("cannot create an aligned block")
}
return block
}
func currentFileOffset(f *os.File) (int, error) {
curOff, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return 0, fmt.Errorf("cannot get the current offset: %w", err)
}
return int(curOff), nil
}
func fileStatusFlags(fd uintptr) (int, error) {
flag, err := unix.FcntlInt(fd, unix.F_GETFL, 0)
if err != nil {
return 0, fmt.Errorf("cannot get file status flags: %w", err)
}
return flag, err
}
// enableDirectIO enables Direct IO on the file if needed.
func enableDirectIO(fd uintptr) error {
flag, err := fileStatusFlags(fd)
if err != nil {
return err
}
if (flag & unix.O_DIRECT) == unix.O_DIRECT {
return nil
}
_, err = unix.FcntlInt(fd, unix.F_SETFL, flag|unix.O_DIRECT)
if err != nil {
return fmt.Errorf("cannot enable Direct IO: %w", err)
}
return nil
}
// checkInitialUnalignedOffset returns the gap between the current offset of the file
// and the nearest aligned offset.
// If the current offset is aligned, Direct IO is enabled on the file.
func checkInitialUnalignedOffset(f *os.File, alignmentRqmts *directIORqmts) (int, error) {
offset, err := currentFileOffset(f)
if err != nil {
return 0, err
}
alignment := alignmentRqmts.offsetAlign
gap := (alignment - offset%alignment) % alignment
if gap == 0 {
if err := enableDirectIO(f.Fd()); err != nil {
return 0, err
}
}
return gap, nil
}
// directIORqmts holds the alignment requirements for direct I/O.
// All fields are in bytes.
type directIORqmts struct {
// The required alignment for memory buffers addresses.
memoryAlign int
// The required alignment for I/O segment lengths and file offsets.
offsetAlign int
}
func defaultDirectIORqmts() *directIORqmts {
return &directIORqmts{
memoryAlign: defaultAlignment,
offsetAlign: defaultAlignment,
}
}
// fetchDirectIORqmts retrieves direct I/O alignment requirements for a file descriptor using statx
// when possible.
func fetchDirectIORqmts(fd uintptr) (*directIORqmts, error) {
var stat unix.Statx_t
flags := unix.AT_SYMLINK_NOFOLLOW | unix.AT_EMPTY_PATH | unix.AT_STATX_DONT_SYNC
mask := unix.STATX_DIOALIGN
if err := unix.Statx(int(fd), "", flags, mask, &stat); err != nil {
if err == unix.ENOSYS {
return nil, errStatxNotSupported
}
return nil, fmt.Errorf("statx failed on fd %d: %w", fd, err)
}
if stat.Mask&uint32(mask) == 0 {
return nil, errStatxNotSupported
}
if stat.Dio_mem_align == 0 || stat.Dio_offset_align == 0 {
return nil, fmt.Errorf("%w: kernel may be old or the file may be on an unsupported FS", errDirectIOUnsupported)
}
return &directIORqmts{
memoryAlign: int(stat.Dio_mem_align),
offsetAlign: int(stat.Dio_offset_align),
}, nil
}
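The gap and padding arithmetic in `checkInitialUnalignedOffset` and `flush` is easy to sanity-check in isolation; a standalone sketch using a hypothetical 4096-byte `offsetAlign` (matching the statx-less default above):

```go
package main

import "fmt"

func main() {
	const align = 4096 // hypothetical offsetAlign, matching defaultAlignment

	// Gap to the next aligned offset, as in checkInitialUnalignedOffset:
	// a file positioned at offset 8 needs 4088 plain buffered bytes written
	// before O_DIRECT can be enabled.
	offset := 8
	gap := (align - offset%align) % align
	fmt.Println(gap) // 4088

	// Padded flush length, as in flush: 4129 buffered bytes round up to the
	// next multiple of align, so 8192 bytes hit the disk, 4063 of them zero
	// padding that directIOWrite's seek then discards from the logical offset.
	n := 4129
	uOffset := n
	if uOffset%align != 0 {
		uOffset = ((uOffset / align) + 1) * align
	}
	fmt.Println(uOffset, uOffset-n) // 8192 4063
}
```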

@@ -0,0 +1,197 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build linux
package fileutil
import (
"io"
"os"
"path"
"testing"
"github.com/stretchr/testify/require"
)
func directIORqmtsForTest(tb testing.TB) *directIORqmts {
f, err := os.OpenFile(path.Join(tb.TempDir(), "foo"), os.O_CREATE|os.O_WRONLY, 0o666)
require.NoError(tb, err)
alignmentRqmts, err := fetchDirectIORqmts(f.Fd())
require.NoError(tb, err)
return alignmentRqmts
}
func TestDirectIOFile(t *testing.T) {
tmpDir := t.TempDir()
f, err := os.OpenFile(path.Join(tmpDir, "test"), os.O_CREATE|os.O_WRONLY, 0o666)
require.NoError(t, err)
require.NoError(t, enableDirectIO(f.Fd()))
}
func TestAlignedBlockEarlyPanic(t *testing.T) {
alignRqmts := directIORqmtsForTest(t)
cases := []struct {
desc string
size int
}{
{"Zero size", 0},
{"Size not multiple of offset alignment", 9973},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
require.Panics(t, func() {
alignedBlock(tc.size, alignRqmts)
})
})
}
}
func TestAlignedBlock(t *testing.T) {
alignRqmts := directIORqmtsForTest(t)
block := alignedBlock(5*alignRqmts.offsetAlign, alignRqmts)
require.True(t, isAligned(block, alignRqmts))
require.Len(t, block, 5*alignRqmts.offsetAlign)
require.False(t, isAligned(block[1:], alignRqmts))
}
func TestDirectIOWriter(t *testing.T) {
alignRqmts := directIORqmtsForTest(t)
cases := []struct {
name string
initialOffset int
bufferSize int
dataSize int
// writtenBytes should also consider needed zero padding.
writtenBytes int
shouldInvalidate bool
}{
{
name: "data equal to buffer",
bufferSize: 8 * alignRqmts.offsetAlign,
dataSize: 8 * alignRqmts.offsetAlign,
writtenBytes: 8 * alignRqmts.offsetAlign,
},
{
name: "data exceeds buffer",
bufferSize: 4 * alignRqmts.offsetAlign,
dataSize: 64 * alignRqmts.offsetAlign,
writtenBytes: 64 * alignRqmts.offsetAlign,
},
{
name: "data exceeds buffer + final offset unaligned",
bufferSize: 2 * alignRqmts.offsetAlign,
dataSize: 4*alignRqmts.offsetAlign + 33,
writtenBytes: 4*alignRqmts.offsetAlign + alignRqmts.offsetAlign,
shouldInvalidate: true,
},
{
name: "data smaller than buffer",
bufferSize: 8 * alignRqmts.offsetAlign,
dataSize: 3 * alignRqmts.offsetAlign,
writtenBytes: 3 * alignRqmts.offsetAlign,
},
{
name: "data smaller than buffer + final offset unaligned",
bufferSize: 4 * alignRqmts.offsetAlign,
dataSize: alignRqmts.offsetAlign + 70,
writtenBytes: alignRqmts.offsetAlign + alignRqmts.offsetAlign,
shouldInvalidate: true,
},
{
name: "offset aligned",
initialOffset: alignRqmts.offsetAlign,
bufferSize: 8 * alignRqmts.offsetAlign,
dataSize: alignRqmts.offsetAlign,
writtenBytes: alignRqmts.offsetAlign,
},
{
name: "initial offset unaligned + final offset unaligned",
initialOffset: 8,
bufferSize: 8 * alignRqmts.offsetAlign,
dataSize: 64 * alignRqmts.offsetAlign,
writtenBytes: 64*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8),
shouldInvalidate: true,
},
{
name: "offset unaligned + final offset aligned",
initialOffset: 8,
bufferSize: 4 * alignRqmts.offsetAlign,
dataSize: 4*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8),
writtenBytes: 4*alignRqmts.offsetAlign + (alignRqmts.offsetAlign - 8),
},
{
name: "empty data",
bufferSize: 4 * alignRqmts.offsetAlign,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
fileName := path.Join(t.TempDir(), "test")
data := make([]byte, tc.dataSize)
for i := 0; i < len(data); i++ {
// Use 251 (a prime) rather than 256, which may divide requiredAlignment and produce repeating patterns.
data[i] = byte(i % 251)
}
f, err := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, 0o666)
require.NoError(t, err)
if tc.initialOffset != 0 {
_, err = f.Seek(int64(tc.initialOffset), io.SeekStart)
require.NoError(t, err)
}
w, err := newDirectIOWriter(f, tc.bufferSize)
require.NoError(t, err)
n, err := w.Write(data)
require.NoError(t, err)
require.Equal(t, tc.dataSize, n)
require.NoError(t, w.Flush())
// Check the file's final offset.
currOffset, err := currentFileOffset(f)
require.NoError(t, err)
require.Equal(t, tc.dataSize+tc.initialOffset, currOffset)
// Check the written data.
fileBytes, err := os.ReadFile(fileName)
require.NoError(t, err)
if tc.dataSize > 0 {
require.Len(t, fileBytes, tc.writtenBytes+tc.initialOffset)
require.Equal(t, data, fileBytes[tc.initialOffset:tc.dataSize+tc.initialOffset])
} else {
require.Empty(t, fileBytes)
}
// Check the writer state.
if tc.shouldInvalidate {
require.True(t, w.invalid)
require.Error(t, w.Flush())
_, err = w.Write([]byte{})
require.Error(t, err)
} else {
require.False(t, w.invalid)
require.NoError(t, w.Flush())
_, err = w.Write([]byte{})
require.NoError(t, err)
}
})
}
}