|
|
|
|
@ -7,12 +7,13 @@ import ( |
|
|
|
|
"sync/atomic" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" |
|
|
|
|
"github.com/cortexproject/cortex/pkg/ring" |
|
|
|
|
"github.com/cortexproject/cortex/pkg/util" |
|
|
|
|
opentracing "github.com/opentracing/opentracing-go" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"github.com/weaveworks/common/instrument" |
|
|
|
|
"github.com/weaveworks/common/user" |
|
|
|
|
cortex_client "github.com/weaveworks/cortex/pkg/ingester/client" |
|
|
|
|
"github.com/weaveworks/cortex/pkg/ring" |
|
|
|
|
"google.golang.org/grpc/health/grpc_health_v1" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/tempo/pkg/ingester/client" |
|
|
|
|
@ -75,7 +76,7 @@ func New(cfg Config, ring ring.ReadRing) (*Distributor, error) { |
|
|
|
|
return &Distributor{ |
|
|
|
|
cfg: cfg, |
|
|
|
|
ring: ring, |
|
|
|
|
pool: cortex_client.NewPool(cfg.PoolConfig, ring, factory), |
|
|
|
|
pool: cortex_client.NewPool(cfg.PoolConfig, ring, factory, util.Logger), |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -122,12 +123,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
samplesByIngester := map[*ring.IngesterDesc][]*streamTracker{} |
|
|
|
|
samplesByIngester := map[string][]*streamTracker{} |
|
|
|
|
ingesterDescs := map[string]ring.IngesterDesc{} |
|
|
|
|
for i, replicationSet := range replicationSets { |
|
|
|
|
streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors |
|
|
|
|
streams[i].maxFailures = replicationSet.MaxErrors |
|
|
|
|
for _, ingester := range replicationSet.Ingesters { |
|
|
|
|
samplesByIngester[ingester] = append(samplesByIngester[ingester], &streams[i]) |
|
|
|
|
samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i]) |
|
|
|
|
ingesterDescs[ingester.Addr] = ingester |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -137,7 +140,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log |
|
|
|
|
err: make(chan error), |
|
|
|
|
} |
|
|
|
|
for ingester, samples := range samplesByIngester { |
|
|
|
|
go func(ingester *ring.IngesterDesc, samples []*streamTracker) { |
|
|
|
|
go func(ingester ring.IngesterDesc, samples []*streamTracker) { |
|
|
|
|
// Use a background context to make sure all ingesters get samples even if we return early
|
|
|
|
|
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout) |
|
|
|
|
defer cancel() |
|
|
|
|
@ -146,7 +149,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log |
|
|
|
|
localCtx = opentracing.ContextWithSpan(localCtx, sp) |
|
|
|
|
} |
|
|
|
|
d.sendSamples(localCtx, ingester, samples, &pushTracker) |
|
|
|
|
}(ingester, samples) |
|
|
|
|
}(ingesterDescs[ingester], samples) |
|
|
|
|
} |
|
|
|
|
select { |
|
|
|
|
case err := <-pushTracker.err: |
|
|
|
|
@ -157,7 +160,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO taken from Cortex, see if we can refactor out an usable interface.
|
|
|
|
|
func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { |
|
|
|
|
func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { |
|
|
|
|
err := d.sendSamplesErr(ctx, ingester, streamTrackers) |
|
|
|
|
|
|
|
|
|
// If we succeed, decrement each sample's pending count by one. If we reach
|
|
|
|
|
@ -189,7 +192,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDe |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO taken from Cortex, see if we can refactor out an usable interface.
|
|
|
|
|
func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, streams []*streamTracker) error { |
|
|
|
|
func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.IngesterDesc, streams []*streamTracker) error { |
|
|
|
|
c, err := d.pool.GetClientFor(ingester.Addr) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
|