pull/2960/merge
Pranshu Srivastava 2 days ago committed by GitHub
commit bdfe9d11dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 26
      collector/filesystem_bsd.go
  2. 3
      collector/filesystem_common.go
  3. 43
      collector/filesystem_freebsd.go
  4. 102
      collector/filesystem_linux.go
  5. 49
      collector/filesystem_openbsd.go

@ -17,6 +17,7 @@ package collector
import (
"errors"
"time"
"unsafe"
)
@ -34,17 +35,28 @@ const (
readOnly = 0x1 // MNT_RDONLY
)
// Expose filesystem fullness.
// GetStats exposes filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, err error) {
var mntbuf *C.struct_statfs
count := C.getmntinfo(&mntbuf, C.MNT_NOWAIT)
if count == 0 {
return nil, errors.New("getmntinfo() failed")
// `getmntinfo` relies on `getfsstat` in some variants, and is blocking in general.
count := 0
countCh := make(chan int, 1)
var mountBuf *C.struct_statfs
go func(mountBuf **C.struct_statfs) {
countCh <- int(C.getmntinfo(mountBuf, C.MNT_WAIT))
close(countCh)
}(&mountBuf)
select {
case count = <-countCh:
if count <= 0 {
return nil, errors.New("getmntinfo failed")
}
case <-time.After(*mountTimeout):
return nil, errors.New("getmntinfo timed out")
}
mnt := (*[1 << 20]C.struct_statfs)(unsafe.Pointer(mntbuf))
mnt := (*[1 << 20]C.struct_statfs)(unsafe.Pointer(mountBuf))
stats = []filesystemStats{}
for i := 0; i < int(count); i++ {
for i := 0; i < count; i++ {
mountpoint := C.GoString(&mnt[i].f_mntonname[0])
if c.mountPointFilter.ignored(mountpoint) {
c.logger.Debug("Ignoring mount point", "mountpoint", mountpoint)

@ -47,6 +47,9 @@ var (
"collector.filesystem.mount-points-include",
"Regexp of mount points to include for filesystem collector. (mutually exclusive to mount-points-exclude)",
).String()
mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
"how long to wait for a mount to respond before marking it as stale").
Hidden().Default("5s").Duration()
fsTypesExcludeSet bool
fsTypesExclude = kingpin.Flag(

@ -16,6 +16,9 @@
package collector
import (
"errors"
"time"
"golang.org/x/sys/unix"
)
@ -26,16 +29,42 @@ const (
// GetStats exposes filesystem fullness.
func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
n, err := unix.Getfsstat(nil, unix.MNT_NOWAIT)
if err != nil {
var mountPointCount int
nChan := make(chan int, 1)
errChan := make(chan error, 1)
go func() {
var err error
var n int
n, err = unix.Getfsstat(nil, unix.MNT_WAIT)
if err != nil {
errChan <- err
return
}
nChan <- n
}()
select {
case mountPointCount = <-nChan:
case err := <-errChan:
return nil, err
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
buf := make([]unix.Statfs_t, n)
_, err = unix.Getfsstat(buf, unix.MNT_NOWAIT)
if err != nil {
return nil, err
buf := make([]unix.Statfs_t, mountPointCount)
go func(buf []unix.Statfs_t) {
_, err := unix.Getfsstat(buf, unix.MNT_WAIT)
errChan <- err
}(buf)
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
stats := []filesystemStats{}
var stats []filesystemStats
for _, fs := range buf {
mountpoint := unix.ByteSliceToString(fs.Mntonname[:])
if c.mountPointFilter.ignored(mountpoint) {

@ -38,14 +38,11 @@ const (
defFSTypesExcluded = "^(autofs|binfmt_misc|bpf|cgroup2?|configfs|debugfs|devpts|devtmpfs|fusectl|hugetlbfs|iso9660|mqueue|nsfs|overlay|proc|procfs|pstore|rpc_pipefs|securityfs|selinuxfs|squashfs|erofs|sysfs|tracefs)$"
)
// mountTimeout is the hidden "collector.filesystem.mount-timeout" flag
// (default 5s): how long to wait for a mount to respond before it is
// treated as stale.
var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
"how long to wait for a mount to respond before marking it as stale").
Hidden().Default("5s").Duration()
// statWorkerCount is the hidden "collector.filesystem.stat-workers" flag
// (default 4): how many stat calls are processed simultaneously. GetStats
// clamps the value to a minimum of 1 before starting workers.
var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
"how many stat calls to process simultaneously").
Hidden().Default("4").Int()
// stuckMounts is the set of mount points that have been marked as stuck,
// keyed by mount point path. Access is serialized with stuckMountsMtx.
var stuckMounts = make(map[string]struct{})
var stuckMountsMtx = &sync.Mutex{}
// stuckMountsMap is the set of mount points whose statfs call timed out,
// keyed by mount point path. Access is serialized with stuckMountsMutex.
var stuckMountsMap = make(map[string]struct{})
var stuckMountsMutex = &sync.Mutex{}
// GetStats returns filesystem stats.
func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
@ -53,14 +50,14 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
if err != nil {
return nil, err
}
stats := []filesystemStats{}
var stats []filesystemStats
labelChan := make(chan filesystemLabels)
statChan := make(chan filesystemStats)
wg := sync.WaitGroup{}
workerCount := max(*statWorkerCount, 1)
for i := 0; i < workerCount; i++ {
for range workerCount {
wg.Add(1)
go func() {
defer wg.Done()
@ -80,20 +77,6 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
c.logger.Debug("Ignoring fs type", "type", labels.fsType)
continue
}
stuckMountsMtx.Lock()
if _, ok := stuckMounts[labels.mountPoint]; ok {
labels.deviceError = "mountpoint timeout"
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
c.logger.Debug("Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
stuckMountsMtx.Unlock()
continue
}
stuckMountsMtx.Unlock()
labelChan <- labels
}
close(labelChan)
@ -101,8 +84,8 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
close(statChan)
}()
for stat := range statChan {
stats = append(stats, stat)
for fsStat := range statChan {
stats = append(stats, fsStat)
}
return stats, nil
}
@ -113,20 +96,31 @@ func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemSta
ro = 1
}
success := make(chan struct{})
go stuckMountWatcher(labels.mountPoint, success, c.logger)
// If the mount point is stuck, mark it as such and return early.
// This is done to avoid blocking the stat call indefinitely.
// NOTE: For instance, this can happen when an NFS mount is unreachable.
buf := new(unix.Statfs_t)
err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
stuckMountsMtx.Lock()
close(success)
// If the mount has been marked as stuck, unmark it and log its recovery.
if _, ok := stuckMounts[labels.mountPoint]; ok {
c.logger.Debug("Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMounts, labels.mountPoint)
statFsErrChan := make(chan error, 1)
go func(buf *unix.Statfs_t) {
statFsErrChan <- unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
close(statFsErrChan)
}(buf)
// Wait for statfs to finish, but never longer than the configured mount
// timeout: a call against an unreachable mount can block indefinitely.
select {
case err := <-statFsErrChan:
	if err != nil {
		c.logger.Debug("Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
		labels.deviceError = err.Error()
	}
case <-time.After(*mountTimeout):
	// A timeout is always reported as a device error, whether or not the
	// mount point was already known to be stuck.
	labels.deviceError = "mountpoint timeout"
	stuckMountsMutex.Lock()
	// Record and log the mount point only the first time it times out.
	// The membership test must be negated: with a plain `ok` check nothing
	// is ever added to the map and the timeout is never reported.
	if _, ok := stuckMountsMap[labels.mountPoint]; !ok {
		c.logger.Debug("Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", labels.mountPoint)
		stuckMountsMap[labels.mountPoint] = struct{}{}
	}
	stuckMountsMutex.Unlock()
}
stuckMountsMtx.Unlock()
// Remove options from labels because options will not be used from this point forward
// and keeping them can lead to errors when the same device is mounted to the same mountpoint
@ -134,9 +128,18 @@ func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemSta
labels.mountOptions = ""
labels.superOptions = ""
if err != nil {
labels.deviceError = err.Error()
c.logger.Debug("Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
// Check if the mount point has recovered and remove it from the stuck map.
if _, isOpen := <-statFsErrChan; !isOpen {
stuckMountsMutex.Lock()
if _, ok := stuckMountsMap[labels.mountPoint]; ok {
c.logger.Debug("Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMountsMap, labels.mountPoint)
}
stuckMountsMutex.Unlock()
}
// If the mount point is stuck or statfs errored, mark it as such and return.
if labels.deviceError != "" {
return filesystemStats{
labels: labels,
deviceError: 1,
@ -155,29 +158,6 @@ func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemSta
}
}
// stuckMountWatcher waits for either the success channel to be closed or the
// mount timeout to elapse, whichever comes first. When success is signalled
// in time it returns without side effects; when the timeout fires first, the
// watched mount point is recorded as stuck.
func stuckMountWatcher(mountPoint string, success chan struct{}, logger *slog.Logger) {
	timer := time.NewTimer(*mountTimeout)
	defer timer.Stop()

	select {
	case <-success:
		// Finished in time, nothing to record.
		return
	case <-timer.C:
		// Timed out; fall through to the bookkeeping below.
	}

	stuckMountsMtx.Lock()
	defer stuckMountsMtx.Unlock()

	// Re-check under the lock: success may have been signalled just after
	// the timer fired, in which case the mount must not be labeled as stuck.
	select {
	case <-success:
		return
	default:
	}

	logger.Debug("Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", mountPoint)
	stuckMounts[mountPoint] = struct{}{}
}
func mountPointDetails(logger *slog.Logger) ([]filesystemLabels, error) {
fs, err := procfs.NewFS(*procPath)
if err != nil {

@ -16,6 +16,9 @@
package collector
import (
"errors"
"time"
"golang.org/x/sys/unix"
)
@ -24,21 +27,45 @@ const (
defFSTypesExcluded = "^devfs$"
)
// Expose filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, err error) {
var mnt []unix.Statfs_t
size, err := unix.Getfsstat(mnt, unix.MNT_NOWAIT)
if err != nil {
return nil, err
// GetStats exposes filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, fsstatErr error) {
var mountPointCount int
nChan := make(chan int, 1)
errChan := make(chan error, 1)
go func() {
var statErr error
var n int
n, statErr = unix.Getfsstat(nil, unix.MNT_WAIT)
if statErr != nil {
errChan <- statErr
return
}
nChan <- n
}()
select {
case mountPointCount = <-nChan:
case statErr := <-errChan:
return nil, statErr
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
mnt = make([]unix.Statfs_t, size)
_, err = unix.Getfsstat(mnt, unix.MNT_NOWAIT)
if err != nil {
return nil, err
buf := make([]unix.Statfs_t, mountPointCount)
go func(buf []unix.Statfs_t) {
	// Keep the error local to this goroutine. If the caller gives up on the
	// timeout path, this goroutine can outlive the call, so it must not
	// write to the function's named result; the error is handed over
	// through errChan only.
	_, err := unix.Getfsstat(buf, unix.MNT_WAIT)
	errChan <- err
}(buf)
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
stats = []filesystemStats{}
for _, v := range mnt {
for _, v := range buf {
mountpoint := unix.ByteSliceToString(v.F_mntonname[:])
if c.mountPointFilter.ignored(mountpoint) {
c.logger.Debug("Ignoring mount point", "mountpoint", mountpoint)

Loading…
Cancel
Save