Remove worker callback function that is only used in tests (#11356)

The queue already exposes the functionality to track connected consumers
(workers), so there is no need to have callback that is only used in
tests.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/11338/head
Christian Haudum 2 years ago committed by GitHub
parent cd3a04d6d7
commit fe4d9bea68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      pkg/storage/stores/shipper/bloomshipper/block_downloader.go
  2. 12
      pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go

@ -34,10 +34,9 @@ type blockDownloader struct {
limits Limits
activeUsersService *util.ActiveUsersCleanupService
ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
wg sync.WaitGroup
ctx context.Context
manager *services.Manager
wg sync.WaitGroup
}
func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
@ -57,17 +56,16 @@ func newBlockDownloader(config config.Config, blockClient BlockClient, limits Li
}
b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
wg: sync.WaitGroup{},
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
wg: sync.WaitGroup{},
}
for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
@ -95,18 +93,15 @@ func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- b
}
}
// noop implementation
var onWorkerStopNoopCallback = func() {}
func (d *blockDownloader) serveDownloadingTasks(workerID string) {
// defer first, so it gets executed as last of the deferred functions
defer d.wg.Done()
logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")
d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
//this callback is used only in the tests to assert that worker is stopped
defer d.onWorkerStopCallback()
defer d.wg.Done()
idx := queue.StartIndexWithLocalQueue

@ -15,7 +15,6 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
@ -37,10 +36,6 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
MaxTasksEnqueuedPerTenant: 20,
},
}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
stoppedWorkersCount := atomic.NewInt32(0)
downloader.onWorkerStopCallback = func() {
stoppedWorkersCount.Inc()
}
require.NoError(t, err)
blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
@ -63,8 +58,13 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
}
require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded")
// We want all workers to be connected to the queue
require.Equal(t, workersCount, int(downloader.queue.GetConnectedConsumersMetric()))
downloader.stop()
require.Equal(t, int32(workersCount), stoppedWorkersCount.Load())
// We want all workers to be disconnected from the queue
require.Equal(t, 0, int(downloader.queue.GetConnectedConsumersMetric()))
}
// creates fake blocks and returns map[block-path]Block and mockBlockClient

Loading…
Cancel
Save