feat: write to dataobj partitions based on segmentation key (#19946)

George Robinson 2 months ago committed by GitHub
parent 8394909984
commit 3a24f5d291
  1. docs/sources/shared/configuration.md (8 changes)
  2. pkg/distributor/dataobj_tee.go (104 changes)
  3. pkg/distributor/distributor.go (13 changes)
  4. pkg/distributor/ingest_limits.go (6 changes)
  5. pkg/distributor/ingest_limits_test.go (22 changes)
  6. pkg/limits/frontend/frontend.go (5 changes)
  7. pkg/limits/frontend/ring.go (17 changes)
  8. pkg/limits/frontend/ring_test.go (8 changes)

@ -3188,6 +3188,14 @@ dataobj_tee:
# Maximum number of bytes to buffer.
# CLI flag: -distributor.dataobj-tee.max-buffered-bytes
[max_buffered_bytes: <int> | default = 104857600]

# The per-tenant partition rate (bytes/sec).
# CLI flag: -distributor.dataobj-tee.per-partition-rate-bytes
[per_partition_rate_bytes: <int> | default = 1048576]

# Enables optional debug metrics.
# CLI flag: -distributor.dataobj-tee.debug-metrics-enabled
[debug_metrics_enabled: <boolean> | default = false]
```
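For illustration, the new settings sit alongside the existing ones under the distributor's `dataobj_tee` block roughly as sketched below. This is an illustrative sample, not part of the documented defaults: the `enabled` value and the topic name are example values.

```yaml
dataobj_tee:
  # Enable data object tee (example value; the default is false).
  enabled: true
  # Topic for data object tee (example value; there is no default).
  topic: loki.dataobj.v1
  # Maximum number of bytes to buffer.
  max_buffered_bytes: 104857600
  # The per-tenant partition rate (bytes/sec).
  per_partition_rate_bytes: 1048576
  # Enables optional debug metrics.
  debug_metrics_enabled: false
```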
### etcd

@ -4,10 +4,10 @@ import (
"context"
"errors"
"flag"
"strconv"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
@ -16,50 +16,70 @@ import (
)
type DataObjTeeConfig struct {
Enabled bool `yaml:"enabled"`
Topic string `yaml:"topic"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
Enabled bool `yaml:"enabled"`
Topic string `yaml:"topic"`
MaxBufferedBytes int `yaml:"max_buffered_bytes"`
PerPartitionRateBytes int `yaml:"per_partition_rate_bytes"`
DebugMetricsEnabled bool `yaml:"debug_metrics_enabled"`
}
func (c *DataObjTeeConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&c.Enabled, "distributor.dataobj-tee.enabled", false, "Enable data object tee.")
f.StringVar(&c.Topic, "distributor.dataobj-tee.topic", "", "Topic for data object tee.")
f.IntVar(&c.MaxBufferedBytes, "distributor.dataobj-tee.max-buffered-bytes", 100<<20, "Maximum number of bytes to buffer.")
f.IntVar(&c.PerPartitionRateBytes, "distributor.dataobj-tee.per-partition-rate-bytes", 1024*1024, "The per-tenant partition rate (bytes/sec).")
f.BoolVar(&c.DebugMetricsEnabled, "distributor.dataobj-tee.debug-metrics-enabled", false, "Enables optional debug metrics.")
}
func (c *DataObjTeeConfig) Validate() error {
if c.Enabled && c.Topic == "" {
if !c.Enabled {
return nil
}
if c.Topic == "" {
return errors.New("the topic is required")
}
if c.MaxBufferedBytes < 0 {
return errors.New("max buffered bytes cannot be negative")
}
if c.PerPartitionRateBytes < 0 {
return errors.New("per partition rate bytes cannot be negative")
}
return nil
}
// DataObjTee is a tee that duplicates streams to the data object topic.
// It is a temporary solution while we work on segmentation keys.
type DataObjTee struct {
cfg *DataObjTeeConfig
client *kgo.Client
ringReader ring.PartitionRingReader
logger log.Logger
cfg *DataObjTeeConfig
limitsClient *ingestLimits
kafkaClient *kgo.Client
resolver *SegmentationPartitionResolver
logger log.Logger
// Metrics.
failures prometheus.Counter
total prometheus.Counter
// High cardinality metrics which are only emitted when debug metrics
// are enabled.
produced *prometheus.CounterVec
}
// NewDataObjTee returns a new DataObjTee.
func NewDataObjTee(
cfg *DataObjTeeConfig,
client *kgo.Client,
ringReader ring.PartitionRingReader,
resolver *SegmentationPartitionResolver,
limitsClient *ingestLimits,
kafkaClient *kgo.Client,
logger log.Logger,
reg prometheus.Registerer,
) (*DataObjTee, error) {
return &DataObjTee{
cfg: cfg,
client: client,
ringReader: ringReader,
logger: logger,
cfg: cfg,
resolver: resolver,
kafkaClient: kafkaClient,
limitsClient: limitsClient,
logger: logger,
failures: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "loki_distributor_dataobj_tee_duplicate_stream_failures_total",
Help: "Total number of streams that could not be duplicated.",
@ -68,21 +88,54 @@ func NewDataObjTee(
Name: "loki_distributor_dataobj_tee_duplicate_streams_total",
Help: "Total number of streams duplicated.",
}),
produced: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "loki_distributor_dataobj_tee_produced_bytes_total",
Help: "Total number of bytes produced to each partition.",
}, []string{"tenant", "partition", "segmentation_key"}),
}, nil
}
// A SegmentedStream is a KeyedStream with a segmentation key.
type SegmentedStream struct {
KeyedStream
SegmentationKey SegmentationKey
}
// Duplicate implements the [Tee] interface.
func (t *DataObjTee) Duplicate(_ context.Context, tenant string, streams []KeyedStream) {
for _, s := range streams {
go t.duplicate(tenant, s)
func (t *DataObjTee) Duplicate(ctx context.Context, tenant string, streams []KeyedStream) {
segmentationKeyStreams := make([]SegmentedStream, 0, len(streams))
for _, stream := range streams {
segmentationKey, err := GetSegmentationKey(stream)
if err != nil {
level.Error(t.logger).Log("msg", "failed to get segmentation key", "err", err)
t.failures.Inc()
return
}
segmentationKeyStreams = append(segmentationKeyStreams, SegmentedStream{
KeyedStream: stream,
SegmentationKey: segmentationKey,
})
}
rates, err := t.limitsClient.UpdateRates(ctx, tenant, segmentationKeyStreams)
if err != nil {
level.Error(t.logger).Log("msg", "failed to update rates", "err", err)
}
// fastRates is a temporary lookup table that lets us find the rate
// for a segmentation key in constant time.
fastRates := make(map[uint64]uint64, len(rates))
for _, rate := range rates {
fastRates[rate.StreamHash] = rate.Rate
}
for _, s := range segmentationKeyStreams {
go t.duplicate(ctx, tenant, s, fastRates[s.SegmentationKey.Sum64()])
}
}
func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
func (t *DataObjTee) duplicate(ctx context.Context, tenant string, stream SegmentedStream, rateBytes uint64) {
t.total.Inc()
partition, err := t.ringReader.PartitionRing().ActivePartitionForKey(stream.HashKey)
partition, err := t.resolver.Resolve(ctx, stream.SegmentationKey, rateBytes)
if err != nil {
level.Error(t.logger).Log("msg", "failed to get partition", "err", err)
level.Error(t.logger).Log("msg", "failed to resolve partition", "err", err)
t.failures.Inc()
return
}
@ -92,9 +145,16 @@ func (t *DataObjTee) duplicate(tenant string, stream KeyedStream) {
t.failures.Inc()
return
}
results := t.client.ProduceSync(context.TODO(), records...)
results := t.kafkaClient.ProduceSync(ctx, records...)
if err := results.FirstErr(); err != nil {
level.Error(t.logger).Log("msg", "failed to produce records", "err", err)
t.failures.Inc()
}
if t.cfg.DebugMetricsEnabled {
t.produced.WithLabelValues(
tenant,
strconv.FormatInt(int64(partition), 10),
string(stream.SegmentationKey),
).Add(float64(stream.Stream.Size()))
}
}
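As an aside, the pattern in `Duplicate` of indexing the `UpdateRates` results by hash before fanning out can be shown with a small, self-contained sketch. `RateResult` and the literal hashes below are simplified stand-ins for `proto.UpdateRatesResult` and `SegmentationKey.Sum64()`; this is illustrative and not the actual Loki types.

```go
// A minimal, self-contained sketch (not Loki code) of the constant-time rate
// lookup used in Duplicate above.
package main

import "fmt"

// RateResult is a simplified stand-in for proto.UpdateRatesResult.
type RateResult struct {
	StreamHash uint64
	Rate       uint64
}

func main() {
	// Results as they might come back from UpdateRates.
	results := []RateResult{
		{StreamHash: 1, Rate: 1024},
		{StreamHash: 2, Rate: 2048},
	}

	// Index the results by stream hash so each stream's rate can be looked
	// up in constant time while iterating the streams to duplicate.
	fastRates := make(map[uint64]uint64, len(results))
	for _, r := range results {
		fastRates[r.StreamHash] = r.Rate
	}

	// Streams whose rates could not be updated are absent from the map and
	// therefore fall back to a rate of zero.
	for _, hash := range []uint64{1, 2, 3} {
		fmt.Printf("stream %#x -> rate %d bytes/sec\n", hash, fastRates[hash])
	}
}
```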

@ -283,6 +283,8 @@ func New(
return nil, fmt.Errorf("partition ring is required for kafka writes")
}
ingestLimits := newIngestLimits(limitsFrontendClient, registerer)
var kafkaWriter KafkaProducer
if cfg.KafkaEnabled {
kafkaClient, err := kafka_client.NewWriterClient("distributor", cfg.KafkaConfig, 20, logger, registerer)
@ -295,10 +297,17 @@ func New(
)
if cfg.DataObjTeeConfig.Enabled {
resolver := NewSegmentationPartitionResolver(
uint64(cfg.DataObjTeeConfig.PerPartitionRateBytes),
dataObjConsumerPartitionRing,
registerer,
logger,
)
dataObjTee, err := NewDataObjTee(
&cfg.DataObjTeeConfig,
resolver,
ingestLimits,
kafkaClient,
dataObjConsumerPartitionRing,
logger,
registerer,
)
@ -379,7 +388,7 @@ func New(
writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
kafkaWriter: kafkaWriter,
partitionRing: partitionRing,
ingestLimits: newIngestLimits(limitsFrontendClient, registerer),
ingestLimits: ingestLimits,
numMetadataPartitions: numMetadataPartitions,
}

@ -194,7 +194,7 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
// UpdateRates updates the rates for the streams and returns a slice of the
// updated rates for all streams. Any streams that could not have rates updated
// have a rate of zero.
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []KeyedStream) ([]*proto.UpdateRatesResult, error) {
func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams []SegmentedStream) ([]*proto.UpdateRatesResult, error) {
l.requests.WithLabelValues("UpdateRates").Inc()
req, err := newUpdateRatesRequest(tenant, streams)
if err != nil {
@ -209,7 +209,7 @@ func (l *ingestLimits) UpdateRates(ctx context.Context, tenant string, streams [
return resp.Results, nil
}
func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateRatesRequest, error) {
func newUpdateRatesRequest(tenant string, streams []SegmentedStream) (*proto.UpdateRatesRequest, error) {
// The distributor sends the hashes of all streams in the request to the
// limits-frontend. The limits-frontend is responsible for deciding if
// the request would exceed the tenant's limits, and if so, which streams
@ -218,7 +218,7 @@ func newUpdateRatesRequest(tenant string, streams []KeyedStream) (*proto.UpdateR
for _, stream := range streams {
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
streamMetadata = append(streamMetadata, &proto.StreamMetadata{
StreamHash: stream.HashKeyNoShard,
StreamHash: stream.SegmentationKey.Sum64(),
TotalSize: entriesSize + structuredMetadataSize,
IngestionPolicy: stream.Policy,
})

@ -302,7 +302,7 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
tests := []struct {
name string
tenant string
streams []KeyedStream
streams []SegmentedStream
expectedRequest *proto.UpdateRatesRequest
response *proto.UpdateRatesResponse
responseErr error
@ -311,37 +311,31 @@ func TestIngestLimits_UpdateRates(t *testing.T) {
}{{
name: "error should be returned if rates cannot be updated",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
streams: []SegmentedStream{{
SegmentationKey: "test",
}},
expectedRequest: &proto.UpdateRatesRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 1,
}},
},
responseErr: errors.New("failed to update rates"),
expectedErr: "failed to update rates",
}, {
name: "updates rates",
tenant: "test",
streams: []KeyedStream{{
HashKeyNoShard: 1,
streams: []SegmentedStream{{
SegmentationKey: "test",
}},
expectedRequest: &proto.UpdateRatesRequest{
Tenant: "test",
Streams: []*proto.StreamMetadata{{
StreamHash: 1,
StreamHash: 0xb5fb79e24c92922f,
}},
},
response: &proto.UpdateRatesResponse{
Results: []*proto.UpdateRatesResult{{
StreamHash: 1,
StreamHash: 0xb5fb79e24c92922f,
Rate: 1024,
}},
},
expectedResult: []*proto.UpdateRatesResult{{
StreamHash: 1,
StreamHash: 0xb5fb79e24c92922f,
Rate: 1024,
}},
}}

@ -137,10 +137,7 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRe
return &proto.ExceedsLimitsResponse{Results: results}, nil
}
func (f *Frontend) UpdateRates(
ctx context.Context,
req *proto.UpdateRatesRequest,
) (*proto.UpdateRatesResponse, error) {
func (f *Frontend) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) (*proto.UpdateRatesResponse, error) {
results := make([]*proto.UpdateRatesResult, 0, len(req.Streams))
resps, err := f.limitsClient.UpdateRates(ctx, req)
if err != nil {

@ -73,10 +73,7 @@ func newRingLimitsClient(
}
// ExceedsLimits implements the [exceedsLimitsGatherer] interface.
func (r *ringLimitsClient) ExceedsLimits(
ctx context.Context,
req *proto.ExceedsLimitsRequest,
) ([]*proto.ExceedsLimitsResponse, error) {
func (r *ringLimitsClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) ([]*proto.ExceedsLimitsResponse, error) {
if len(req.Streams) == 0 {
return nil, nil
}
@ -101,10 +98,7 @@ func (r *ringLimitsClient) ExceedsLimits(
}
// UpdateRates implements the [exceedsLimitsGatherer] interface.
func (r *ringLimitsClient) UpdateRates(
ctx context.Context,
req *proto.UpdateRatesRequest,
) ([]*proto.UpdateRatesResponse, error) {
func (r *ringLimitsClient) UpdateRates(ctx context.Context, req *proto.UpdateRatesRequest) ([]*proto.UpdateRatesResponse, error) {
if len(req.Streams) == 0 {
return nil, nil
}
@ -238,12 +232,7 @@ type doRPCsFunc func(
// exhaustAllZones queries all zones, one at a time, until either all streams
// have been answered or all zones have been exhausted.
func (r *ringLimitsClient) exhaustAllZones(
ctx context.Context,
tenant string,
streams []*proto.StreamMetadata,
doRPCs doRPCsFunc,
) ([]*proto.StreamMetadata, error) {
func (r *ringLimitsClient) exhaustAllZones(ctx context.Context, tenant string, streams []*proto.StreamMetadata, doRPCs doRPCsFunc) ([]*proto.StreamMetadata, error) {
zonesIter, err := r.allZones(ctx)
if err != nil {
return nil, err

@ -15,7 +15,7 @@ import (
"github.com/grafana/loki/v3/pkg/limits/proto"
)
func TestRingGatherer_ExceedsLimits(t *testing.T) {
func TestRingLimitsClient_ExceedsLimits(t *testing.T) {
tests := []struct {
name string
request *proto.ExceedsLimitsRequest
@ -442,7 +442,7 @@ func TestRingGatherer_ExceedsLimits(t *testing.T) {
}
}
func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
func TestRingLimitsClient_GetZoneAwarePartitionConsumers(t *testing.T) {
tests := []struct {
name string
instances []ring.InstanceDesc
@ -603,7 +603,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
}
}
func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
func TestRingLimitsClient_GetPartitionConsumers(t *testing.T) {
tests := []struct {
name string
// Instances contains the complete set of instances that should be mocked.
@ -740,7 +740,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
}
}
func TestRingStreamUsageGatherer_GetPartitionConsumers_Caching(t *testing.T) {
func TestRingLimitsClient_GetPartitionConsumers_Caching(t *testing.T) {
// Set up the mock clients.
req0 := proto.GetAssignedPartitionsResponse{
AssignedPartitions: map[int32]int64{
