|
|
|
|
@ -349,7 +349,26 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO: Document.
|
|
|
|
|
// sanitizeSeries sanitizes a series based on its series file as defined by the provided directory and FileInfo.
|
|
|
|
|
// The method returns the fingerprint as derived from the directory and file name, and whether the provided
|
|
|
|
|
// file has been sanitized. A file that failed to be sanitized is deleted, if possible.
|
|
|
|
|
//
|
|
|
|
|
// The following steps are performed:
|
|
|
|
|
//
|
|
|
|
|
// - A file whose name doesn't comply with the naming scheme of a series file is simply deleted.
|
|
|
|
|
//
|
|
|
|
|
// - If the size of the series file isn't a multiple of the chunk size, extraneous bytes are truncated.
|
|
|
|
|
// If the truncation fails, the file is deleted instead.
|
|
|
|
|
//
|
|
|
|
|
// - A file that is empty (after truncation) is deleted.
|
|
|
|
|
//
|
|
|
|
|
// - A series that is not archived (i.e. it is in the fingerprintToSeries map) is checked for consistency of
|
|
|
|
|
// its various parameters (like head-chunk persistence state, offset of chunkDescs etc.). In particular,
|
|
|
|
|
// overlap between an in-memory head chunk with the most recent persisted chunk is checked. Inconsistencies
|
|
|
|
|
// are rectified.
|
|
|
|
|
//
|
|
|
|
|
// - A series this in archived (i.e. it is not in the fingerprintToSeries map) is checked for its presence
|
|
|
|
|
// in the index of archived series. If it cannot be found there, it is deleted.
|
|
|
|
|
func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries) (clientmodel.Fingerprint, bool) { |
|
|
|
|
filename := path.Join(dirname, fi.Name()) |
|
|
|
|
purge := func() { |
|
|
|
|
@ -364,7 +383,11 @@ func (p *persistence) sanitizeSeries(dirname string, fi os.FileInfo, fingerprint |
|
|
|
|
purge() |
|
|
|
|
return fp, false |
|
|
|
|
} |
|
|
|
|
fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]) // TODO: Panics if that doesn't parse as hex.
|
|
|
|
|
if err := fp.LoadFromString(path.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil { |
|
|
|
|
glog.Warningf("Error parsing file name %s: %s", filename, err) |
|
|
|
|
purge() |
|
|
|
|
return fp, false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bytesToTrim := fi.Size() % int64(p.chunkLen+chunkHeaderLen) |
|
|
|
|
chunksInFile := int(fi.Size()) / (p.chunkLen + chunkHeaderLen) |
|
|
|
|
@ -676,9 +699,6 @@ func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) (int, er |
|
|
|
|
// each index in indexes. It is the caller's responsibility to not persist or
|
|
|
|
|
// drop anything for the same fingerprint concurrently.
|
|
|
|
|
func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { |
|
|
|
|
// TODO: we need to verify at some point that file length is a multiple of
|
|
|
|
|
// the chunk size. When is the best time to do this, and where to remember
|
|
|
|
|
// it? Right now, we only do it when loading chunkDescs.
|
|
|
|
|
f, err := p.openChunkFileForReading(fp) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
@ -731,15 +751,11 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie |
|
|
|
|
} |
|
|
|
|
totalChunkLen := chunkHeaderLen + p.chunkLen |
|
|
|
|
if fi.Size()%int64(totalChunkLen) != 0 { |
|
|
|
|
// TODO: record number of encountered corrupt series files in a metric?
|
|
|
|
|
|
|
|
|
|
// Truncate the file size to the nearest multiple of chunkLen.
|
|
|
|
|
truncateTo := fi.Size() - fi.Size()%int64(totalChunkLen) |
|
|
|
|
glog.Infof("Bad series file size for %s: %d bytes (no multiple of %d). Truncating to %d bytes.", fp, fi.Size(), totalChunkLen, truncateTo) |
|
|
|
|
// TODO: this doesn't work, as this is a read-only file handle.
|
|
|
|
|
if err := f.Truncate(truncateTo); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
p.setDirty(true) |
|
|
|
|
return nil, fmt.Errorf( |
|
|
|
|
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", |
|
|
|
|
fp, fi.Size(), totalChunkLen, |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
numChunks := int(fi.Size()) / totalChunkLen |
|
|
|
|
@ -766,13 +782,49 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie |
|
|
|
|
cds = append(cds, cd) |
|
|
|
|
} |
|
|
|
|
chunkDescOps.WithLabelValues(load).Add(float64(len(cds))) |
|
|
|
|
atomic.AddInt64(&numMemChunkDescs, int64(len(cds))) |
|
|
|
|
numMemChunkDescs.Add(float64(len(cds))) |
|
|
|
|
return cds, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
|
|
|
|
|
// and all open (non-full) head chunks. Do not call concurrently with
|
|
|
|
|
// loadSeriesMapAndHeads.
|
|
|
|
|
//
|
|
|
|
|
// Description of the file format:
|
|
|
|
|
//
|
|
|
|
|
// (1) Magic string (const headsMagicString).
|
|
|
|
|
//
|
|
|
|
|
// (2) Varint-encoded format version (const headsFormatVersion).
|
|
|
|
|
//
|
|
|
|
|
// (3) Number of series in checkpoint as big-endian uint64.
|
|
|
|
|
//
|
|
|
|
|
// (4) Repeated once per series:
|
|
|
|
|
//
|
|
|
|
|
// (4.1) A flag byte, see flag constants above.
|
|
|
|
|
//
|
|
|
|
|
// (4.2) The fingerprint as big-endian uint64.
|
|
|
|
|
//
|
|
|
|
|
// (4.3) The metric as defined by codable.Metric.
|
|
|
|
|
//
|
|
|
|
|
// (4.4) The varint-encoded chunkDescsOffset.
|
|
|
|
|
//
|
|
|
|
|
// (4.5) The varint-encoded savedFirstTime.
|
|
|
|
|
//
|
|
|
|
|
// (4.6) The varint-encoded number of chunk descriptors.
|
|
|
|
|
//
|
|
|
|
|
// (4.7) Repeated once per chunk descriptor, oldest to most recent:
|
|
|
|
|
//
|
|
|
|
|
// (4.7.1) The varint-encoded first time.
|
|
|
|
|
//
|
|
|
|
|
// (4.7.2) The varint-encoded last time.
|
|
|
|
|
//
|
|
|
|
|
// (4.8) Exception to 4.7: If the most recent chunk is a non-persisted head chunk,
|
|
|
|
|
// the following is persisted instead of the most recent chunk descriptor:
|
|
|
|
|
//
|
|
|
|
|
// (4.8.1) A byte defining the chunk type.
|
|
|
|
|
//
|
|
|
|
|
// (4.8.2) The head chunk itself, marshaled with the marshal() method.
|
|
|
|
|
//
|
|
|
|
|
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) { |
|
|
|
|
glog.Info("Checkpointing in-memory metrics and head chunks...") |
|
|
|
|
begin := time.Now() |
|
|
|
|
@ -916,7 +968,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, err error) { |
|
|
|
|
} |
|
|
|
|
if err == nil { |
|
|
|
|
atomic.AddInt64(&numMemChunks, chunksTotal) |
|
|
|
|
atomic.AddInt64(&numMemChunkDescs, chunkDescsTotal) |
|
|
|
|
numMemChunkDescs.Add(float64(chunkDescsTotal)) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
@ -1328,7 +1380,18 @@ func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.F |
|
|
|
|
if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) |
|
|
|
|
f, err := os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640) |
|
|
|
|
if err != nil { |
|
|
|
|
return f, err |
|
|
|
|
} |
|
|
|
|
offset, err := f.Seek(0, os.SEEK_CUR) |
|
|
|
|
if offset%int64(chunkHeaderLen+p.chunkLen) != 0 { |
|
|
|
|
return f, fmt.Errorf( |
|
|
|
|
"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", |
|
|
|
|
fp, offset, chunkHeaderLen+p.chunkLen, |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
return f, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) { |
|
|
|
|
|