From 4bffed22e4ab11261604bfe7769efd36ef022750 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 12 May 2023 23:12:59 +0200 Subject: [PATCH] Reuse string/InstanceDesc buffers in index gateway clients (#9341) **What this PR does / why we need it**: When running the IndexGateway in ring mode, `ring.MakeBuffersForGet()` in the index gateway client allocated fixed size `[]string`/`[]InstanceDesc` slices with a capacity of 5 for each and every request. By using a `sync.Pool` the client can reduce the amount of allocations. Additionally, since we run the index gateway ring with a high replication factor to be able to scale them better horizontally, an initial capacity of 5 is most of the time too low, leading to extra allocations when growing the capacity of the slice. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` --------- Signed-off-by: Christian Haudum --- .../gatewayclient/gateway_client.go | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go index 1ab1f9ebea..b710d7141a 100644 --- a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go +++ b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "math/rand" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -97,6 +98,9 @@ type GatewayClient struct { pool *ring_client.Pool ring ring.ReadRing + + stringBufPool *sync.Pool + instanceBufPool *sync.Pool } // NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance. @@ -142,6 +146,20 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, log return igPool, nil } + // Replication factor plus additional room for JOINING/LEAVING instances + // See also ring.GetBufferSize + bufSize := cfg.Ring.ReplicationFactor() * 3 / 2 + sgClient.stringBufPool = &sync.Pool{ + New: func() any { + return make([]string, 0, bufSize) + }, + } + sgClient.instanceBufPool = &sync.Pool{ + New: func() any { + return make([]ring.InstanceDesc, 0, bufSize) + }, + } + sgClient.pool = clientpool.NewPool(cfg.PoolConfig, sgClient.ring, factory, logger) } else { sgClient.conn, err = grpc.Dial(cfg.Address, dialOpts...) @@ -321,10 +339,15 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log return errors.Wrap(err, "index gateway client get tenant ID") } - bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() + bufDescs := s.instanceBufPool.Get().([]ring.InstanceDesc) + defer s.instanceBufPool.Put(bufDescs) //nolint:staticcheck + bufHosts := s.stringBufPool.Get().([]string) + defer s.stringBufPool.Put(bufHosts) //nolint:staticcheck + bufZones := s.stringBufPool.Get().([]string) + defer s.stringBufPool.Put(bufZones) //nolint:staticcheck key := util.TokenFor(userID, "" /* labels */) - rs, err := s.ring.Get(key, ring.WriteNoExtend, bufDescs, bufHosts, bufZones) + rs, err := s.ring.Get(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0]) if err != nil { return errors.Wrap(err, "index gateway get ring") }