@ -413,9 +413,10 @@ type QueueManager struct {
clientMtx sync . RWMutex
storeClient WriteClient
seriesMtx sync . Mutex // Covers seriesLabels and droppedSeries .
seriesMtx sync . Mutex // Covers seriesLabels, droppedSeries and builder .
seriesLabels map [ chunks . HeadSeriesRef ] labels . Labels
droppedSeries map [ chunks . HeadSeriesRef ] struct { }
builder * labels . Builder
seriesSegmentMtx sync . Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
seriesSegmentIndexes map [ chunks . HeadSeriesRef ] int
@ -482,6 +483,7 @@ func NewQueueManager(
seriesLabels : make ( map [ chunks . HeadSeriesRef ] labels . Labels ) ,
seriesSegmentIndexes : make ( map [ chunks . HeadSeriesRef ] int ) ,
droppedSeries : make ( map [ chunks . HeadSeriesRef ] struct { } ) ,
builder : labels . NewBuilder ( labels . EmptyLabels ( ) ) ,
numShards : cfg . MinShards ,
reshardChan : make ( chan int ) ,
@ -897,12 +899,14 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
// Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking.
t . seriesSegmentIndexes [ s . Ref ] = index
ls := processExternalLabels ( s . Labels , t . externalLabels )
lbls , keep := relabel . Process ( ls , t . relabelConfigs ... )
if ! keep || lbls . IsEmpty ( ) {
t . builder . Reset ( s . Labels )
processExternalLabels ( t . builder , t . externalLabels )
keep := relabel . ProcessBuilder ( t . builder , t . relabelConfigs ... )
if ! keep {
t . droppedSeries [ s . Ref ] = struct { } { }
continue
}
lbls := t . builder . Labels ( )
t . internLabels ( lbls )
// We should not ever be replacing a series labels in the map, but just
@ -967,30 +971,14 @@ func (t *QueueManager) releaseLabels(ls labels.Labels) {
ls . ReleaseStrings ( t . interner . release )
}
// processExternalLabels merges externalLabels into ls. If ls contains
// a label in externalLabels, the value in ls wins.
func processExternalLabels ( ls labels . Labels , externalLabels [ ] labels . Label ) labels . Labels {
if len ( externalLabels ) == 0 {
return ls
}
b := labels . NewScratchBuilder ( ls . Len ( ) + len ( externalLabels ) )
j := 0
ls . Range ( func ( l labels . Label ) {
for j < len ( externalLabels ) && l . Name > externalLabels [ j ] . Name {
b . Add ( externalLabels [ j ] . Name , externalLabels [ j ] . Value )
j ++
}
if j < len ( externalLabels ) && l . Name == externalLabels [ j ] . Name {
j ++
// processExternalLabels merges externalLabels into b. If b contains
// a label in externalLabels, the value in b wins.
func processExternalLabels ( b * labels . Builder , externalLabels [ ] labels . Label ) {
for _ , el := range externalLabels {
if b . Get ( el . Name ) == "" {
b . Set ( el . Name , el . Value )
}
b . Add ( l . Name , l . Value )
} )
for ; j < len ( externalLabels ) ; j ++ {
b . Add ( externalLabels [ j ] . Name , externalLabels [ j ] . Value )
}
return b . Labels ( )
}
func ( t * QueueManager ) updateShardsLoop ( ) {