loki/pkg/pattern/tee_service.go

package pattern

import (
	"context"
	"errors"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/util/pool"

	"github.com/grafana/dskit/instrument"
	"github.com/grafana/dskit/ring"
	"github.com/grafana/dskit/user"

	"github.com/grafana/loki/v3/pkg/distributor"
	"github.com/grafana/loki/v3/pkg/loghttp/push"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/runtime"
	"github.com/grafana/loki/v3/pkg/util/spanlogger"

	ring_client "github.com/grafana/dskit/ring/client"
)
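
// TeeService buffers streams duplicated by the distributor and flushes them to
// pattern ingesters in the background, batched per tenant and per pattern
// ingester, so that teeing does not sit on the distributor write path.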
type TeeService struct {
	cfg        Config
	limits     Limits
	tenantCfgs *runtime.TenantConfigs
	logger     log.Logger
	ringClient RingClient
	wg         *sync.WaitGroup

	ingesterAppends       *prometheus.CounterVec
	ingesterMetricAppends *prometheus.CounterVec
	teedStreams           *prometheus.CounterVec
	teedRequests          *prometheus.CounterVec
	sendDuration          *instrument.HistogramCollector

	flushQueue chan clientRequest
	bufferPool *pool.Pool

	buffersMutex *sync.Mutex
	buffers      map[string][]distributor.KeyedStream
}
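
// NewTeeService wires up the tee's metrics, buffers, and flush queue. The
// returned service does nothing until Start is called.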
func NewTeeService(
	cfg Config,
	limits Limits,
	ringClient RingClient,
	tenantCfgs *runtime.TenantConfigs,
	metricsNamespace string,
	registerer prometheus.Registerer,
	logger log.Logger,
) (*TeeService, error) {
	registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)
	t := &TeeService{
		logger: log.With(logger, "component", "pattern-tee"),
		ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
			Name: "pattern_ingester_appends_total",
			Help: "The total number of batch appends sent to pattern ingesters.",
		}, []string{"ingester", "status"}),
		ingesterMetricAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
			Name: "pattern_ingester_metric_appends_total",
			Help: "The total number of metric only batch appends sent to pattern ingesters. These requests will not be processed for patterns.",
		}, []string{"status"}),
		teedStreams: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
			Name: "pattern_ingester_teed_streams_total",
			Help: "The total number of streams teed to the pattern ingester.",
		}, []string{"status"}),
		teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
			Name: "pattern_ingester_teed_requests_total",
			Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
		}, []string{"status"}),
		sendDuration: instrument.NewHistogramCollector(
			promauto.With(registerer).NewHistogramVec(
				prometheus.HistogramOpts{
					Name:    "pattern_ingester_tee_send_duration_seconds",
					Help:    "Time spent sending batches from the tee to the pattern ingester",
					Buckets: prometheus.DefBuckets,
				}, instrument.HistogramCollectorBuckets,
			),
		),
		cfg:          cfg,
		limits:       limits,
		tenantCfgs:   tenantCfgs,
		ringClient:   ringClient,
		wg:           &sync.WaitGroup{},
		buffersMutex: &sync.Mutex{},
		buffers:      make(map[string][]distributor.KeyedStream),
		flushQueue:   make(chan clientRequest, cfg.TeeConfig.FlushQueueSize),
	}

	return t, nil
}
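
// Start launches the flush workers and the periodic flush loop. The workers
// run on their own context so they can drain in-flight and queued batches
// after runCtx is cancelled, bounded by cfg.TeeConfig.StopFlushTimeout.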
func (ts *TeeService) Start(runCtx context.Context) error {
	ts.wg.Add(1)

	// Start all batchSenders. We don't use the Run() context here, because we
	// want the senders to finish sending any currently in-flight data and the
	// remaining batches in the queue before the TeeService fully stops.
	//
	// Still, we have a maximum amount of time we will wait after the TeeService
	// is stopped, see cfg.StopFlushTimeout below.
	senderCtx, senderCancel := context.WithCancel(context.Background())
	sendersWg := &sync.WaitGroup{}
	sendersWg.Add(ts.cfg.TeeConfig.FlushWorkerCount)
	for i := 0; i < ts.cfg.TeeConfig.FlushWorkerCount; i++ {
		go func() {
			ts.batchSender(senderCtx)
			sendersWg.Done()
		}()
	}

	// We need this to implement the select with StopFlushTimeout below
	sendersDone := make(chan struct{})
	go func() {
		sendersWg.Wait()
		close(sendersDone)
	}()

	go func() {
		// We wait for the Run() context to be done, so we know we are stopping
		<-runCtx.Done()

		// The senders either stop normally in the allotted time, or we hit the
		// timeout and cancel their context. In either case, we wait for them to
		// finish before we consider the service to be done.
		select {
		case <-time.After(ts.cfg.TeeConfig.StopFlushTimeout):
			senderCancel() // Cancel any remaining senders
			<-sendersDone  // Wait for them to be done
		case <-sendersDone:
		}
		ts.wg.Done()
	}()

	go func() {
		t := time.NewTicker(ts.cfg.TeeConfig.BatchFlushInterval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				ts.flush()
			case <-runCtx.Done():
				// Final flush to send anything currently buffered
				ts.flush()
				close(ts.flushQueue) // nothing will write to it anymore
				return
			}
		}
	}()

	return nil
}
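
// WaitUntilDone blocks until the flush workers have drained, or until the
// stop-flush timeout has expired after shutdown began.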
func (ts *TeeService) WaitUntilDone() {
	ts.wg.Wait()
}
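
// flush swaps out the per-tenant buffers under the mutex, groups the buffered
// streams into push requests per tenant and per pattern ingester, and enqueues
// them on flushQueue. If the queue is full, the batch is dropped and counted.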
func (ts *TeeService) flush() {
	ts.buffersMutex.Lock()
	if len(ts.buffers) == 0 {
		ts.buffersMutex.Unlock()
		return
	}
	buffered := ts.buffers
	ts.buffers = make(map[string][]distributor.KeyedStream)
	ts.buffersMutex.Unlock()

	batches := make([]map[string]map[string]*logproto.PushRequest, 0, len(buffered))
	for tenant, streams := range buffered {
		batches = append(batches, ts.batchesForTenant(tenant, streams))
	}

	byTenantAndPatternIngester := make(map[string]map[string][]*logproto.PushRequest)
	for _, b := range batches {
		for tenant, requests := range b {
			for addr, req := range requests {
				byTenant, ok := byTenantAndPatternIngester[tenant]
				if !ok {
					byTenant = make(map[string][]*logproto.PushRequest)
				}
				byTenant[addr] = append(
					byTenant[addr],
					req,
				)
				byTenantAndPatternIngester[tenant] = byTenant
			}
		}
	}

	for tenant, requests := range byTenantAndPatternIngester {
		for addr, reqs := range requests {
			select {
			case ts.flushQueue <- clientRequest{
				ingesterAddr: addr,
				tenant:       tenant,
				reqs:         reqs,
			}:
				ts.teedRequests.WithLabelValues("queued").Inc()
			default:
				ts.teedRequests.WithLabelValues("dropped").Inc()
			}
		}
	}
}
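
// batchesForTenant groups a tenant's buffered streams into one PushRequest per
// pattern ingester, using the ring to resolve the ingester that owns each
// stream's hash key. Streams whose owner cannot be resolved are dropped and
// counted.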
func (ts *TeeService) batchesForTenant(
	tenant string,
	streams []distributor.KeyedStream,
) map[string]map[string]*logproto.PushRequest {
	batches := map[string]map[string]*logproto.PushRequest{
		tenant: make(map[string]*logproto.PushRequest),
	}
	if len(streams) == 0 {
		return batches
	}

	for _, stream := range streams {
		var descs [1]ring.InstanceDesc
		replicationSet, err := ts.ringClient.Ring().
			Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil)
		if err != nil || len(replicationSet.Instances) == 0 {
			ts.teedStreams.WithLabelValues("dropped").Inc()
			continue
		}

		addr := replicationSet.Instances[0].Addr
		batch, ok := batches[tenant][addr]
		if !ok {
			batch = &logproto.PushRequest{}
			batches[tenant][addr] = batch
		}

		if len(stream.Stream.Entries) > 0 {
			batch.Streams = append(batch.Streams, stream.Stream)
			ts.teedStreams.WithLabelValues("batched").Inc()
		}
	}

	streamCount := uint64(len(streams))
	level.Debug(ts.logger).Log(
		"msg", "prepared pattern Tee batches for tenant",
		"tenant", tenant,
		"stream_count", streamCount,
	)

	return batches
}
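
// clientRequest is a unit of work on the flush queue: the push requests
// destined for one pattern ingester on behalf of one tenant.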
type clientRequest struct {
	ingesterAddr string
	tenant       string
	reqs         []*logproto.PushRequest
}
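
// batchSender is the flush worker loop. It drains clientRequests from the
// flush queue until the queue is closed or its context is cancelled.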
func (ts *TeeService) batchSender(ctx context.Context) {
	for {
		select {
		case clientReq, ok := <-ts.flushQueue:
			if !ok {
				return // we are done, the queue was closed by Run()
			}
			ts.sendBatch(ctx, clientReq)
		case <-ctx.Done():
			return
		}
	}
}
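
// sendBatch pushes each request to the pattern ingester that owns it. If that
// push fails and metric aggregation is enabled for the tenant, it falls back
// to any other pattern ingester so the metrics are still captured, even though
// the streams will not be processed for patterns there.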
func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest) {
	ctx, cancel := context.WithTimeout(ctx, ts.cfg.ConnectionTimeout)
	defer cancel()

	for i := 0; i < len(clientRequest.reqs); i++ {
		req := clientRequest.reqs[i]
		if len(req.Streams) == 0 {
			continue
		}

		// Nothing to do with this error. It's recorded in the metrics that
		// are gathered by this request
		_ = instrument.CollectedRequest(
			ctx,
			"FlushTeedLogsToPatternIngester",
			ts.sendDuration,
			instrument.ErrorCode,
			func(ctx context.Context) error {
				sp := spanlogger.FromContext(ctx)
				client, err := ts.ringClient.GetClientFor(clientRequest.ingesterAddr)
				if err != nil {
					return err
				}

				ctx, cancel := context.WithTimeout(
					user.InjectOrgID(ctx, clientRequest.tenant),
					ts.cfg.ClientConfig.RemoteTimeout,
				)
				defer cancel()

				// First try to send the request to the correct pattern ingester
				_, err = client.(logproto.PatternClient).Push(ctx, req)
				if err == nil {
					// Success here means the stream will be processed for both metrics and patterns
					ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "success").Inc()
					ts.ingesterMetricAppends.WithLabelValues("success").Inc()

					// limit logged labels to 1000
					labelsLimit := len(req.Streams)
					if labelsLimit > 1000 {
						labelsLimit = 1000
					}
					labels := make([]string, 0, labelsLimit)
					for _, stream := range req.Streams {
						if len(labels) >= 1000 {
							break
						}
						labels = append(labels, stream.Labels)
					}

					sp.LogKV(
						"event", "forwarded push request to pattern ingester",
						"num_streams", len(req.Streams),
						"first_1k_labels", strings.Join(labels, ", "),
						"tenant", clientRequest.tenant,
					)

					// this is basically the same as logging push request streams,
					// so put it behind the same flag
					if ts.tenantCfgs.LogPushRequestStreams(clientRequest.tenant) {
						level.Debug(ts.logger).
							Log(
								"msg", "forwarded push request to pattern ingester",
								"num_streams", len(req.Streams),
								"first_1k_labels", strings.Join(labels, ", "),
								"tenant", clientRequest.tenant,
							)
					}

					return nil
				}

				// The pattern ingester append failed, but we can retry the metric append
				ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "fail").Inc()
				level.Error(ts.logger).Log("msg", "failed to send patterns to pattern ingester", "err", err)

				// Pattern ingesters serve 2 functions, processing patterns and aggregating metrics.
				// Only owned streams are processed for patterns, however any pattern ingester can
				// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
				// try to forward the request to any pattern ingester so we at least capture the metrics.
				if !ts.limits.MetricAggregationEnabled(clientRequest.tenant) {
					return err
				}

				replicationSet, err := ts.ringClient.Ring().
					GetReplicationSetForOperation(ring.WriteNoExtend)
				if err != nil || len(replicationSet.Instances) == 0 {
					ts.ingesterMetricAppends.WithLabelValues("fail").Inc()
					level.Error(ts.logger).Log(
						"msg", "failed to send metrics to fallback pattern ingesters",
						"num_instances", len(replicationSet.Instances),
						"err", err,
					)
					return errors.New("no instances found for fallback")
				}

				fallbackAddrs := make([]string, 0, len(replicationSet.Instances))
				for _, instance := range replicationSet.Instances {
					addr := instance.Addr
					fallbackAddrs = append(fallbackAddrs, addr)

					var client ring_client.PoolClient
					client, err = ts.ringClient.GetClientFor(addr)
					if err == nil {
						ctx, cancel := context.WithTimeout(
							user.InjectOrgID(ctx, clientRequest.tenant),
							ts.cfg.ClientConfig.RemoteTimeout,
						)
						defer cancel()

						_, err = client.(logproto.PatternClient).Push(ctx, req)
						if err != nil {
							continue
						}

						ts.ingesterMetricAppends.WithLabelValues("success").Inc()
						// bail after any success to prevent sending more than one
						return nil
					}
				}

				ts.ingesterMetricAppends.WithLabelValues("fail").Inc()
				level.Error(ts.logger).Log(
					"msg", "failed to send metrics to fallback pattern ingesters. exhausted all fallback instances",
					"addresses", strings.Join(fallbackAddrs, ", "),
					"err", err,
				)
				return err
			})
	}
}

// Duplicate implements distributor.Tee, which is used to tee distributor requests to pattern ingesters.
func (ts *TeeService) Duplicate(tenant string, streams []distributor.KeyedStream) {
	if !ts.cfg.Enabled {
		return
	}
	if len(streams) == 0 {
		return
	}

	for _, stream := range streams {
		lbls, err := syntax.ParseLabels(stream.Stream.Labels)
		if err != nil {
			level.Error(ts.logger).
				Log("msg", "error parsing stream labels", "labels", stream.Stream.Labels, "err", err)
			continue
		}
		if lbls.Has(push.AggregatedMetricLabel) {
			continue
		}

		ts.buffersMutex.Lock()
		ts.buffers[tenant] = append(ts.buffers[tenant], stream)
		ts.buffersMutex.Unlock()
	}
}