|
|
|
@ -90,12 +90,13 @@ func (f *frontendSchedulerWorkers) AddressAdded(address string) { |
|
|
|
|
f.mu.Lock() |
|
|
|
|
ws := f.workers |
|
|
|
|
w := f.workers[address] |
|
|
|
|
f.mu.Unlock() |
|
|
|
|
|
|
|
|
|
// Already stopped or we already have worker for this address.
|
|
|
|
|
if ws == nil || w != nil { |
|
|
|
|
f.mu.Unlock() |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
f.mu.Unlock() |
|
|
|
|
|
|
|
|
|
level.Info(f.logger).Log("msg", "adding connection to scheduler", "addr", address) |
|
|
|
|
conn, err := f.connectToScheduler(context.Background(), address) |
|
|
|
@ -111,10 +112,16 @@ func (f *frontendSchedulerWorkers) AddressAdded(address string) { |
|
|
|
|
defer f.mu.Unlock() |
|
|
|
|
|
|
|
|
|
// Can be nil if stopping has been called already.
|
|
|
|
|
if f.workers != nil { |
|
|
|
|
f.workers[address] = w |
|
|
|
|
w.start() |
|
|
|
|
if f.workers == nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
// We have to recheck for presence in case we got called again while we were
|
|
|
|
|
// connecting and that one finished first.
|
|
|
|
|
if f.workers[address] != nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
f.workers[address] = w |
|
|
|
|
w.start() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (f *frontendSchedulerWorkers) AddressRemoved(address string) { |
|
|
|
@ -277,7 +284,6 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro |
|
|
|
|
FrontendAddress: w.frontendAddr, |
|
|
|
|
StatsEnabled: req.statsEnabled, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
req.enqueue <- enqueueResult{status: failed} |
|
|
|
|
return err |
|
|
|
@ -323,7 +329,6 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro |
|
|
|
|
Type: schedulerpb.CANCEL, |
|
|
|
|
QueryID: reqID, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|