mirror of https://github.com/grafana/loki
chore: delete old Ingester RF-1 code that is no longer used (#14818)
parent ed5ea97f67
commit f3cbb1f5aa
@@ -1,106 +0,0 @@
package clientpool

import (
	"flag"
	"io"
	"time"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/util/server"

	"github.com/grafana/dskit/grpcclient"
	"github.com/grafana/dskit/middleware"
	"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health/grpc_health_v1"
)

var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "loki_ingester_rf1_client_request_duration_seconds",
	Help:    "Time spent doing Ingester RF1 requests.",
	Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"})

type HealthAndIngesterClient interface {
	grpc_health_v1.HealthClient
	Close() error
}

type ClosableHealthAndIngesterClient struct {
	logproto.PusherRF1Client
	grpc_health_v1.HealthClient
	io.Closer
}

// Config for an ingester client.
type Config struct {
	PoolConfig                   PoolConfig                     `yaml:"pool_config,omitempty" doc:"description=Configures how connections are pooled."`
	RemoteTimeout                time.Duration                  `yaml:"remote_timeout,omitempty"`
	GRPCClientConfig             grpcclient.Config              `yaml:"grpc_client_config" doc:"description=Configures how the gRPC connection to ingesters work as a client."`
	GRPCUnaryClientInterceptors  []grpc.UnaryClientInterceptor  `yaml:"-"`
	GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`

	// Internal is used to indicate that this client communicates on behalf of
	// a machine and not a user. When Internal = true, the client won't attempt
	// to inject an userid into the context.
	Internal bool `yaml:"-"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester-rf1.client", f)
	cfg.PoolConfig.RegisterFlagsWithPrefix("ingester-rf1.", f)

	f.DurationVar(&cfg.PoolConfig.RemoteTimeout, "ingester-rf1.client.healthcheck-timeout", 1*time.Second, "How quickly a dead client will be removed after it has been detected to disappear. Set this to a value to allow time for a secondary health check to recover the missing client.")
	f.DurationVar(&cfg.RemoteTimeout, "ingester-rf1.client.timeout", 5*time.Second, "The remote request timeout on the client side.")
}

// New returns a new ingester client.
func NewClient(cfg Config, addr string) (HealthAndIngesterClient, error) {
	opts := []grpc.DialOption{
		grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
	}

	dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(&cfg))
	if err != nil {
		return nil, err
	}

	opts = append(opts, dialOpts...)

	// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
	conn, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, err
	}
	return ClosableHealthAndIngesterClient{
		PusherRF1Client: logproto.NewPusherRF1Client(conn),
		HealthClient:    grpc_health_v1.NewHealthClient(conn),
		Closer:          conn,
	}, nil
}

func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
	var unaryInterceptors []grpc.UnaryClientInterceptor
	unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
	unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
	unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
	if !cfg.Internal {
		unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
	}
	unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(ingesterClientRequestDuration))

	var streamInterceptors []grpc.StreamClientInterceptor
	streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
	streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
	streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
	if !cfg.Internal {
		streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
	}
	streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(ingesterClientRequestDuration))

	return unaryInterceptors, streamInterceptors
}

@@ -1,46 +0,0 @@
package clientpool

import (
	"flag"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/ring"
	ring_client "github.com/grafana/dskit/ring/client"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var clients prometheus.Gauge

// PoolConfig is config for creating a Pool.
type PoolConfig struct {
	ClientCleanupPeriod  time.Duration `yaml:"client_cleanup_period"`
	HealthCheckIngesters bool          `yaml:"health_check_ingesters"`
	RemoteTimeout        time.Duration `yaml:"remote_timeout"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.DurationVar(&cfg.ClientCleanupPeriod, prefix+"client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
	f.BoolVar(&cfg.HealthCheckIngesters, prefix+"health-check-ingesters", true, "Run a health check on each ingester client during periodic cleanup.")
	f.DurationVar(&cfg.RemoteTimeout, prefix+"remote-timeout", 1*time.Second, "Timeout for the health check.")
}

func NewPool(name string, cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger, metricsNamespace string) *ring_client.Pool {
	poolCfg := ring_client.PoolConfig{
		CheckInterval:      cfg.ClientCleanupPeriod,
		HealthCheckEnabled: cfg.HealthCheckIngesters,
		HealthCheckTimeout: cfg.RemoteTimeout,
	}

	if clients == nil {
		clients = promauto.NewGauge(prometheus.GaugeOpts{
			Namespace: metricsNamespace,
			Name:      "ingester_rf1_clients",
			Help:      "The current number of RF1 ingester clients.",
		})
	}
	// TODO(chaudum): Allow configuration of metric name by the caller.
	return ring_client.NewPool(name, poolCfg, ring_client.NewRingServiceDiscovery(ring), factory, clients, logger)
}

@@ -1,130 +0,0 @@
package ingesterrf1

import (
	"bytes"
	"crypto/rand"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/ring"
	"github.com/oklog/ulid"
	"golang.org/x/net/context"

	"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
	"github.com/grafana/loki/v3/pkg/storage/wal"
)

// Note: this is called both during the WAL replay (zero or more times)
// and then after replay as well.
func (i *Ingester) InitFlushWorkers() {
	i.flushWorkersDone.Add(i.cfg.ConcurrentFlushes)
	for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
		i.flushBuffers[j] = new(bytes.Buffer)
		go i.flushWorker(j)
	}
}

// Flush implements ring.FlushTransferer
// Flush triggers a flush of all the chunks and closes the flush queues.
// Called from the Lifecycler as part of the ingester shutdown.
func (i *Ingester) Flush() {
	i.wal.Close()
	i.flushWorkersDone.Wait()
}

// TransferOut implements ring.FlushTransferer
// Noop implementation because ingesters have a WAL now that does not require transferring chunks any more.
// We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
func (i *Ingester) TransferOut(_ context.Context) error {
	return ring.ErrTransferDisabled
}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
	w.WriteHeader(http.StatusNoContent)
}

func (i *Ingester) flushWorker(j int) {
	l := log.With(i.logger, "worker", j)
	defer func() {
		level.Debug(l).Log("msg", "Ingester.flushWorker() exited")
		i.flushWorkersDone.Done()
	}()

	for {
		it, err := i.wal.NextPending()
		if errors.Is(err, wal.ErrClosed) {
			return
		}

		if it == nil {
			// TODO: Do something more clever here instead.
			time.Sleep(100 * time.Millisecond)
			continue
		}

		err = i.flush(l, j, it)
		if err != nil {
			level.Error(l).Log("msg", "failed to flush", "err", err)
		}

		it.Result.SetDone(err)
		i.wal.Put(it)
	}
}

func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error {
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()

	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
	for b.Ongoing() {
		err := i.flushSegment(ctx, j, it.Writer)
		if err == nil {
			break
		}
		level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
		b.Wait()
	}
	return b.Err()
}

func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
	ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
	defer cancelFunc()

	start := time.Now()
	i.metrics.flushesTotal.Add(1)
	defer func() { i.metrics.flushDuration.Observe(time.Since(start).Seconds()) }()

	buf := i.flushBuffers[j]
	defer buf.Reset()
	if _, err := w.WriteTo(buf); err != nil {
		i.metrics.flushFailuresTotal.Inc()
		return err
	}

	stats := wal.GetSegmentStats(w, time.Now())
	wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)

	id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
	if err := i.store.PutObject(ctx, wal.Dir+id, buf); err != nil {
		i.metrics.flushFailuresTotal.Inc()
		return fmt.Errorf("failed to put object: %w", err)
	}

	if _, err := i.metastoreClient.AddBlock(ctx, &metastorepb.AddBlockRequest{
		Block: w.Meta(id),
	}); err != nil {
		i.metrics.flushFailuresTotal.Inc()
		return fmt.Errorf("failed to update metastore: %w", err)
	}

	return nil
}

@@ -1,886 +0,0 @@
package ingesterrf1

import (
	"bytes"
	"context"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"runtime/pprof"
	"sync"
	"time"

	ring_client "github.com/grafana/dskit/ring/client"
	"github.com/opentracing/opentracing-go"

	"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
	"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
	"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
	"github.com/grafana/loki/v3/pkg/ingester/index"
	"github.com/grafana/loki/v3/pkg/loghttp/push"
	"github.com/grafana/loki/v3/pkg/storage"
	"github.com/grafana/loki/v3/pkg/storage/types"
	"github.com/grafana/loki/v3/pkg/storage/wal"
	util_log "github.com/grafana/loki/v3/pkg/util/log"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/modules"
	"github.com/grafana/dskit/multierror"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/services"
	"github.com/grafana/dskit/tenant"
	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/grafana/loki/v3/pkg/analytics"
	"github.com/grafana/loki/v3/pkg/distributor/writefailures"
	"github.com/grafana/loki/v3/pkg/ingester/client"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/runtime"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/util"
)

const (
	// RingKey is the key under which we store the ingesters ring in the KVStore.
	RingKey = "ring"

	shutdownMarkerFilename = "shutdown-requested.txt"
)

// ErrReadOnly is returned when the ingester is shutting down and a push was
// attempted.
var (
	ErrReadOnly = errors.New("Ingester is shutting down")

	compressionStats   = analytics.NewString("ingester_compression")
	targetSizeStats    = analytics.NewInt("ingester_target_size_bytes")
	activeTenantsStats = analytics.NewInt("ingester_active_tenants")
)

// Config for an ingester.
type Config struct {
	Enabled bool `yaml:"enabled" doc:"description=Whether the ingester is enabled."`

	LifecyclerConfig    ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."`
	MaxSegmentAge       time.Duration         `yaml:"max_segment_age"`
	MaxSegmentSize      int                   `yaml:"max_segment_size"`
	MaxSegments         int                   `yaml:"max_segments"`
	ConcurrentFlushes   int                   `yaml:"concurrent_flushes"`
	FlushCheckPeriod    time.Duration         `yaml:"flush_check_period"`
	FlushOpBackoff      backoff.Config        `yaml:"flush_op_backoff"`
	FlushOpTimeout      time.Duration         `yaml:"flush_op_timeout"`
	AutoForgetUnhealthy bool                  `yaml:"autoforget_unhealthy"`

	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

	// For testing, you can override the address and ID of this ingester.
	ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error)

	// Optional wrapper that can be used to modify the behaviour of the ingester
	Wrapper Wrapper `yaml:"-"`

	IndexShards int `yaml:"index_shards"`

	MaxDroppedStreams int `yaml:"max_dropped_streams"`

	ShutdownMarkerPath string `yaml:"shutdown_marker_path"`

	OwnedStreamsCheckInterval time.Duration `yaml:"owned_streams_check_interval" doc:"description=Interval at which the ingester ownedStreamService checks for changes in the ring to recalculate owned streams."`
	StreamRetainPeriod        time.Duration `yaml:"stream_retain_period" doc:"description=How long stream metadata is retained in memory after it was last seen."`

	// Tee configs
	ClientConfig clientpool.Config       `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
	factory      ring_client.PoolFactory `yaml:"-"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingester-rf1.", f, util_log.Logger)
	cfg.ClientConfig.RegisterFlags(f)

	f.IntVar(&cfg.ConcurrentFlushes, "ingester-rf1.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
	f.DurationVar(&cfg.FlushCheckPeriod, "ingester-rf1.flush-check-period", 500*time.Millisecond, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
	f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester-rf1.flush-op-backoff-min-period", 100*time.Millisecond, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
	f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester-rf1.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
	f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester-rf1.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
	f.DurationVar(&cfg.FlushOpTimeout, "ingester-rf1.flush-op-timeout", 10*time.Second, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
	f.DurationVar(&cfg.MaxSegmentAge, "ingester-rf1.max-segment-age", 500*time.Millisecond, "The maximum age of a segment before it should be flushed. Increasing this value allows more time for a segment to grow to max-segment-size, but may increase latency if the write volume is too small.")
	f.IntVar(&cfg.MaxSegmentSize, "ingester-rf1.max-segment-size", 8*1024*1024, "The maximum size of a segment before it should be flushed. It is not a strict limit, and segments can exceed the maximum size when individual appends are larger than the remaining capacity.")
	f.IntVar(&cfg.MaxSegments, "ingester-rf1.max-segments", 10, "The maximum number of segments to buffer in-memory. Increasing this value allows for large bursts of writes to be buffered in memory, but may increase latency if the write volume exceeds the rate at which segments can be flushed.")

	f.IntVar(&cfg.MaxReturnedErrors, "ingester-rf1.max-ignored-stream-errors", 10, "The maximum number of errors a stream will report to the user when a push fails. 0 to make unlimited.")
	f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester-rf1.autoforget-unhealthy", false, "Forget about ingesters having heartbeat timestamps older than `ring.kvstore.heartbeat_timeout`. This is equivalent to clicking on the `/ring` `forget` button in the UI: the ingester is removed from the ring. This is a useful setting when you are sure that an unhealthy node won't return. An example is when not using stateful sets or the equivalent. Use `memberlist.rejoin_interval` > 0 to handle network partition cases when using a memberlist.")
	f.IntVar(&cfg.IndexShards, "ingester-rf1.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
	f.IntVar(&cfg.MaxDroppedStreams, "ingester-rf1.tailer.max-dropped-streams", 10, "Maximum number of dropped streams to keep in memory during tailing.")
	f.StringVar(&cfg.ShutdownMarkerPath, "ingester-rf1.shutdown-marker-path", "", "Path where the shutdown marker file is stored. If not set and common.path_prefix is set then common.path_prefix will be used.")
	f.DurationVar(&cfg.OwnedStreamsCheckInterval, "ingester-rf1.owned-streams-check-interval", 30*time.Second, "Interval at which the ingester ownedStreamService checks for changes in the ring to recalculate owned streams.")
	f.DurationVar(&cfg.StreamRetainPeriod, "ingester-rf1.stream-retain-period", 5*time.Minute, "How long stream metadata should be retained in-memory after the last log was seen.")
	f.BoolVar(&cfg.Enabled, "ingester-rf1.enabled", false, "Flag to enable or disable the usage of the ingester-rf1 component.")
}

func (cfg *Config) Validate() error {
	if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
		return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
	}
	if cfg.FlushOpBackoff.MaxRetries <= 0 {
		return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
	}
	if cfg.FlushOpTimeout <= 0 {
		return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
	}

	return nil
}

type Wrapper interface {
	Wrap(wrapped Interface) Interface
}

// Storage is the store interface we need on the ingester.
type Storage interface {
	PutObject(ctx context.Context, objectKey string, object io.Reader) error
	Stop()
}

// Interface is an interface for the Ingester
type Interface interface {
	services.Service
	http.Handler

	logproto.PusherServer

	CheckReady(ctx context.Context) error
	FlushHandler(w http.ResponseWriter, _ *http.Request)
	GetOrCreateInstance(instanceID string) (*instance, error)
	ShutdownHandler(w http.ResponseWriter, r *http.Request)
	PrepareShutdown(w http.ResponseWriter, r *http.Request)
}

// Ingester builds chunks for incoming log streams.
type Ingester struct {
	services.Service

	cfg    Config
	logger log.Logger

	clientConfig  client.Config
	tenantConfigs *runtime.TenantConfigs

	shutdownMtx  sync.Mutex // Allows processes to grab a lock and prevent a shutdown
	instancesMtx sync.RWMutex
	instances    map[string]*instance
	readonly     bool

	lifecycler        *ring.Lifecycler
	lifecyclerWatcher *services.FailureWatcher

	store           Storage
	metastoreClient metastorepb.MetastoreServiceClient
	periodicConfigs []config.PeriodConfig

	loopQuit    chan struct{}
	tailersQuit chan struct{}

	// One queue per flush thread. Fingerprint is used to
	// pick a queue.
	flushBuffers     []*bytes.Buffer
	flushWorkersDone sync.WaitGroup

	wal *wal.Manager

	limiter *Limiter

	// Flag for whether stopping the ingester service should also terminate the
	// loki process.
	// This is set when calling the shutdown handler.
	terminateOnShutdown bool

	// Only used by WAL & flusher to coordinate backpressure during replay.
	// replayController *replayController

	metrics *ingesterMetrics

	streamRateCalculator *StreamRateCalculator

	writeLogManager *writefailures.Manager

	customStreamsTracker push.UsageTracker

	readRing ring.ReadRing
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config,
	periodConfigs []config.PeriodConfig,
	storageConfig storage.Config,
	clientMetrics storage.ClientMetrics,
	limits Limits, configs *runtime.TenantConfigs,
	metastoreClient metastorepb.MetastoreServiceClient,
	registerer prometheus.Registerer,
	writeFailuresCfg writefailures.Cfg,
	metricsNamespace string,
	logger log.Logger,
	customStreamsTracker push.UsageTracker, readRing ring.ReadRing,
) (*Ingester, error) {
	storage, err := objstore.New(periodConfigs, storageConfig, clientMetrics)
	if err != nil {
		return nil, err
	}
	if cfg.ingesterClientFactory == nil {
		cfg.ingesterClientFactory = client.New
	}
	metrics := newIngesterMetrics(registerer)

	walManager, err := wal.NewManager(wal.Config{
		MaxAge:         cfg.MaxSegmentAge,
		MaxSegments:    int64(cfg.MaxSegments),
		MaxSegmentSize: int64(cfg.MaxSegmentSize),
	}, wal.NewManagerMetrics(registerer))
	if err != nil {
		return nil, err
	}

	i := &Ingester{
		cfg:                  cfg,
		logger:               logger,
		clientConfig:         clientConfig,
		tenantConfigs:        configs,
		instances:            map[string]*instance{},
		store:                storage,
		periodicConfigs:      periodConfigs,
		flushBuffers:         make([]*bytes.Buffer, cfg.ConcurrentFlushes),
		flushWorkersDone:     sync.WaitGroup{},
		loopQuit:             make(chan struct{}),
		tailersQuit:          make(chan struct{}),
		metrics:              metrics,
		metastoreClient:      metastoreClient,
		terminateOnShutdown:  false,
		streamRateCalculator: NewStreamRateCalculator(),
		writeLogManager:      writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
		customStreamsTracker: customStreamsTracker,
		readRing:             readRing,
		wal:                  walManager,
	}

	// TODO: change flush on shutdown
	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester-rf1", "ingester-rf1-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer))
	if err != nil {
		return nil, err
	}

	i.lifecyclerWatcher = services.NewFailureWatcher()
	i.lifecyclerWatcher.WatchService(i.lifecycler)

	// Now that the lifecycler has been created, we can create the limiter
	// which depends on it.
	i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)

	i.setupAutoForget()

	// i.recalculateOwnedStreams = newRecalculateOwnedStreams(i.getInstances, i.lifecycler.ID, i.readRing, cfg.OwnedStreamsCheckInterval, util_log.Logger)

	return i, nil
}

// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
	if !i.cfg.AutoForgetUnhealthy {
		return
	}

	go func() {
		ctx := context.Background()
		err := i.Service.AwaitRunning(ctx)
		if err != nil {
			level.Error(i.logger).Log("msg", fmt.Sprintf("autoforget received error %s, autoforget is disabled", err.Error()))
			return
		}

		level.Info(i.logger).Log("msg", fmt.Sprintf("autoforget is enabled and will remove unhealthy instances from the ring after %v with no heartbeat", i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout))

		ticker := time.NewTicker(i.cfg.LifecyclerConfig.HeartbeatPeriod)
		defer ticker.Stop()

		var forgetList []string
		for range ticker.C {
			err := i.lifecycler.KVStore.CAS(ctx, RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
				forgetList = forgetList[:0]
				if in == nil {
					return nil, false, nil
				}

				ringDesc, ok := in.(*ring.Desc)
				if !ok {
					level.Warn(i.logger).Log("msg", fmt.Sprintf("autoforget saw a KV store value that was not `ring.Desc`, got `%T`", in))
					return nil, false, nil
				}

				for id, ingester := range ringDesc.Ingesters {
					if !ingester.IsHealthy(ring.Reporting, i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout, time.Now()) {
						if i.lifecycler.ID == id {
							level.Warn(i.logger).Log("msg", fmt.Sprintf("autoforget has seen our ID `%s` as unhealthy in the ring, network may be partitioned, skip forgeting ingesters this round", id))
							return nil, false, nil
						}
						forgetList = append(forgetList, id)
					}
				}

				if len(forgetList) == len(ringDesc.Ingesters)-1 {
					level.Warn(i.logger).Log("msg", fmt.Sprintf("autoforget have seen %d unhealthy ingesters out of %d, network may be partioned, skip forgeting ingesters this round", len(forgetList), len(ringDesc.Ingesters)))
					forgetList = forgetList[:0]
					return nil, false, nil
				}

				if len(forgetList) > 0 {
					for _, id := range forgetList {
						ringDesc.RemoveIngester(id)
					}
					return ringDesc, true, nil
				}
				return nil, false, nil
			})
			if err != nil {
				level.Warn(i.logger).Log("msg", err)
				continue
			}

			for _, id := range forgetList {
				level.Info(i.logger).Log("msg", fmt.Sprintf("autoforget removed ingester %v from the ring because it was not healthy after %v", id, i.cfg.LifecyclerConfig.RingConfig.HeartbeatTimeout))
			}
			i.metrics.autoForgetUnhealthyIngestersTotal.Add(float64(len(forgetList)))
		}
	}()
}

// ServeHTTP implements the pattern ring status page.
func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	i.lifecycler.ServeHTTP(w, r)
}

func (i *Ingester) starting(ctx context.Context) error {
	i.InitFlushWorkers()

	// pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
	err := i.lifecycler.StartAsync(context.Background())
	if err != nil {
		return err
	}

	err = i.lifecycler.AwaitRunning(ctx)
	if err != nil {
		return err
	}

	shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)
	shutdownMarker, err := shutdownMarkerExists(shutdownMarkerPath)
	if err != nil {
		return fmt.Errorf("failed to check ingester shutdown marker: %w", err)
	}

	if shutdownMarker {
		level.Info(i.logger).Log("msg", "detected existing shutdown marker, setting unregister and flush on shutdown", "path", shutdownMarkerPath)
		i.setPrepareShutdown()
	}

	//err = i.recalculateOwnedStreams.StartAsync(ctx)
	//if err != nil {
	//	return fmt.Errorf("can not start recalculate owned streams service: %w", err)
	//}

	go i.periodicStreamMaintenance()
	return nil
}

func (i *Ingester) running(ctx context.Context) error {
	var serviceError error
	select {
	// wait until service is asked to stop
	case <-ctx.Done():
	// stop
	case err := <-i.lifecyclerWatcher.Chan():
		serviceError = fmt.Errorf("lifecycler failed: %w", err)
	}

	// close tailers before stopping our loop
	//close(i.tailersQuit)
	//for _, instance := range i.getInstances() {
	//	instance.closeTailers()
	//}

	return serviceError
}

// stopping is called when Ingester transitions to Stopping state.
//
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
	i.stopIncomingRequests()
	var errs util.MultiError

	//if i.flushOnShutdownSwitch.Get() {
	//	i.lifecycler.SetFlushOnShutdown(true)
	//}
	errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))

	i.flushWorkersDone.Wait()

	// i.streamRateCalculator.Stop()

	// In case the flag to terminate on shutdown is set or this instance is marked to release its resources,
	// we need to mark the ingester service as "failed", so Loki will shut down entirely.
	// The module manager logs the failure `modules.ErrStopProcess` in a special way.
	if i.terminateOnShutdown && errs.Err() == nil {
		i.removeShutdownMarkerFile()
		return modules.ErrStopProcess
	}
	i.store.Stop()
	return errs.Err()
}

// stopIncomingRequests is called when ingester is stopping
func (i *Ingester) stopIncomingRequests() {
	i.shutdownMtx.Lock()
	defer i.shutdownMtx.Unlock()

	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()

	i.readonly = true
}

// removeShutdownMarkerFile removes the shutdown marker if it exists. Any errors are logged.
func (i *Ingester) removeShutdownMarkerFile() {
	shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)
	exists, err := shutdownMarkerExists(shutdownMarkerPath)
	if err != nil {
		level.Error(i.logger).Log("msg", "error checking shutdown marker file exists", "err", err)
	}
	if exists {
		err = removeShutdownMarker(shutdownMarkerPath)
		if err != nil {
			level.Error(i.logger).Log("msg", "error removing shutdown marker file", "err", err)
		}
	}
}

func (i *Ingester) periodicStreamMaintenance() {
	streamRetentionTicker := time.NewTicker(i.cfg.StreamRetainPeriod)
	defer streamRetentionTicker.Stop()

	for {
		select {
		case <-streamRetentionTicker.C:
			i.cleanIdleStreams()
		case <-i.loopQuit:
			return
		}
	}
}

func (i *Ingester) cleanIdleStreams() {
	for _, instance := range i.getInstances() {
		_ = instance.streams.ForEach(func(s *stream) (bool, error) {
			if time.Since(s.highestTs) > i.cfg.StreamRetainPeriod {
				instance.streams.Delete(s)
			}
			return true, nil
		})
	}
}

// PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
//
// Internally, when triggered, this handler will configure the ingester service to release their resources whenever a SIGTERM is received.
// Releasing resources meaning flushing data, deleting tokens, and removing itself from the ring.
//
// It also creates a file on disk which is used to re-apply the configuration if the
// ingester crashes and restarts before being permanently shutdown.
//
// * `GET` shows the status of this configuration
// * `POST` enables this configuration
// * `DELETE` disables this configuration
func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request) {
	if i.cfg.ShutdownMarkerPath == "" {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)

	switch r.Method {
	case http.MethodGet:
		exists, err := shutdownMarkerExists(shutdownMarkerPath)
		if err != nil {
			level.Error(i.logger).Log("msg", "unable to check for prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		if exists {
			util.WriteTextResponse(w, "set")
		} else {
			util.WriteTextResponse(w, "unset")
		}
	case http.MethodPost:
		if err := createShutdownMarker(shutdownMarkerPath); err != nil {
			level.Error(i.logger).Log("msg", "unable to create prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		i.setPrepareShutdown()
		level.Info(i.logger).Log("msg", "created prepare-shutdown marker file", "path", shutdownMarkerPath)

		w.WriteHeader(http.StatusNoContent)
	case http.MethodDelete:
		if err := removeShutdownMarker(shutdownMarkerPath); err != nil {
			level.Error(i.logger).Log("msg", "unable to remove prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		i.unsetPrepareShutdown()
		level.Info(i.logger).Log("msg", "removed prepare-shutdown marker file", "path", shutdownMarkerPath)

		w.WriteHeader(http.StatusNoContent)
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

// setPrepareShutdown toggles ingester lifecycler config to prepare for shutdown
func (i *Ingester) setPrepareShutdown() {
	level.Info(i.logger).Log("msg", "preparing full ingester shutdown, resources will be released on SIGTERM")
	i.lifecycler.SetFlushOnShutdown(true)
	i.lifecycler.SetUnregisterOnShutdown(true)
	i.terminateOnShutdown = true
	i.metrics.shutdownMarker.Set(1)
}

func (i *Ingester) unsetPrepareShutdown() {
	level.Info(i.logger).Log("msg", "undoing preparation for full ingester shutdown")
	i.lifecycler.SetFlushOnShutdown(true)
	i.lifecycler.SetUnregisterOnShutdown(i.cfg.LifecyclerConfig.UnregisterOnShutdown)
	i.terminateOnShutdown = false
	i.metrics.shutdownMarker.Set(0)
}

// createShutdownMarker writes a marker file to disk to indicate that an ingester is
// going to be scaled down in the future. The presence of this file means that an ingester
// should flush and upload all data when stopping.
func createShutdownMarker(p string) error {
	// Write the file, fsync it, then fsync the containing directory in order to guarantee
	// it is persisted to disk. From https://man7.org/linux/man-pages/man2/fsync.2.html
	//
	// > Calling fsync() does not necessarily ensure that the entry in the
	// > directory containing the file has also reached disk. For that an
	// > explicit fsync() on a file descriptor for the directory is also
	// > needed.
	file, err := os.Create(p)
	if err != nil {
		return err
	}

	merr := multierror.New()
	_, err = file.WriteString(time.Now().UTC().Format(time.RFC3339))
	merr.Add(err)
	merr.Add(file.Sync())
	merr.Add(file.Close())

	if err := merr.Err(); err != nil {
		return err
	}

	dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
	if err != nil {
		return err
	}

	merr.Add(dir.Sync())
	merr.Add(dir.Close())
	return merr.Err()
}

// removeShutdownMarker removes the shutdown marker file if it exists.
func removeShutdownMarker(p string) error {
	err := os.Remove(p)
	if err != nil && !os.IsNotExist(err) {
		return err
	}

	dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
	if err != nil {
		return err
	}

	merr := multierror.New()
	merr.Add(dir.Sync())
	merr.Add(dir.Close())
	return merr.Err()
}

// shutdownMarkerExists returns true if the shutdown marker file exists, false otherwise
func shutdownMarkerExists(p string) (bool, error) {
	s, err := os.Stat(p)
	if err != nil && os.IsNotExist(err) {
		return false, nil
	}

	if err != nil {
		return false, err
	}

	return s.Mode().IsRegular(), nil
}

// ShutdownHandler handles a graceful shutdown of the ingester service and
// termination of the Loki process.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
	// Don't allow calling the shutdown handler multiple times
	if i.State() != services.Running {
		w.WriteHeader(http.StatusServiceUnavailable)
		_, _ = w.Write([]byte("Ingester is stopping or already stopped."))
		return
	}
	params := r.URL.Query()
	doFlush := util.FlagFromValues(params, "flush", true)
	doDeleteRingTokens := util.FlagFromValues(params, "delete_ring_tokens", false)
	doTerminate := util.FlagFromValues(params, "terminate", true)
	err := i.handleShutdown(doTerminate, doFlush, doDeleteRingTokens)

	// Stopping the module will return the modules.ErrStopProcess error. This is
	// needed so the Loki process is shut down completely.
	if err == nil || err == modules.ErrStopProcess {
		w.WriteHeader(http.StatusNoContent)
	} else {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte(err.Error()))
	}
}

// handleShutdown triggers the following operations:
// - Change the state of ring to stop accepting writes.
// - optional: Flush all the chunks.
// - optional: Delete ring tokens file
// - Unregister from KV store
// - optional: Terminate process (handled by service manager in loki.go)
func (i *Ingester) handleShutdown(terminate, flush, del bool) error {
	i.lifecycler.SetFlushOnShutdown(flush)
	i.lifecycler.SetClearTokensOnShutdown(del)
	i.lifecycler.SetUnregisterOnShutdown(true)
	i.terminateOnShutdown = terminate
	return services.StopAndAwaitTerminated(context.Background(), i)
}

// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	instanceID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	} else if i.readonly {
		return nil, ErrReadOnly
	}

	instance, err := i.GetOrCreateInstance(instanceID)
	if err != nil {
		return &logproto.PushResponse{}, err
	}

	if err = instance.Push(ctx, i.wal, req); err != nil {
		return nil, err
	}

	return &logproto.PushResponse{}, nil
}

// GetStreamRates returns a response containing all streams and their current rate
// TODO: It might be nice for this to be human readable, eventually: Sort output and return labels, too?
func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error) {
	if sp := opentracing.SpanFromContext(ctx); sp != nil {
		sp.LogKV("event", "ingester started to handle GetStreamRates")
		defer sp.LogKV("event", "ingester finished handling GetStreamRates")
	}

	// Set profiling tags
	defer pprof.SetGoroutineLabels(ctx)
	ctx = pprof.WithLabels(ctx, pprof.Labels("path", "write"))
	pprof.SetGoroutineLabels(ctx)

	allRates := i.streamRateCalculator.Rates()
	rates := make([]*logproto.StreamRate, len(allRates))
	for idx := range allRates {
		rates[idx] = &allRates[idx]
	}
	return &logproto.StreamRatesResponse{StreamRates: nil}, nil
}

func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { //nolint:revive
	inst, ok := i.getInstanceByID(instanceID)
	if ok {
		return inst, nil
	}

	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()
	inst, ok = i.instances[instanceID]
	if !ok {
		var err error
		inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker, i.logger)
		if err != nil {
			return nil, err
		}
		i.instances[instanceID] = inst
		activeTenantsStats.Set(int64(len(i.instances)))
	}
	return inst, nil
}

// asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`.
// max look back is limited to from time of async store config.
// It considers previous periodic config's from time if that also has async index type.
// This is to limit the lookback to only async stores where relevant.
func (i *Ingester) asyncStoreMaxLookBack() time.Duration {
	activePeriodicConfigIndex := config.ActivePeriodConfig(i.periodicConfigs)
	activePeriodicConfig := i.periodicConfigs[activePeriodicConfigIndex]
	if activePeriodicConfig.IndexType != types.BoltDBShipperType && activePeriodicConfig.IndexType != types.TSDBType {
		return 0
	}

	startTime := activePeriodicConfig.From
	if activePeriodicConfigIndex != 0 && (i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == types.BoltDBShipperType ||
		i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == types.TSDBType) {
		startTime = i.periodicConfigs[activePeriodicConfigIndex-1].From
	}

	maxLookBack := time.Since(startTime.Time.Time())
	return maxLookBack
}

// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
	return nil
}

// ReadinessHandler is used to indicate to k8s when the ingesters are ready for
// the addition removal of another ingester. Returns 204 when the ingester is
// ready, 500 otherwise.
func (i *Ingester) CheckReady(ctx context.Context) error {
	if s := i.State(); s != services.Running && s != services.Stopping {
		return fmt.Errorf("ingester not ready: %v", s)
	}
	return i.lifecycler.CheckReady(ctx)
}

func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
	i.instancesMtx.RLock()
	defer i.instancesMtx.RUnlock()

	inst, ok := i.instances[id]
	return inst, ok
}

func (i *Ingester) getInstances() []*instance {
	i.instancesMtx.RLock()
	defer i.instancesMtx.RUnlock()

	instances := make([]*instance, 0, len(i.instances))
	for _, instance := range i.instances {
		instances = append(instances, instance)
	}
	return instances
}

//// Tail logs matching given query
//func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
//	err := i.tail(req, queryServer)
//	err = server_util.ClientGrpcStatusAndError(err)
//	return err
//}
//func (i *Ingester) tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error {
//	select {
//	case <-i.tailersQuit:
//		return errors.New("Ingester is stopping")
//	default:
//	}
//
//	if req.Plan == nil {
//		parsed, err := syntax.ParseLogSelector(req.Query, true)
//		if err != nil {
//			return err
//		}
//		req.Plan = &plan.QueryPlan{
//			AST: parsed,
//		}
//	}
//
//	instanceID, err := tenant.TenantID(queryServer.Context())
//	if err != nil {
//		return err
//	}
//
//	instance, err := i.GetOrCreateInstance(instanceID)
//	if err != nil {
//		return err
//	}
//
//	expr, ok := req.Plan.AST.(syntax.LogSelectorExpr)
//	if !ok {
//		return fmt.Errorf("unsupported query expression: want (LogSelectorExpr), got (%T)", req.Plan.AST)
//	}
//
//	tailer, err := newTailer(instanceID, expr, queryServer, i.cfg.MaxDroppedStreams)
//	if err != nil {
//		return err
//	}
//
//	if err := instance.addNewTailer(queryServer.Context(), tailer); err != nil {
//		return err
//	}
//	tailer.loop()
//	return nil
//}
//
//// TailersCount returns count of active tail requests from a user
//func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) {
//	tcr, err := i.tailersCount(ctx)
//	err = server_util.ClientGrpcStatusAndError(err)
//	return tcr, err
//}
//
//func (i *Ingester) tailersCount(ctx context.Context) (*logproto.TailersCountResponse, error) {
//	instanceID, err := tenant.TenantID(ctx)
//	if err != nil {
//		return nil, err
//	}
//
//	resp := logproto.TailersCountResponse{}
//
//	instance, ok := i.getInstanceByID(instanceID)
//	if ok {
//		resp.Count = instance.openTailersCount()
//	}
//
//	return &resp, nil
//}

func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
	return &logproto.DetectedFieldsResponse{
		Fields: []*logproto.DetectedField{
			{
				Label:       "foo",
				Type:        logproto.DetectedFieldString,
				Cardinality: 1,
			},
		},
		Limit: r.GetLimit(),
	}, nil
}

@@ -1,43 +0,0 @@
package ingesterrf1

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestIngester_cleanIdleStreams(t *testing.T) {
	i := &Ingester{
		instancesMtx: sync.RWMutex{},
		instances:    make(map[string]*instance),
		cfg:          Config{StreamRetainPeriod: time.Minute},
	}
	instance := &instance{
		instanceID: "test",
		streams:    newStreamsMap(),
	}
	stream := &stream{
		labelsString: "test,label",
		highestTs:    time.Now(),
	}
	instance.streams.Store(stream.labelsString, stream)
	i.instances[instance.instanceID] = instance

	require.Len(t, i.instances, 1)
	require.Equal(t, 1, instance.streams.Len())

	// No-op
	i.cleanIdleStreams()

	require.Len(t, i.instances, 1)
	require.Equal(t, 1, instance.streams.Len())

	// Pretend stream is old and retry
	stream.highestTs = time.Now().Add(-time.Minute * 2)
	i.cleanIdleStreams()

	require.Len(t, i.instances, 1)
	require.Equal(t, 0, instance.streams.Len())
}

@@ -1,295 +0,0 @@
package ingesterrf1

import (
	"context"
	"fmt"
	"math"
	"net/http"
	"sync"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/httpgrpc"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/analytics"
	"github.com/grafana/loki/v3/pkg/chunkenc"
	"github.com/grafana/loki/v3/pkg/distributor/writefailures"
	"github.com/grafana/loki/v3/pkg/loghttp/push"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/runtime"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/wal"
	"github.com/grafana/loki/v3/pkg/util"
	"github.com/grafana/loki/v3/pkg/util/constants"
	util_log "github.com/grafana/loki/v3/pkg/util/log"
	"github.com/grafana/loki/v3/pkg/validation"
)

var (
	memoryStreams = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: constants.Loki,
		Name:      "ingester_rf1_memory_streams",
		Help:      "The total number of streams in memory per tenant.",
	}, []string{"tenant"})
	memoryStreamsLabelsBytes = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: constants.Loki,
		Name:      "ingester_rf1_memory_streams_labels_bytes",
		Help:      "Total bytes of labels of the streams in memory.",
	})
	streamsCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: constants.Loki,
		Name:      "ingester_rf1_streams_created_total",
		Help:      "The total number of streams created per tenant.",
	}, []string{"tenant"})
	streamsRemovedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: constants.Loki,
		Name:      "ingester_rf1_streams_removed_total",
		Help:      "The total number of streams removed per tenant.",
	}, []string{"tenant"})

	streamsCountStats = analytics.NewInt("ingester_rf1_streams_count")
)

type instance struct {
	cfg *Config

	buf     []byte // buffer used to compute fps.
	streams *streamsMap

	mapper *FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free

	instanceID string

	streamsCreatedTotal prometheus.Counter
	streamsRemovedTotal prometheus.Counter

	// tailers   map[uint32]*tailer
	tailerMtx sync.RWMutex

	logger             log.Logger
	limiter            *Limiter
	streamCountLimiter *streamCountLimiter
	ownedStreamsSvc    *ownedStreamService

	configs *runtime.TenantConfigs

	metrics *ingesterMetrics

	streamRateCalculator *StreamRateCalculator

	writeFailures *writefailures.Manager

	schemaconfig *config.SchemaConfig

	customStreamsTracker push.UsageTracker
}

func (i *instance) Push(ctx context.Context, w *wal.Manager, req *logproto.PushRequest) error {
	rateLimitWholeStream := i.limiter.limits.ShardStreams(i.instanceID).Enabled

	results := make([]*wal.AppendResult, 0, len(req.Streams))
	for _, reqStream := range req.Streams {
		s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels,
			func() (*stream, error) {
				s, err := i.createStream(ctx, reqStream)
				return s, err
			},
			func(_ *stream) error {
				return nil
			},
		)
		if err != nil {
			return err
		}
		_, res, err := s.Push(ctx, w, reqStream.Entries, rateLimitWholeStream, i.customStreamsTracker)
		if err != nil {
			return err
		}
		results = append(results, res)
	}

	for _, result := range results {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-result.Done():
			if err := result.Err(); err != nil {
				return err
			}
		}
	}

	return nil
}

func newInstance(
	cfg *Config,
	periodConfigs []config.PeriodConfig,
	instanceID string,
	limiter *Limiter,
	configs *runtime.TenantConfigs,
	metrics *ingesterMetrics,
	streamRateCalculator *StreamRateCalculator,
	writeFailures *writefailures.Manager,
	customStreamsTracker push.UsageTracker,
	logger log.Logger,
) (*instance, error) {
	streams := newStreamsMap()
	ownedStreamsSvc := newOwnedStreamService(instanceID, limiter)
	c := config.SchemaConfig{Configs: periodConfigs}
	i := &instance{
		cfg:        cfg,
		streams:    streams,
		buf:        make([]byte, 0, 1024),
		instanceID: instanceID,
		//
		streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
		streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
		//
		//tailers:            map[uint32]*tailer{},
		logger:             logger,
		limiter:            limiter,
		streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc),
		ownedStreamsSvc:    ownedStreamsSvc,
		configs:            configs,
		metrics:            metrics,

		streamRateCalculator: streamRateCalculator,

		writeFailures: writeFailures,
		schemaconfig:  &c,

		customStreamsTracker: customStreamsTracker,
	}
	i.mapper = NewFPMapper(i.getLabelsFromFingerprint)

	return i, nil
}

func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stream) (*stream, error) {
	// record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after
	// reducing the stream limits, for instance.
	var err error

	sortedLabels, err := syntax.ParseLabels(pushReqStream.Labels)
	if err != nil {
		if i.configs.LogStreamCreation(i.instanceID) {
			level.Debug(util_log.Logger).Log(
				"msg", "failed to create stream, failed to parse labels",
				"org_id", i.instanceID,
				"err", err,
				"stream", pushReqStream.Labels,
			)
		}
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	if err != nil {
		return i.onStreamCreationError(ctx, pushReqStream, err, sortedLabels)
	}

	fp := i.getHashForLabels(sortedLabels)

	chunkfmt, headfmt, err := i.chunkFormatAt(minTs(&pushReqStream))
	if err != nil {
		return nil, fmt.Errorf("failed to create stream: %w", err)
	}

	s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID) /*i.streamRateCalculator,*/, i.metrics, i.writeFailures)

	i.onStreamCreated(s)

	return s, nil
}

// minTs is a helper to return minimum Unix timestamp (as `model.Time`)
// across all the entries in a given `stream`.
func minTs(stream *logproto.Stream) model.Time {
	// NOTE: We choose `min` timestamp because, the chunk is written once then
	// added to the index buckets for may be different days. It would better rather to have
	// some latest(say v13) indices reference older (say v12) compatible chunks than vice versa.

	streamMinTs := int64(math.MaxInt64)
	for _, entry := range stream.Entries {
		ts := entry.Timestamp.UnixNano()
		if streamMinTs > ts {
			streamMinTs = ts
		}
	}
	return model.TimeFromUnixNano(streamMinTs)
}

// chunkFormatAt returns chunk formats to use at given period of time.
func (i *instance) chunkFormatAt(at model.Time) (byte, chunkenc.HeadBlockFmt, error) {
	// NOTE: We choose chunk formats for stream based on it's entries timestamp.
	// Rationale being, a single (ingester) instance can be running across multiple schema period
	// and choosing correct periodConfig during creation of stream is more accurate rather
	// than choosing it during starting of instance itself.

	periodConfig, err := i.schemaconfig.SchemaForTime(at)
	if err != nil {
		return 0, 0, err
	}

	chunkFormat, headblock, err := periodConfig.ChunkFormat()
	if err != nil {
		return 0, 0, err
	}

	return chunkFormat, headblock, nil
}

func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
	var fp uint64
	fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...)
	return i.mapper.MapFP(model.Fingerprint(fp), ls)
}

// Return labels associated with given fingerprint. Used by fingerprint mapper.
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
	s, ok := i.streams.LoadByFP(fp)
	if !ok {
		return nil
	}
	return s.labels
}

func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels) (*stream, error) {
	if i.configs.LogStreamCreation(i.instanceID) {
		level.Debug(util_log.Logger).Log(
			"msg", "failed to create stream, exceeded limit",
			"org_id", i.instanceID,
			"err", err,
			"stream", pushReqStream.Labels,
		)
	}

	validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
	bytes := util.EntriesTotalSize(pushReqStream.Entries)
	validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
	if i.customStreamsTracker != nil {
		i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
	}
	return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID)
}

func (i *instance) onStreamCreated(s *stream) {
	memoryStreams.WithLabelValues(i.instanceID).Inc()
	memoryStreamsLabelsBytes.Add(float64(len(s.labels.String())))
	i.streamsCreatedTotal.Inc()
	// i.addTailersToNewStream(s)
	streamsCountStats.Add(1)
	i.ownedStreamsSvc.incOwnedStreamCount()
	if i.configs.LogStreamCreation(i.instanceID) {
||||
level.Debug(util_log.Logger).Log( |
||||
"msg", "successfully created stream", |
||||
"org_id", i.instanceID, |
||||
"stream", s.labels.String(), |
||||
) |
||||
} |
||||
} |
@ -1,226 +0,0 @@
package ingesterrf1

import (
	"fmt"
	"math"
	"sync"
	"time"

	"golang.org/x/time/rate"

	"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
	"github.com/grafana/loki/v3/pkg/validation"
)

const (
	errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, global/ingesters: %d)"
)

// RingCount is the interface exposed by a ring implementation which allows
// to count members.
type RingCount interface {
	HealthyInstancesCount() int
}

type Limits interface {
	UnorderedWrites(userID string) bool
	UseOwnedStreamCount(userID string) bool
	MaxLocalStreamsPerUser(userID string) int
	MaxGlobalStreamsPerUser(userID string) int
	PerStreamRateLimit(userID string) validation.RateLimit
	ShardStreams(userID string) shardstreams.Config
}

// Limiter implements primitives to get the maximum number of streams
// an ingester can handle for a specific tenant.
type Limiter struct {
	limits Limits
	ring RingCount
	replicationFactor int
	metrics *ingesterMetrics

	mtx sync.RWMutex
	disabled bool
}

func (l *Limiter) DisableForWALReplay() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = true
	l.metrics.limiterEnabled.Set(0)
}

func (l *Limiter) Enable() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = false
	l.metrics.limiterEnabled.Set(1)
}

// NewLimiter makes a new limiter.
func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter {
	return &Limiter{
		limits: limits,
		ring: ring,
		replicationFactor: replicationFactor,
		metrics: metrics,
	}
}

func (l *Limiter) UnorderedWrites(userID string) bool {
	// WAL replay should not discard previously ack'd writes,
	// so allow out-of-order writes while the limiter is disabled.
	// This allows replaying unordered WALs into ordered configurations.
	if l.disabled {
		return true
	}
	return l.limits.UnorderedWrites(userID)
}

func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
	// Start by setting the local limit either from override or default.
	localLimit = l.limits.MaxLocalStreamsPerUser(tenantID)

	// We can assume that streams are evenly distributed across ingesters,
	// so we convert the global limit into a local limit.
	globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID)
	adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit)

	// Set the calculated limit to the lesser of the local limit or the new calculated global limit.
	calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit)

	// If both the local and global limits are disabled, we just
	// use the largest int value.
	if calculatedLimit == 0 {
		calculatedLimit = math.MaxInt32
	}
	return
}

func (l *Limiter) minNonZero(first, second int) int {
	if first == 0 || (second != 0 && first > second) {
		return second
	}

	return first
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
	if globalLimit == 0 {
		return 0
	}
	// todo: change to healthyInstancesInZoneCount() once
	// Given we don't need a super accurate count (i.e. when the ingesters
	// topology changes) and we prefer to always be in favor of the tenant,
	// we can use a per-ingester limit equal to:
	// (global limit / number of ingesters) * replication factor
	numIngesters := l.ring.HealthyInstancesCount()

	// numIngesters can be zero because the number of ingesters is asynchronously updated.
	// If that happens, we just temporarily ignore the global limit.
	if numIngesters > 0 {
		return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
	}

	return 0
}

type supplier[T any] func() T

type streamCountLimiter struct {
	tenantID string
	limiter *Limiter
	defaultStreamCountSupplier supplier[int]
	ownedStreamSvc *ownedStreamService
}

var noopFixedLimitSupplier = func() int {
	return 0
}

func newStreamCountLimiter(tenantID string, defaultStreamCountSupplier supplier[int], limiter *Limiter, service *ownedStreamService) *streamCountLimiter {
	return &streamCountLimiter{
		tenantID: tenantID,
		limiter: limiter,
		defaultStreamCountSupplier: defaultStreamCountSupplier,
		ownedStreamSvc: service,
	}
}

func (l *streamCountLimiter) AssertNewStreamAllowed(tenantID string) error {
	streamCountSupplier, fixedLimitSupplier := l.getSuppliers(tenantID)
	calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit := l.getCurrentLimit(tenantID, fixedLimitSupplier)
	actualStreamsCount := streamCountSupplier()
	if actualStreamsCount < calculatedLimit {
		return nil
	}

	return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, tenantID, actualStreamsCount, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}

func (l *streamCountLimiter) getCurrentLimit(tenantID string, fixedLimitSupplier supplier[int]) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
	calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit = l.limiter.GetStreamCountLimit(tenantID)
	fixedLimit := fixedLimitSupplier()
	if fixedLimit > calculatedLimit {
		calculatedLimit = fixedLimit
	}
	return
}

func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, fixedLimitSupplier supplier[int]) {
	if l.limiter.limits.UseOwnedStreamCount(tenant) {
		return l.ownedStreamSvc.getOwnedStreamCount, l.ownedStreamSvc.getFixedLimit
	}
	return l.defaultStreamCountSupplier, noopFixedLimitSupplier
}

type RateLimiterStrategy interface {
	RateLimit(tenant string) validation.RateLimit
}

func (l *Limiter) RateLimit(tenant string) validation.RateLimit {
	if l.disabled {
		return validation.Unlimited
	}

	return l.limits.PerStreamRateLimit(tenant)
}

type StreamRateLimiter struct {
	recheckPeriod time.Duration
	recheckAt time.Time
	strategy RateLimiterStrategy
	tenant string
	lim *rate.Limiter
}

func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter {
	rl := strategy.RateLimit(tenant)
	return &StreamRateLimiter{
		recheckPeriod: recheckPeriod,
		strategy: strategy,
		tenant: tenant,
		lim: rate.NewLimiter(rl.Limit, rl.Burst),
	}
}

func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool {
	now := time.Now()
	if now.After(l.recheckAt) {
		l.recheckAt = now.Add(l.recheckPeriod)

		oldLim := l.lim.Limit()
		oldBurst := l.lim.Burst()

		next := l.strategy.RateLimit(l.tenant)

		if oldLim != next.Limit || oldBurst != next.Burst {
			// Edge case: rate.Inf doesn't advance nicely when reconfigured.
			// To simplify, we just create a new limiter after reconfiguration rather
			// than alter the existing one.
			l.lim = rate.NewLimiter(next.Limit, next.Burst)
		}
	}

	return l.lim.AllowN(at, n)
}
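The global-to-local conversion above is the heart of GetStreamCountLimit, so a small worked sketch may help. The numbers are illustrative assumptions, not values from this commit:

package main

import "fmt"

// Mirrors convertGlobalToLocalLimit: per-ingester limit = (global / healthy ingesters) * RF.
func main() {
	globalLimit := 10000   // assumed MaxGlobalStreamsPerUser override for the tenant
	healthyIngesters := 10 // assumed ring size
	replicationFactor := 3 // assumed RF; the division is always biased in the tenant's favor

	adjusted := int((float64(globalLimit) / float64(healthyIngesters)) * float64(replicationFactor))
	fmt.Println(adjusted) // 3000 streams allowed on each ingester for this tenant
}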
@ -1,152 +0,0 @@
package ingesterrf1

import (
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/go-kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"go.uber.org/atomic"

	util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping.

var separatorString = string([]byte{model.SeparatorByte})

// FpMapper is used to map fingerprints in order to work around fingerprint
// collisions.
type FpMapper struct {
	// highestMappedFP has to be aligned for atomic operations.
	highestMappedFP atomic.Uint64

	mtx sync.RWMutex // Protects mappings.
	// maps original fingerprints to a map of string representations of
	// metrics to the truly unique fingerprint.
	mappings map[model.Fingerprint]map[string]model.Fingerprint

	// Returns existing labels for given fingerprint, if any.
	// Equality check relies on labels.Labels being sorted.
	fpToLabels func(fingerprint model.Fingerprint) labels.Labels
}

// NewFPMapper returns an FpMapper ready to use.
func NewFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *FpMapper {
	if fpToLabels == nil {
		panic("nil fpToLabels")
	}

	return &FpMapper{
		fpToLabels: fpToLabels,
		mappings: map[model.Fingerprint]map[string]model.Fingerprint{},
	}
}

// MapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
// returns a truly unique fingerprint. The caller must have locked the raw
// fingerprint.
func (m *FpMapper) MapFP(fp model.Fingerprint, metric labels.Labels) model.Fingerprint {
	// First check if we are in the reserved FP space, in which case this is
	// automatically a collision that has to be mapped.
	if fp <= maxMappedFP {
		return m.maybeAddMapping(fp, metric)
	}

	// Then check the most likely case: this fp belongs to a series that is
	// already in memory.
	s := m.fpToLabels(fp)
	if s != nil {
		// FP exists in memory, but is it for the same metric?
		if labels.Equal(metric, s) {
			// Yup. We are done.
			return fp
		}
		// Collision detected!
		return m.maybeAddMapping(fp, metric)
	}
	// Metric is not in memory. Before doing the expensive archive lookup,
	// check if we have a mapping for this metric in place already.
	m.mtx.RLock()
	mappedFPs, fpAlreadyMapped := m.mappings[fp]
	m.mtx.RUnlock()
	if fpAlreadyMapped {
		// We indeed have mapped fp historically.
		ms := metricToUniqueString(metric)
		// fp is locked by the caller, so no further locking of
		// 'collisions' required (it is specific to fp).
		mappedFP, ok := mappedFPs[ms]
		if ok {
			// Historical mapping found, return the mapped FP.
			return mappedFP
		}
	}
	return fp
}

// maybeAddMapping is only used internally. It takes a detected collision and
// adds it to the collisions map if not yet there. In any case, it returns the
// truly unique fingerprint for the colliding metric.
func (m *FpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric labels.Labels) model.Fingerprint {
	ms := metricToUniqueString(collidingMetric)
	m.mtx.RLock()
	mappedFPs, ok := m.mappings[fp]
	m.mtx.RUnlock()
	if ok {
		// fp is locked by the caller, so no further locking required.
		mappedFP, ok := mappedFPs[ms]
		if ok {
			return mappedFP // Existing mapping.
		}
		// A new mapping has to be created.
		mappedFP = m.nextMappedFP()
		mappedFPs[ms] = mappedFP
		level.Info(util_log.Logger).Log(
			"msg", "fingerprint collision detected, mapping to new fingerprint",
			"old_fp", fp,
			"new_fp", mappedFP,
			"metric", ms,
		)
		return mappedFP
	}
	// This is the first collision for fp.
	mappedFP := m.nextMappedFP()
	mappedFPs = map[string]model.Fingerprint{ms: mappedFP}
	m.mtx.Lock()
	m.mappings[fp] = mappedFPs
	m.mtx.Unlock()
	level.Info(util_log.Logger).Log(
		"msg", "fingerprint collision detected, mapping to new fingerprint",
		"old_fp", fp,
		"new_fp", mappedFP,
		"metric", collidingMetric,
	)
	return mappedFP
}

func (m *FpMapper) nextMappedFP() model.Fingerprint {
	mappedFP := model.Fingerprint(m.highestMappedFP.Inc())
	if mappedFP > maxMappedFP {
		panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
	}
	return mappedFP
}

// metricToUniqueString turns a metric into a string in a reproducible and
// unique way, i.e. the same metric will always create the same string, and
// different metrics will always create different strings. In a way, it is the
// "ideal" fingerprint function, only that it is more expensive than the
// FastFingerprint function, and its result is not suitable as a key for maps
// and indexes as it might become really large, causing a lot of hashing effort
// in maps and a lot of storage overhead in indexes.
func metricToUniqueString(m labels.Labels) string {
	parts := make([]string, 0, len(m))
	for _, pair := range m {
		parts = append(parts, pair.Name+separatorString+pair.Value)
	}
	sort.Strings(parts)
	return strings.Join(parts, separatorString)
}
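To make the collision path above concrete, here is a minimal test-style sketch against the deleted package; the label sets and the raw fingerprint value are invented for illustration and are not part of this commit:

package ingesterrf1

import (
	"testing"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
)

// Two different label sets pushed through the same raw fingerprint must come
// back with distinct mapped fingerprints.
func TestFpMapperCollisionSketch(t *testing.T) {
	m := NewFPMapper(func(model.Fingerprint) labels.Labels { return nil }) // nothing resident in memory

	raw := model.Fingerprint(42) // inside the reserved range, so it is always remapped
	a := m.MapFP(raw, labels.FromStrings("app", "foo"))
	b := m.MapFP(raw, labels.FromStrings("app", "bar"))

	if a == b {
		t.Fatalf("expected distinct mapped fingerprints, got %v and %v", a, b)
	}
}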
@ -1,63 +0,0 @@
package ingesterrf1

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/grafana/loki/v3/pkg/storage/wal"
)

type ingesterMetrics struct {
	autoForgetUnhealthyIngestersTotal prometheus.Counter
	limiterEnabled prometheus.Gauge
	// Shutdown marker for ingester scale down.
	shutdownMarker prometheus.Gauge
	flushesTotal prometheus.Counter
	flushFailuresTotal prometheus.Counter
	flushQueues prometheus.Gauge
	flushDuration prometheus.Histogram
	flushSize prometheus.Histogram
	segmentMetrics *wal.SegmentMetrics
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
	return &ingesterMetrics{
		autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
			Help: "Total number of ingesters automatically forgotten.",
		}),
		limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "loki_ingester_rf1_limiter_enabled",
			Help: "1 if the limiter is enabled, otherwise 0.",
		}),
		shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "loki_ingester_rf1_shutdown_marker",
			Help: "1 if prepare shutdown has been called, 0 otherwise.",
		}),
		flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Name: "loki_ingester_rf1_flushes_total",
			Help: "The total number of flushes.",
		}),
		flushFailuresTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Name: "loki_ingester_rf1_flush_failures_total",
			Help: "The total number of failed flushes.",
		}),
		flushQueues: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "loki_ingester_rf1_flush_queues",
			Help: "The total number of flush queues.",
		}),
		flushDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
			Name: "loki_ingester_rf1_flush_duration_seconds",
			Help: "The flush duration (in seconds).",
			Buckets: prometheus.ExponentialBuckets(0.001, 4, 8),
			NativeHistogramBucketFactor: 1.1,
		}),
		flushSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
			Name: "loki_ingester_rf1_flush_size_bytes",
			Help: "The flush size (as written to object storage).",
			Buckets: prometheus.ExponentialBuckets(100, 10, 8),
			NativeHistogramBucketFactor: 1.1,
		}),
		segmentMetrics: wal.NewSegmentMetrics(r),
	}
}
@ -1,74 +0,0 @@
package ingesterrf1

import (
	"sync"

	"github.com/grafana/dskit/services"
	"go.uber.org/atomic"
)

type ownedStreamService struct {
	services.Service

	tenantID string
	limiter *Limiter
	fixedLimit *atomic.Int32
	ownedStreamCount int
	notOwnedStreamCount int
	lock sync.RWMutex
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
	svc := &ownedStreamService{
		tenantID: tenantID,
		limiter: limiter,
		fixedLimit: atomic.NewInt32(0),
	}

	svc.updateFixedLimit()
	return svc
}

func (s *ownedStreamService) getOwnedStreamCount() int {
	s.lock.RLock()
	defer s.lock.RUnlock()
	return s.ownedStreamCount
}

func (s *ownedStreamService) updateFixedLimit() {
	limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID)
	s.fixedLimit.Store(int32(limit))
}

func (s *ownedStreamService) getFixedLimit() int {
	return int(s.fixedLimit.Load())
}

func (s *ownedStreamService) incOwnedStreamCount() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.ownedStreamCount++
}

func (s *ownedStreamService) incNotOwnedStreamCount() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.notOwnedStreamCount++
}

func (s *ownedStreamService) decOwnedStreamCount() {
	s.lock.Lock()
	defer s.lock.Unlock()
	if s.notOwnedStreamCount > 0 {
		s.notOwnedStreamCount--
		return
	}
	s.ownedStreamCount--
}

func (s *ownedStreamService) resetStreamCounts() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.ownedStreamCount = 0
	s.notOwnedStreamCount = 0
}
@ -1,94 +0,0 @@
package ingesterrf1

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/ring"
	ring_client "github.com/grafana/dskit/ring/client"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
)

type RingClient struct {
	cfg Config
	logger log.Logger

	services.Service
	subservices *services.Manager
	subservicesWatcher *services.FailureWatcher
	ring *ring.Ring
	pool *ring_client.Pool
}

func NewRingClient(
	cfg Config,
	metricsNamespace string,
	registerer prometheus.Registerer,
	logger log.Logger,
) (*RingClient, error) {
	var err error
	registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)
	ringClient := &RingClient{
		logger: log.With(logger, "component", "ingester-rf1-client"),
		cfg: cfg,
	}
	ringClient.ring, err = newRing(cfg.LifecyclerConfig.RingConfig, "ingester-rf1", "ingester-rf1-ring", ringClient.logger, registerer)
	if err != nil {
		return nil, err
	}
	factory := cfg.factory
	if factory == nil {
		factory = ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
			return clientpool.NewClient(cfg.ClientConfig, addr)
		})
	}
	ringClient.pool = clientpool.NewPool("ingester-rf1", cfg.ClientConfig.PoolConfig, ringClient.ring, factory, logger, metricsNamespace)

	ringClient.subservices, err = services.NewManager(ringClient.pool, ringClient.ring)
	if err != nil {
		return nil, fmt.Errorf("services manager: %w", err)
	}
	ringClient.subservicesWatcher = services.NewFailureWatcher()
	ringClient.subservicesWatcher.WatchManager(ringClient.subservices)
	ringClient.Service = services.NewBasicService(ringClient.starting, ringClient.running, ringClient.stopping)

	return ringClient, nil
}

func newRing(cfg ring.Config, name, key string, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, error) {
	codec := ring.GetCodec()
	// Suffix all client names with "-ring" to denote this kv client is used by the ring.
	store, err := kv.NewClient(
		cfg.KVStore,
		codec,
		kv.RegistererWithKVName(reg, name+"-ring"),
		logger,
	)
	if err != nil {
		return nil, err
	}

	return ring.NewWithStoreClientAndStrategy(cfg, name, key, store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), reg, logger)
}

func (q *RingClient) starting(ctx context.Context) error {
	return services.StartManagerAndAwaitHealthy(ctx, q.subservices)
}

func (q *RingClient) running(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return nil
	case err := <-q.subservicesWatcher.Chan():
		return fmt.Errorf("ingester-rf1 tee subservices failed: %w", err)
	}
}

func (q *RingClient) stopping(_ error) error {
	return services.StopManagerAndAwaitStopped(context.Background(), q.subservices)
}
@ -1,325 +0,0 @@
package ingesterrf1

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/grafana/dskit/httpgrpc"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/chunkenc"
	"github.com/grafana/loki/v3/pkg/distributor/writefailures"
	"github.com/grafana/loki/v3/pkg/loghttp/push"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/storage/wal"
	"github.com/grafana/loki/v3/pkg/util"
	"github.com/grafana/loki/v3/pkg/util/flagext"
	"github.com/grafana/loki/v3/pkg/validation"
)

var ErrEntriesExist = errors.New("duplicate push - entries already exist")

type line struct {
	ts time.Time
	content string
}

type stream struct {
	limiter *StreamRateLimiter
	cfg *Config
	tenant string
	// Newest chunk at chunks[n-1].
	// Not thread-safe; assume accesses to this are locked by caller.
	fp model.Fingerprint // possibly remapped fingerprint, used in the streams map

	labels labels.Labels
	labelsString string
	labelHash uint64
	labelHashNoShard uint64

	// most recently pushed line. This is used to prevent duplicate pushes.
	// It also determines chunk synchronization when unordered writes are disabled.
	lastLine line

	// keeps track of the highest timestamp accepted by the stream.
	// This is used when unordered writes are enabled to cap the validity window
	// of accepted writes and for chunk synchronization.
	highestTs time.Time

	metrics *ingesterMetrics

	// tailers map[uint32]*tailer
	// tailerMtx sync.RWMutex

	// entryCt is a counter which is incremented on each accepted entry.
	// This allows us to discard WAL entries during replays which were
	// already recovered via checkpoints. Historically out of order
	// errors were used to detect this, but this counter has been
	// introduced to facilitate removing the ordering constraint.
	entryCt int64

	unorderedWrites bool
	// streamRateCalculator *StreamRateCalculator

	writeFailures *writefailures.Manager

	chunkFormat byte
	chunkHeadBlockFormat chunkenc.HeadBlockFmt
}

type entryWithError struct {
	entry *logproto.Entry
	e error
}

func newStream(
	chunkFormat byte,
	headBlockFmt chunkenc.HeadBlockFmt,
	cfg *Config,
	limits RateLimiterStrategy,
	tenant string,
	fp model.Fingerprint,
	labels labels.Labels,
	unorderedWrites bool,
	// streamRateCalculator *StreamRateCalculator,
	metrics *ingesterMetrics,
	writeFailures *writefailures.Manager,
) *stream {
	// hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
	return &stream{
		limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
		cfg: cfg,
		fp: fp,
		labels: labels,
		labelsString: labels.String(),
		labelHash: labels.Hash(),
		// labelHashNoShard: hashNoShard,
		// tailers: map[uint32]*tailer{},
		metrics: metrics,
		tenant: tenant,
		// streamRateCalculator: streamRateCalculator,

		unorderedWrites: unorderedWrites,
		writeFailures: writeFailures,
		chunkFormat: chunkFormat,
		chunkHeadBlockFormat: headBlockFmt,
	}
}

// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
// Must hold chunkMtx.
// DEPRECATED: chunk transfers are no longer suggested and remain for compatibility.
func (s *stream) consumeChunk(_ context.Context, _ *logproto.Chunk) error {
	return nil
}

func (s *stream) Push(
	ctx context.Context,
	wal *wal.Manager,
	entries []logproto.Entry,
	// Whether or not to ingest all entries at once. It is a per-tenant configuration.
	rateLimitWholeStream bool,

	usageTracker push.UsageTracker,
) (int, *wal.AppendResult, error) {
	toStore, invalid := s.validateEntries(ctx, entries, rateLimitWholeStream, usageTracker)
	if rateLimitWholeStream && hasRateLimitErr(invalid) {
		return 0, nil, errorForFailedEntries(s, invalid, len(entries))
	}

	bytesAdded, res, err := s.storeEntries(ctx, wal, toStore)
	if err != nil {
		return 0, nil, err
	}

	return bytesAdded, res, errorForFailedEntries(s, invalid, len(entries))
}

func errorForFailedEntries(s *stream, failedEntriesWithError []entryWithError, totalEntries int) error {
	if len(failedEntriesWithError) == 0 {
		return nil
	}

	lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
	_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
	outOfOrder := chunkenc.IsOutOfOrderErr(lastEntryWithErr.e)
	if !outOfOrder && !ok {
		return lastEntryWithErr.e
	}
	var statusCode int
	if outOfOrder {
		statusCode = http.StatusBadRequest
	}
	if ok {
		statusCode = http.StatusTooManyRequests
	}
	// Return an HTTP 4xx response listing all failed entries.
	buf := bytes.Buffer{}
	streamName := s.labelsString

	limitedFailedEntries := failedEntriesWithError
	if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
		limitedFailedEntries = limitedFailedEntries[:maxIgnore]
	}

	for _, entryWithError := range limitedFailedEntries {
		fmt.Fprintf(&buf,
			"entry with timestamp %s ignored, reason: '%s',\n",
			entryWithError.entry.Timestamp.String(), entryWithError.e.Error())
	}

	fmt.Fprintf(&buf, "user '%s', total ignored: %d out of %d for stream: %s", s.tenant, len(failedEntriesWithError), totalEntries, streamName)

	return httpgrpc.Errorf(statusCode, "%s", buf.String())
}

func hasRateLimitErr(errs []entryWithError) bool {
	if len(errs) == 0 {
		return false
	}

	lastErr := errs[len(errs)-1]
	_, ok := lastErr.e.(*validation.ErrStreamRateLimit)
	return ok
}

func (s *stream) storeEntries(ctx context.Context, w *wal.Manager, entries []*logproto.Entry) (int, *wal.AppendResult, error) {
	sp, _ := opentracing.StartSpanFromContext(ctx, "storeEntries")
	defer sp.Finish()

	var bytesAdded int

	for i := 0; i < len(entries); i++ {
		s.entryCt++
		s.lastLine.ts = entries[i].Timestamp
		s.lastLine.content = entries[i].Line
		if s.highestTs.Before(entries[i].Timestamp) {
			s.highestTs = entries[i].Timestamp
		}

		bytesAdded += len(entries[i].Line)
	}

	res, err := w.Append(wal.AppendRequest{
		TenantID: s.tenant,
		Labels: s.labels,
		LabelsStr: s.labelsString,
		Entries: entries,
	})
	if err != nil {
		return 0, nil, err
	}
	return bytesAdded, res, nil
}

func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "validateEntries")
	defer sp.Finish()
	var (
		outOfOrderSamples, outOfOrderBytes int
		rateLimitedSamples, rateLimitedBytes int
		validBytes, totalBytes int
		failedEntriesWithError []entryWithError
		limit = s.limiter.lim.Limit()
		lastLine = s.lastLine
		highestTs = s.highestTs
		toStore = make([]*logproto.Entry, 0, len(entries))
	)

	for i := range entries {
		// If this entry matches our last appended line's timestamp and contents,
		// ignore it.
		//
		// This check is done at the stream level so it persists across cut and
		// flushed chunks.
		//
		// NOTE: it's still possible for duplicates to be appended if a stream is
		// deleted from inactivity.
		if entries[i].Timestamp.Equal(lastLine.ts) && entries[i].Line == lastLine.content {
			continue
		}

		entryBytes := util.EntryTotalSize(&entries[i])
		totalBytes += entryBytes

		now := time.Now()
		if !rateLimitWholeStream && !s.limiter.AllowN(now, entryBytes) {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(entryBytes)}})
			s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
			rateLimitedSamples++
			rateLimitedBytes += entryBytes
			continue
		}

		// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
		cutoff := highestTs.Add(-time.Hour)
		if s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)})
			s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
			outOfOrderSamples++
			outOfOrderBytes += entryBytes
			continue
		}

		validBytes += entryBytes

		lastLine.ts = entries[i].Timestamp
		lastLine.content = entries[i].Line
		if highestTs.Before(entries[i].Timestamp) {
			highestTs = entries[i].Timestamp
		}

		toStore = append(toStore, &entries[i])
	}

	// Each successful call to 'AllowN' advances the limiter. With all-or-nothing
	// ingestion, the limiter should only be advanced when the whole stream can be
	// sent.
	now := time.Now()
	if rateLimitWholeStream && !s.limiter.AllowN(now, validBytes) {
		// Report that the whole stream was rate limited.
		rateLimitedSamples = len(toStore)
		failedEntriesWithError = make([]entryWithError, 0, len(toStore))
		for i := 0; i < len(toStore); i++ {
			entryTotalSize := util.EntryTotalSize(toStore[i])
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(entryTotalSize)}})
			rateLimitedBytes += entryTotalSize
		}
	}

	// s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes)
	s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes, usageTracker)
	return toStore, failedEntriesWithError
}

func (s *stream) reportMetrics(ctx context.Context, outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes int, usageTracker push.UsageTracker) {
	if outOfOrderSamples > 0 {
		name := validation.OutOfOrder
		if s.unorderedWrites {
			name = validation.TooFarBehind
		}
		validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples))
		validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes))
		if usageTracker != nil {
			usageTracker.DiscardedBytesAdd(ctx, s.tenant, name, s.labels, float64(outOfOrderBytes))
		}
	}
	if rateLimitedSamples > 0 {
		validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
		validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes))
		if usageTracker != nil {
			usageTracker.DiscardedBytesAdd(ctx, s.tenant, validation.StreamRateLimit, s.labels, float64(rateLimitedBytes))
		}
	}
}

func (s *stream) resetCounter() {
	s.entryCt = 0
}
@ -1,131 +0,0 @@
package ingesterrf1

import (
	"sync"
	"time"

	"github.com/grafana/loki/v3/pkg/logproto"
)

const (
	// defaultStripeSize is the default number of entries to allocate in the
	// stripeSeries list.
	defaultStripeSize = 1 << 10

	// The intent is for a per-second rate, so this is hard coded.
	updateInterval = time.Second
)

// stripeLock is taken from ruler/storage/wal/series.go
type stripeLock struct {
	sync.RWMutex
	// Padding to avoid multiple locks being on the same cache line.
	_ [40]byte
}

type StreamRateCalculator struct {
	size int
	samples []map[string]map[uint64]logproto.StreamRate
	locks []stripeLock
	stopchan chan struct{}

	rateLock sync.RWMutex
	allRates []logproto.StreamRate
}

func NewStreamRateCalculator() *StreamRateCalculator {
	calc := &StreamRateCalculator{
		size: defaultStripeSize,
		// Lookup pattern: tenant -> fingerprint -> rate
		samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize),
		locks: make([]stripeLock, defaultStripeSize),
		stopchan: make(chan struct{}),
	}

	for i := 0; i < defaultStripeSize; i++ {
		calc.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
	}

	go calc.updateLoop()

	return calc
}

func (c *StreamRateCalculator) updateLoop() {
	t := time.NewTicker(updateInterval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			c.updateRates()
		case <-c.stopchan:
			return
		}
	}
}

func (c *StreamRateCalculator) updateRates() {
	rates := make([]logproto.StreamRate, 0, c.size)

	for i := 0; i < c.size; i++ {
		c.locks[i].Lock()

		tenantRates := c.samples[i]
		for _, tenant := range tenantRates {
			for _, streamRate := range tenant {
				rates = append(rates, logproto.StreamRate{
					Tenant: streamRate.Tenant,
					StreamHash: streamRate.StreamHash,
					StreamHashNoShard: streamRate.StreamHashNoShard,
					Rate: streamRate.Rate,
					Pushes: streamRate.Pushes,
				})
			}
		}

		c.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
		c.locks[i].Unlock()
	}

	c.rateLock.Lock()
	defer c.rateLock.Unlock()

	c.allRates = rates
}

func (c *StreamRateCalculator) Rates() []logproto.StreamRate {
	c.rateLock.RLock()
	defer c.rateLock.RUnlock()

	return c.allRates
}

func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoShard uint64, bytes int) {
	i := streamHash & uint64(c.size-1)

	c.locks[i].Lock()
	defer c.locks[i].Unlock()

	tenantMap := c.getTenant(i, tenant)
	streamRate := tenantMap[streamHash]
	streamRate.StreamHash = streamHash
	streamRate.StreamHashNoShard = streamHashNoShard
	streamRate.Tenant = tenant
	streamRate.Rate += int64(bytes)
	streamRate.Pushes++
	tenantMap[streamHash] = streamRate

	c.samples[i][tenant] = tenantMap
}

func (c *StreamRateCalculator) getTenant(idx uint64, tenant string) map[uint64]logproto.StreamRate {
	if t, ok := c.samples[idx][tenant]; ok {
		return t
	}
	return make(map[uint64]logproto.StreamRate)
}

func (c *StreamRateCalculator) Stop() {
	close(c.stopchan)
}
@ -1,149 +0,0 @@
package ingesterrf1

import (
	"sync"

	"github.com/prometheus/common/model"
	"go.uber.org/atomic"
)

type streamsMap struct {
	consistencyMtx sync.RWMutex // Keep read/write consistency between other fields
	streams *sync.Map // map[string]*stream
	streamsByFP *sync.Map // map[model.Fingerprint]*stream
	streamsCounter *atomic.Int64
}

func newStreamsMap() *streamsMap {
	return &streamsMap{
		consistencyMtx: sync.RWMutex{},
		streams: &sync.Map{},
		streamsByFP: &sync.Map{},
		streamsCounter: atomic.NewInt64(0),
	}
}

// Load is lock-free. If usage of the stream is consistency sensitive, it must be called inside WithRLock at least.
func (m *streamsMap) Load(key string) (*stream, bool) {
	return m.load(m.streams, key)
}

// LoadByFP is lock-free. If usage of the stream is consistency sensitive, it must be called inside WithRLock at least.
func (m *streamsMap) LoadByFP(fp model.Fingerprint) (*stream, bool) {
	return m.load(m.streamsByFP, fp)
}

// Store must be called inside WithLock.
func (m *streamsMap) Store(key string, s *stream) {
	m.store(key, s)
}

// StoreByFP must be called inside WithLock.
func (m *streamsMap) StoreByFP(fp model.Fingerprint, s *stream) {
	m.store(fp, s)
}

// Delete must be called inside WithLock.
func (m *streamsMap) Delete(s *stream) bool {
	_, loaded := m.streams.LoadAndDelete(s.labelsString)
	if loaded {
		m.streamsByFP.Delete(s.fp)
		m.streamsCounter.Dec()
		return true
	}
	return false
}

// LoadOrStoreNew already takes the lock internally; do NOT call it inside WithLock or WithRLock.
func (m *streamsMap) LoadOrStoreNew(key string, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) {
	return m.loadOrStoreNew(m.streams, key, newStreamFn, postLoadFn)
}

// LoadOrStoreNewByFP already takes the lock internally; do NOT call it inside WithLock or WithRLock.
func (m *streamsMap) LoadOrStoreNewByFP(fp model.Fingerprint, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) {
	return m.loadOrStoreNew(m.streamsByFP, fp, newStreamFn, postLoadFn)
}

// WithLock is a helper function to execute write operations.
func (m *streamsMap) WithLock(fn func()) {
	m.consistencyMtx.Lock()
	defer m.consistencyMtx.Unlock()
	fn()
}

// WithRLock is a helper function to execute consistency-sensitive read operations.
// Generally, if a stream loaded from streamsMap will have its chunkMtx locked, chunkMtx.Lock is supposed to be called
// within this function.
func (m *streamsMap) WithRLock(fn func()) {
	m.consistencyMtx.RLock()
	defer m.consistencyMtx.RUnlock()
	fn()
}

func (m *streamsMap) ForEach(fn func(s *stream) (bool, error)) error {
	var c bool
	var err error
	m.streams.Range(func(_, value interface{}) bool {
		c, err = fn(value.(*stream))
		return c
	})
	return err
}

func (m *streamsMap) Len() int {
	return int(m.streamsCounter.Load())
}

func (m *streamsMap) load(mp *sync.Map, key interface{}) (*stream, bool) {
	if v, ok := mp.Load(key); ok {
		return v.(*stream), true
	}
	return nil, false
}

func (m *streamsMap) store(key interface{}, s *stream) {
	if labelsString, ok := key.(string); ok {
		m.streams.Store(labelsString, s)
	} else {
		m.streams.Store(s.labelsString, s)
	}
	m.streamsByFP.Store(s.fp, s)
	m.streamsCounter.Inc()
}

// newStreamFn: called if not loaded, with consistencyMtx locked. Must not be nil.
// postLoadFn: called if loaded, with consistencyMtx read-locked at least. Can be nil.
func (m *streamsMap) loadOrStoreNew(mp *sync.Map, key interface{}, newStreamFn func() (*stream, error), postLoadFn func(*stream) error) (*stream, bool, error) {
	var s *stream
	var loaded bool
	var err error
	m.WithRLock(func() {
		if s, loaded = m.load(mp, key); loaded {
			if postLoadFn != nil {
				err = postLoadFn(s)
			}
		}
	})

	if loaded {
		return s, true, err
	}

	m.WithLock(func() {
		// Double check
		if s, loaded = m.load(mp, key); loaded {
			if postLoadFn != nil {
				err = postLoadFn(s)
			}
			return
		}

		s, err = newStreamFn()
		if err != nil {
			return
		}
		m.store(key, s)
	})

	return s, loaded, err
}
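The double-checked locking in loadOrStoreNew is easiest to read from the caller's side. A minimal sketch of the intended calling pattern follows; the helper name and callback bodies are placeholders, not code from this commit:

package ingesterrf1

// getOrCreate shows the calling convention: the create callback runs under the
// write lock only when the key is absent, and the post-load callback runs when
// an existing stream is returned, so callers never take consistencyMtx themselves.
func getOrCreate(m *streamsMap, labelsStr string, create func() (*stream, error)) (*stream, error) {
	s, loaded, err := m.LoadOrStoreNew(labelsStr,
		create, // invoked at most once, inside WithLock
		func(_ *stream) error {
			return nil // stream already existed; nothing extra to do
		},
	)
	if err != nil {
		return nil, err
	}
	_ = loaded // true when the stream was already present
	return s, nil
}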
@ -1,88 +0,0 @@
package ingesterrf1

import (
	"context"
	"errors"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/user"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/grafana/loki/v3/pkg/distributor"
	"github.com/grafana/loki/v3/pkg/logproto"
)

type Tee struct {
	cfg Config
	logger log.Logger
	ringClient *RingClient

	ingesterAppends *prometheus.CounterVec
}

func NewTee(
	cfg Config,
	ringClient *RingClient,
	metricsNamespace string,
	registerer prometheus.Registerer,
	logger log.Logger,
) (*Tee, error) {
	registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)

	t := &Tee{
		logger: log.With(logger, "component", "ingester-rf1-tee"),
		ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
			Name: "ingester_rf1_appends_total",
			Help: "The total number of batch appends sent to rf1 ingesters.",
		}, []string{"ingester", "status"}),
		cfg: cfg,
		ringClient: ringClient,
	}

	return t, nil
}

// Duplicate implements distributor.Tee, which is used to tee distributor requests to RF-1 ingesters.
func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) {
	for idx := range streams {
		go func(stream distributor.KeyedStream) {
			if err := t.sendStream(tenant, stream); err != nil {
				level.Error(t.logger).Log("msg", "failed to send stream to ingester-rf1", "err", err)
			}
		}(streams[idx])
	}
}

func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error {
	var descs [1]ring.InstanceDesc
	replicationSet, err := t.ringClient.ring.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil)
	if err != nil {
		return err
	}
	if replicationSet.Instances == nil {
		return errors.New("no instances found")
	}
	addr := replicationSet.Instances[0].Addr
	client, err := t.ringClient.pool.GetClientFor(addr)
	if err != nil {
		return err
	}
	req := &logproto.PushRequest{
		Streams: []logproto.Stream{
			stream.Stream,
		},
	}

	ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), t.cfg.ClientConfig.RemoteTimeout)
	defer cancel()
	_, err = client.(logproto.PusherRF1Client).Push(ctx, req)
	if err != nil {
		t.ingesterAppends.WithLabelValues(addr, "fail").Inc()
		return err
	}
	t.ingesterAppends.WithLabelValues(addr, "success").Inc()
	return nil
}