|
|
|
@ -11,6 +11,7 @@ import ( |
|
|
|
|
"fmt" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/dskit/flagext" |
|
|
|
|
lru "github.com/hashicorp/golang-lru/v2" |
|
|
|
|
"github.com/prometheus/prometheus/model/labels" |
|
|
|
|
"github.com/thanos-io/objstore" |
|
|
|
|
|
|
|
|
@ -88,6 +89,8 @@ type Builder struct { |
|
|
|
|
bucket objstore.Bucket |
|
|
|
|
tenantID string |
|
|
|
|
|
|
|
|
|
labelCache *lru.Cache[string, labels.Labels] |
|
|
|
|
|
|
|
|
|
dirty bool // Whether the builder has been modified since the last flush.
|
|
|
|
|
|
|
|
|
|
streams *streams.Streams |
|
|
|
@ -103,11 +106,18 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
labelCache, err := lru.New[string, labels.Labels](5000) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("failed to create LRU cache: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &Builder{ |
|
|
|
|
cfg: cfg, |
|
|
|
|
bucket: bucket, |
|
|
|
|
tenantID: tenantID, |
|
|
|
|
|
|
|
|
|
labelCache: labelCache, |
|
|
|
|
|
|
|
|
|
streams: streams.New(int(cfg.TargetPageSize)), |
|
|
|
|
logs: logs.New(int(cfg.TargetPageSize)), |
|
|
|
|
}, nil |
|
|
|
@ -120,7 +130,7 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu |
|
|
|
|
// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
|
|
|
|
|
// then call Append again with the same entry.
|
|
|
|
|
func (b *Builder) Append(stream logproto.Stream) error { |
|
|
|
|
ls, err := syntax.ParseLabels(stream.Labels) |
|
|
|
|
ls, err := b.parseLabels(stream.Labels) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -146,6 +156,20 @@ func (b *Builder) Append(stream logproto.Stream) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Builder) parseLabels(labelString string) (labels.Labels, error) { |
|
|
|
|
labels, ok := b.labelCache.Get(labelString) |
|
|
|
|
if ok { |
|
|
|
|
return labels, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
labels, err := syntax.ParseLabels(labelString) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("failed to parse labels: %w", err) |
|
|
|
|
} |
|
|
|
|
b.labelCache.Add(labelString, labels) |
|
|
|
|
return labels, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *Builder) estimatedSize() int { |
|
|
|
|
var size int |
|
|
|
|
size += b.streams.EstimatedSize() |
|
|
|
|