feat: move metric aggregation to a per-tenant config (#14709)

pull/14733/head
Trevor Whitney 2 years ago committed by GitHub
parent 2f41584c4c
commit c1fde26730
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 11
      docs/sources/shared/configuration.md
  2. 1
      pkg/loki/modules.go
  3. 12
      pkg/pattern/aggregation/config.go
  4. 42
      pkg/pattern/ingester.go
  5. 5
      pkg/pattern/ingester_test.go
  6. 12
      pkg/pattern/tee_service.go
  7. 3
      pkg/pattern/tee_service_test.go
  8. 16
      pkg/validation/limits.go
  9. 33
      pkg/validation/limits_test.go

@ -359,10 +359,6 @@ pattern_ingester:
# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]
# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.metric-aggregation.downsample-period
[downsample_period: <duration> | default = 10s]
@ -3845,6 +3841,13 @@ otlp_config:
# CLI flag: -limits.ingestion-partition-tenant-shard-size
[ingestion_partitions_tenant_shard_size: <int> | default = 0]
# Enable metric aggregation. When enabled, pushed streams will be sampled for
# bytes and count, and these metrics will be written back into Loki as a
# special __aggregated_metric__ stream, which can be queried for faster
# histogram queries.
# CLI flag: -limits.metric-aggregation-enabled
[metric_aggregation_enabled: <boolean> | default = false]
# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.

@ -680,6 +680,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) {
svc, err := pattern.NewTeeService(
t.Cfg.Pattern,
t.Overrides,
t.PatternRingClient,
t.Cfg.MetricsNamespace,
prometheus.DefaultRegisterer,

@ -9,8 +9,6 @@ import (
)
type Config struct {
// TODO(twhitney): This needs to be a per-tenant config
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
DownsamplePeriod time.Duration `yaml:"downsample_period"`
LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."`
WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."`
@ -27,12 +25,6 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
}
func (cfg *Config) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) {
fs.BoolVar(
&cfg.Enabled,
prefix+"metric-aggregation.enabled",
false,
"Flag to enable or disable metric aggregation.",
)
fs.DurationVar(
&cfg.DownsamplePeriod,
prefix+"metric-aggregation.downsample-period",
@ -105,3 +97,7 @@ func (s *secretValue) Set(val string) error {
// Get returns the underlying secret as a plain string.
// It satisfies the flag.Getter interface for secret-valued flags.
func (s *secretValue) Get() any {
	return string(*s)
}
// String returns the secret as a string, satisfying flag.Value.
// NOTE(review): this exposes the raw secret anywhere the flag value is
// printed (e.g. flag usage/error output) — confirm that is intended.
func (s *secretValue) String() string {
	return string(*s)
}
// Limits is the subset of per-tenant configuration the aggregation
// package needs (implemented by, e.g., validation.Overrides).
type Limits interface {
	// MetricAggregationEnabled reports whether metric aggregation is
	// enabled for the tenant identified by userID.
	MetricAggregationEnabled(userID string) bool
}

@ -150,6 +150,7 @@ func (cfg *Config) Validate() error {
type Limits interface {
drain.Limits
aggregation.Limits
}
type Ingester struct {
@ -294,29 +295,18 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()
if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)
case <-i.loopQuit:
return
}
}
} else {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case <-i.loopQuit:
return
}
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)
case <-i.loopQuit:
return
}
}
}
@ -401,7 +391,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
var writer aggregation.EntryWriter
aggCfg := i.cfg.MetricAggregation
if aggCfg.Enabled {
if i.limits.MetricAggregationEnabled(instanceID) {
writer, err = aggregation.NewPush(
aggCfg.LokiAddr,
instanceID,
@ -469,6 +459,8 @@ func (i *Ingester) downsampleMetrics(ts model.Time) {
instances := i.getInstances()
for _, instance := range instances {
instance.Downsample(ts)
if i.limits.MetricAggregationEnabled(instance.instanceID) {
instance.Downsample(ts)
}
}
}

@ -341,8 +341,13 @@ func (m *mockEntryWriter) Stop() {
// fakeLimits is a test double for the pattern Limits interface.
// Methods not explicitly implemented fall through to the embedded
// Limits (which panics if left nil and called).
type fakeLimits struct {
	Limits
	// metricAggregationEnabled is the canned value returned by
	// MetricAggregationEnabled for every tenant.
	metricAggregationEnabled bool
}
// PatternIngesterTokenizableJSONFields returns a fixed list of JSON
// field names to tokenize, identical for every tenant.
func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
	fields := []string{"log", "message", "msg", "msg_", "_msg", "content"}
	return fields
}
// MetricAggregationEnabled returns the canned per-test value,
// ignoring the tenant ID.
func (f *fakeLimits) MetricAggregationEnabled(_ string) bool {
	enabled := f.metricAggregationEnabled
	return enabled
}

@ -27,6 +27,7 @@ import (
type TeeService struct {
cfg Config
limits Limits
logger log.Logger
ringClient RingClient
wg *sync.WaitGroup
@ -48,6 +49,7 @@ type TeeService struct {
func NewTeeService(
cfg Config,
limits Limits,
ringClient RingClient,
metricsNamespace string,
registerer prometheus.Registerer,
@ -83,6 +85,7 @@ func NewTeeService(
),
),
cfg: cfg,
limits: limits,
ringClient: ringClient,
wg: &sync.WaitGroup{},
@ -317,14 +320,15 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "fail").Inc()
level.Error(ts.logger).Log("msg", "failed to send patterns to pattern ingester", "err", err)
if !ts.cfg.MetricAggregation.Enabled {
return err
}
// Pattern ingesters serve 2 functions, processing patterns and aggregating metrics.
// Only owned streams are processed for patterns, however any pattern ingester can
// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
// try to forward request to any pattern ingester so we at least capture the metrics.
if !ts.limits.MetricAggregationEnabled(clientRequest.tenant) {
return err
}
replicationSet, err := ts.ringClient.Ring().
GetReplicationSetForOperation(ring.WriteNoExtend)
if err != nil || len(replicationSet.Instances) == 0 {

@ -47,6 +47,9 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) {
logsTee, err := NewTeeService(
cfg,
&fakeLimits{
metricAggregationEnabled: true,
},
ringClient,
"test",
nil,

@ -229,8 +229,9 @@ type Limits struct {
IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`
PatternIngesterTokenizableJSONFieldsDefault dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_default" json:"pattern_ingester_tokenizable_json_fields_default" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"`
MetricAggregationEnabled bool `yaml:"metric_aggregation_enabled" json:"metric_aggregation_enabled"`
// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
@ -438,6 +439,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.PatternIngesterTokenizableJSONFieldsDefault, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the pattern ingester.")
f.Var(&l.PatternIngesterTokenizableJSONFieldsAppend, "limits.pattern-ingester-tokenizable-json-fields-append", "List of JSON fields that should be appended to the default list of tokenizable fields in the pattern ingester.")
f.Var(&l.PatternIngesterTokenizableJSONFieldsDelete, "limits.pattern-ingester-tokenizable-json-fields-delete", "List of JSON fields that should be deleted from the (default U append) list of tokenizable fields in the pattern ingester.")
f.BoolVar(
&l.MetricAggregationEnabled,
"limits.metric-aggregation-enabled",
false,
"Enable metric aggregation. When enabled, pushed streams will be sampled for bytes and count, and these metric will be written back into Loki as a special __aggregated_metric__ stream, which can be queried for faster histogram queries.",
)
}
// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@ -1113,6 +1121,10 @@ func (o *Overrides) PatternIngesterTokenizableJSONFieldsDelete(userID string) []
return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDelete
}
// MetricAggregationEnabled returns whether metric aggregation is
// enabled for the given tenant.
func (o *Overrides) MetricAggregationEnabled(userID string) bool {
	limits := o.getOverridesForUser(userID)
	return limits.MetricAggregationEnabled
}
// S3SSEType returns the per-tenant S3 SSE type.
func (o *Overrides) S3SSEType(user string) string {
return o.getOverridesForUser(user).S3SSEType

@ -414,3 +414,36 @@ pattern_ingester_tokenizable_json_fields_delete: body
})
}
}
// Test_MetricAggregationEnabled verifies that the per-tenant
// metric_aggregation_enabled limit round-trips through YAML and is
// surfaced by Overrides.MetricAggregationEnabled.
func Test_MetricAggregationEnabled(t *testing.T) {
	cases := []struct {
		name     string
		yaml     string
		expected bool
	}{
		{
			name: "when true",
			yaml: `
metric_aggregation_enabled: true
`,
			expected: true,
		},
		{
			name: "when false",
			yaml: `
metric_aggregation_enabled: false
`,
			expected: false,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			limits := &Limits{}
			require.NoError(t, yaml.Unmarshal([]byte(c.yaml), limits))
			overrides := Overrides{defaultLimits: limits}
			require.Equal(t, c.expected, overrides.MetricAggregationEnabled("fake"))
		})
	}
}

Loading…
Cancel
Save