package worker import ( "context" "strconv" "sync" "time" "github.com/pkg/errors" "go.uber.org/atomic" "google.golang.org/grpc" ) const ( notifyShutdownTimeout = 5 * time.Second ) // Manages processor goroutines for single grpc connection. type processorManager struct { p processor conn *grpc.ClientConn address string // Main context to control all goroutines. ctx context.Context wg sync.WaitGroup // Cancel functions for individual goroutines. cancelsMu sync.Mutex cancels []context.CancelFunc currentProcessors *atomic.Int32 } func newProcessorManager(ctx context.Context, p processor, conn *grpc.ClientConn, address string) *processorManager { return &processorManager{ p: p, ctx: ctx, conn: conn, address: address, currentProcessors: atomic.NewInt32(0), } } func (pm *processorManager) stop() { // Notify the remote query-frontend or query-scheduler we're shutting down. // We use a new context to make sure it's not cancelled. notifyCtx, cancel := context.WithTimeoutCause(context.Background(), notifyShutdownTimeout, errors.New("notify shutdown timeout reached")) defer cancel() pm.p.notifyShutdown(notifyCtx, pm.conn, pm.address) // Stop all goroutines. pm.concurrency(0) // Wait until they finish. pm.wg.Wait() _ = pm.conn.Close() } func (pm *processorManager) concurrency(n int) { pm.cancelsMu.Lock() defer pm.cancelsMu.Unlock() if n < 0 { n = 0 } for len(pm.cancels) < n { workerID := len(pm.cancels) + 1 ctx, cancel := context.WithCancel(pm.ctx) pm.cancels = append(pm.cancels, cancel) pm.wg.Add(1) go func(workerID int) { defer pm.wg.Done() pm.currentProcessors.Inc() defer pm.currentProcessors.Dec() pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address, strconv.Itoa(workerID)) }(workerID) } for len(pm.cancels) > n { pm.cancels[0]() pm.cancels = pm.cancels[1:] } }