You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
409 lines
11 KiB
409 lines
11 KiB
// Copyright 2024 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
//go:build linux
|
|
|
|
package fileutil
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"unsafe"
|
|
|
|
"golang.org/x/sys/unix"
|
|
)
|
|
|
|
// Fallback Direct IO parameters, in bytes. They are deliberately generous so
// that they satisfy the alignment restrictions of most setups. On Linux >= 6.1
// the exact restrictions are fetched with statx(2) instead
// (https://man7.org/linux/man-pages/man2/statx.2.html).
const (
	defaultAlignment = 4 << 10 // 4 KiB
	defaultBufSize   = 4 << 10 // 4 KiB; must stay a multiple of the offset alignment
)
|
|
|
|
// errWriterInvalid is returned once a flush has left the file offset
// unaligned; from then on the writer cannot guarantee contiguous writes.
var errWriterInvalid = errors.New("the last flush resulted in an unaligned offset, the writer can no longer ensure contiguous writes")

// errStatxNotSupported signals that statx(2) could not report the Direct IO
// alignment requirements (STATX_DIOALIGN), so fallback defaults must be used.
var errStatxNotSupported = errors.New("the statx syscall with STATX_DIOALIGN is not supported. At least Linux kernel 6.1 is needed")
|
|
|
|
// directIOWriter is a bufio.Writer-like buffered writer that supports Direct IO
// to a file by ensuring all alignment restrictions are satisfied.
// The writer can handle files whose initial offsets are not aligned: the bytes
// needed to reach the next aligned offset are written without Direct IO first.
// Once Direct IO is in use, if an explicit call to Flush() results in an unaligned
// offset, the writer should no longer be used, as it can no longer support
// contiguous writes.
type directIOWriter struct {
	// buf is the write buffer, allocated with alignedBlock so that it meets the
	// Direct IO memory/length alignment requirements.
	buf []byte
	// n is the number of bytes currently buffered, i.e. buf[:n] holds pending data.
	n int

	// f is the destination file.
	f *os.File
	// offsetAlignmentGap represents the number of bytes needed to reach the nearest
	// offset alignment on the file, making Direct IO possible.
	offsetAlignmentGap int
	// alignmentRqmts are the Direct IO alignment requirements for f.
	alignmentRqmts *directIORqmts

	// err is sticky: once set, Write stops writing and returns it, and flush
	// returns it without writing. Reset clears it.
	err error
	// invalid is set when a Direct IO write left the file offset unaligned;
	// Write and flush then return errWriterInvalid. Reset clears it.
	invalid bool
}
|
|
|
|
func newDirectIOWriter(f *os.File, size int) (*directIOWriter, error) {
|
|
alignmentRqmts, err := fileDirectIORqmts(f)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if size <= 0 {
|
|
size = defaultBufSize
|
|
}
|
|
if size%alignmentRqmts.offsetAlign != 0 {
|
|
return nil, fmt.Errorf("size %d should be a multiple of %d", size, alignmentRqmts.offsetAlign)
|
|
}
|
|
gap, err := checkInitialUnalignedOffset(f, alignmentRqmts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &directIOWriter{
|
|
buf: alignedBlock(size, alignmentRqmts),
|
|
f: f,
|
|
offsetAlignmentGap: gap,
|
|
alignmentRqmts: alignmentRqmts,
|
|
}, nil
|
|
}
|
|
|
|
func (b *directIOWriter) Available() int { return len(b.buf) - b.n }
|
|
|
|
func (b *directIOWriter) Buffered() int { return b.n }
|
|
|
|
// fillInitialOffsetGap writes the necessary bytes from the buffer without Direct IO
|
|
// to fill offsetAlignmentGap and align the file offset, enabling Direct IO usage.
|
|
// Once alignment is achieved, Direct IO is enabled.
|
|
func (b *directIOWriter) fillInitialOffsetGap() {
|
|
if b.n == 0 || b.offsetAlignmentGap == 0 {
|
|
return
|
|
}
|
|
|
|
bytesToAlign := min(b.n, b.offsetAlignmentGap)
|
|
n, err := b.f.Write(b.buf[:bytesToAlign])
|
|
if n < bytesToAlign && err == nil {
|
|
err = io.ErrShortWrite
|
|
}
|
|
if n > 0 {
|
|
copy(b.buf[0:b.n-n], b.buf[n:b.n])
|
|
b.n -= n
|
|
}
|
|
// If the file offset was aligned, enable Direct IO.
|
|
b.offsetAlignmentGap -= n
|
|
if b.offsetAlignmentGap == 0 {
|
|
err = errors.Join(err, enableDirectIO(b.f.Fd()))
|
|
}
|
|
b.err = errors.Join(b.err, err)
|
|
}
|
|
|
|
func (b *directIOWriter) directIOWrite(p []byte, padding int) (int, error) {
|
|
relevant := len(p) - padding
|
|
|
|
n, err := b.f.Write(p)
|
|
switch {
|
|
case n < relevant:
|
|
relevant = n
|
|
if err == nil {
|
|
err = io.ErrShortWrite
|
|
}
|
|
case n > relevant:
|
|
// Adjust the offset to discard the padding that was written.
|
|
writtenPadding := int64(n - relevant)
|
|
_, err := b.f.Seek(-writtenPadding, io.SeekCurrent)
|
|
if err != nil {
|
|
b.err = errors.Join(b.err, fmt.Errorf("seek to discard written padding %d: %w", writtenPadding, err))
|
|
}
|
|
}
|
|
|
|
if relevant%b.alignmentRqmts.offsetAlign != 0 {
|
|
b.invalid = true
|
|
}
|
|
return relevant, err
|
|
}
|
|
|
|
// canDirectIOWrite returns true when all Direct IO alignment restrictions
|
|
// are met for the p block to be written into the file.
|
|
func (b *directIOWriter) canDirectIOWrite(p []byte) bool {
|
|
return isAligned(p, b.alignmentRqmts) && b.offsetAlignmentGap == 0
|
|
}
|
|
|
|
func (b *directIOWriter) Write(p []byte) (nn int, err error) {
|
|
if b.invalid {
|
|
return 0, errWriterInvalid
|
|
}
|
|
|
|
for len(p) > b.Available() && b.err == nil {
|
|
var n1, n2 int
|
|
if b.Buffered() == 0 && b.canDirectIOWrite(p) {
|
|
// Large write, empty buffer.
|
|
// To avoid copy, write from p via Direct IO as the block and the file
|
|
// offset are aligned.
|
|
n1, b.err = b.directIOWrite(p, 0)
|
|
} else {
|
|
n1 = copy(b.buf[b.n:], p)
|
|
b.n += n1
|
|
if b.offsetAlignmentGap != 0 {
|
|
b.fillInitialOffsetGap()
|
|
// Refill the buffer.
|
|
n2 = copy(b.buf[b.n:], p[n1:])
|
|
b.n += n2
|
|
}
|
|
if b.Available() == 0 {
|
|
// Avoid flushing in case the second refill wasn't complete.
|
|
b.err = errors.Join(b.err, b.flush())
|
|
}
|
|
}
|
|
nn += n1 + n2
|
|
p = p[n1+n2:]
|
|
}
|
|
|
|
if b.err != nil {
|
|
return nn, b.err
|
|
}
|
|
|
|
n := copy(b.buf[b.n:], p)
|
|
b.n += n
|
|
nn += n
|
|
return nn, nil
|
|
}
|
|
|
|
func (b *directIOWriter) flush() error {
|
|
if b.invalid {
|
|
return errWriterInvalid
|
|
}
|
|
if b.err != nil {
|
|
return b.err
|
|
}
|
|
if b.n == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Ensure the segment length alignment restriction is met.
|
|
// If the buffer length isn't a multiple of offsetAlign, round
|
|
// it to the nearest upper multiple and add zero padding.
|
|
uOffset := b.n
|
|
if uOffset%b.alignmentRqmts.offsetAlign != 0 {
|
|
uOffset = ((uOffset / b.alignmentRqmts.offsetAlign) + 1) * b.alignmentRqmts.offsetAlign
|
|
for i := b.n; i < uOffset; i++ {
|
|
b.buf[i] = 0
|
|
}
|
|
}
|
|
n, err := b.directIOWrite(b.buf[:uOffset], uOffset-b.n)
|
|
if err != nil {
|
|
if n > 0 && n < b.n {
|
|
copy(b.buf[0:b.n-n], b.buf[n:b.n])
|
|
}
|
|
b.n -= n
|
|
b.err = errors.Join(b.err, err)
|
|
return err
|
|
}
|
|
|
|
b.n = 0
|
|
return nil
|
|
}
|
|
|
|
func (b *directIOWriter) Flush() error {
|
|
if b.offsetAlignmentGap != 0 {
|
|
b.fillInitialOffsetGap()
|
|
if b.err != nil {
|
|
return b.err
|
|
}
|
|
}
|
|
return b.flush()
|
|
}
|
|
|
|
func (b *directIOWriter) Reset(f *os.File) error {
|
|
alignmentRqmts, err := fileDirectIORqmts(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.alignmentRqmts = alignmentRqmts
|
|
|
|
if b.buf == nil {
|
|
b.buf = alignedBlock(defaultBufSize, b.alignmentRqmts)
|
|
}
|
|
gap, err := checkInitialUnalignedOffset(f, b.alignmentRqmts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.offsetAlignmentGap = gap
|
|
b.err = nil
|
|
b.invalid = false
|
|
b.n = 0
|
|
b.f = f
|
|
return nil
|
|
}
|
|
|
|
// fileDirectIORqmts fetches alignment requirements via Statx, falling back to default
|
|
// values when unsupported.
|
|
func fileDirectIORqmts(f *os.File) (*directIORqmts, error) {
|
|
alignmentRqmts, err := fetchDirectIORqmtsFromStatx(f.Fd())
|
|
switch {
|
|
case errors.Is(err, errStatxNotSupported):
|
|
alignmentRqmts = defaultDirectIORqmts()
|
|
case err != nil:
|
|
return nil, err
|
|
}
|
|
|
|
if alignmentRqmts.memoryAlign == 0 || alignmentRqmts.offsetAlign == 0 {
|
|
// This may require some extra testing.
|
|
return nil, fmt.Errorf("zero alignment requirement is not supported %+v", alignmentRqmts)
|
|
}
|
|
return alignmentRqmts, nil
|
|
}
|
|
|
|
func alignmentOffset(block []byte, requiredAlignment int) int {
|
|
return computeAlignmentOffset(block, requiredAlignment)
|
|
}
|
|
|
|
// computeAlignmentOffset returns the distance, in bytes, from the previous
// multiple of alignment to the address of block's first byte; 0 means the
// block starts on an aligned address. An alignment of 0 means "no requirement"
// and always yields 0. It panics on an empty block, which has no first byte.
//
// The remainder is computed with a modulo rather than addr&(alignment-1):
// the mask is only correct when alignment is a power of two, while the modulo
// is correct for any positive alignment and gives the same result for powers
// of two.
func computeAlignmentOffset(block []byte, alignment int) int {
	if alignment == 0 {
		return 0
	}
	if len(block) == 0 {
		panic("empty block not supported")
	}
	addr := uintptr(unsafe.Pointer(&block[0]))
	return int(addr % uintptr(alignment))
}
|
|
|
|
// isAligned checks if the length of the block is a multiple of offsetAlign
|
|
// and if its address is aligned with memoryAlign.
|
|
func isAligned(block []byte, alignmentRqmts *directIORqmts) bool {
|
|
return alignmentOffset(block, alignmentRqmts.memoryAlign) == 0 && len(block)%alignmentRqmts.offsetAlign == 0
|
|
}
|
|
|
|
// alignedBlock returns a block whose address is alignment aligned.
|
|
// The size should be a multiple of offsetAlign.
|
|
func alignedBlock(size int, alignmentRqmts *directIORqmts) []byte {
|
|
if size == 0 || size%alignmentRqmts.offsetAlign != 0 {
|
|
panic(fmt.Errorf("size %d should be > 0 and a multiple of offsetAlign=%d", size, alignmentRqmts.offsetAlign))
|
|
}
|
|
if alignmentRqmts.memoryAlign == 0 {
|
|
return make([]byte, size)
|
|
}
|
|
|
|
block := make([]byte, size+alignmentRqmts.memoryAlign)
|
|
a := alignmentOffset(block, alignmentRqmts.memoryAlign)
|
|
if a == 0 {
|
|
return block[:size]
|
|
}
|
|
|
|
offset := alignmentRqmts.memoryAlign - a
|
|
block = block[offset : offset+size]
|
|
if !isAligned(block, alignmentRqmts) {
|
|
// Assuming this to be rare, if not impossible.
|
|
panic("cannot create an aligned block")
|
|
}
|
|
return block
|
|
}
|
|
|
|
// currentFileOffset returns f's current offset, obtained by seeking zero bytes
// relative to the current position.
func currentFileOffset(f *os.File) (int, error) {
	pos, err := f.Seek(0, io.SeekCurrent)
	if err == nil {
		return int(pos), nil
	}
	return 0, fmt.Errorf("cannot get the current offset: %w", err)
}
|
|
|
|
func fileStatusFlags(fd uintptr) (int, error) {
|
|
flag, err := unix.FcntlInt(fd, unix.F_GETFL, 0)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cannot get file status flags: %w", err)
|
|
}
|
|
return flag, err
|
|
}
|
|
|
|
// enableDirectIO enables Direct IO on the file if needed.
|
|
func enableDirectIO(fd uintptr) error {
|
|
flag, err := fileStatusFlags(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if (flag & unix.O_DIRECT) == unix.O_DIRECT {
|
|
return nil
|
|
}
|
|
|
|
_, err = unix.FcntlInt(fd, unix.F_SETFL, flag|unix.O_DIRECT)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot enable Direct IO: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkInitialUnalignedOffset returns the gap between the current offset of the file
|
|
// and the nearest aligned offset.
|
|
// If the current offset is aligned, Direct IO is enabled on the file.
|
|
func checkInitialUnalignedOffset(f *os.File, alignmentRqmts *directIORqmts) (int, error) {
|
|
offset, err := currentFileOffset(f)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
alignment := alignmentRqmts.offsetAlign
|
|
gap := (alignment - offset%alignment) % alignment
|
|
if gap == 0 {
|
|
if err := enableDirectIO(f.Fd()); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
return gap, nil
|
|
}
|
|
|
|
// directIORqmts holds the alignment requirements for direct I/O.
// All fields are in bytes. computeAlignmentOffset and alignedBlock treat a
// zero alignment as "no requirement", but fileDirectIORqmts rejects zero
// values.
type directIORqmts struct {
	// memoryAlign is the required alignment of the start address of memory
	// buffers passed to Direct IO writes.
	memoryAlign int
	// offsetAlign is the required alignment for I/O segment lengths and file
	// offsets; buffer sizes are kept multiples of it.
	offsetAlign int
}
|
|
|
|
func defaultDirectIORqmts() *directIORqmts {
|
|
return &directIORqmts{
|
|
memoryAlign: defaultAlignment,
|
|
offsetAlign: defaultAlignment,
|
|
}
|
|
}
|
|
|
|
// fetchDirectIORqmtsFromStatx tries to retrieve direct I/O alignment requirements for the
|
|
// file descriptor using statx.
|
|
func fetchDirectIORqmtsFromStatx(fd uintptr) (*directIORqmts, error) {
|
|
var stat unix.Statx_t
|
|
flags := unix.AT_SYMLINK_NOFOLLOW | unix.AT_EMPTY_PATH | unix.AT_STATX_DONT_SYNC
|
|
mask := unix.STATX_DIOALIGN
|
|
|
|
if err := unix.Statx(int(fd), "", flags, unix.STATX_DIOALIGN, &stat); err != nil {
|
|
if err == unix.ENOSYS {
|
|
return nil, errStatxNotSupported
|
|
}
|
|
return nil, fmt.Errorf("statx failed on fd %d: %w", fd, err)
|
|
}
|
|
|
|
if stat.Mask&uint32(mask) == 0 {
|
|
return nil, errStatxNotSupported
|
|
}
|
|
|
|
if stat.Dio_mem_align == 0 || stat.Dio_offset_align == 0 {
|
|
return nil, fmt.Errorf("%w: kernel may be old or the file may be on an unsupported FS", errDirectIOUnsupported)
|
|
}
|
|
|
|
return &directIORqmts{
|
|
memoryAlign: int(stat.Dio_mem_align),
|
|
offsetAlign: int(stat.Dio_offset_align),
|
|
}, nil
|
|
}
|
|
|