|
|
|
@ -109,6 +109,7 @@ type Discovery struct { |
|
|
|
|
sources map[string]*targetgroup.Group |
|
|
|
|
|
|
|
|
|
updates chan treecache.ZookeeperTreeCacheEvent |
|
|
|
|
pathUpdates []chan treecache.ZookeeperTreeCacheEvent |
|
|
|
|
treeCaches []*treecache.ZookeeperTreeCache |
|
|
|
|
|
|
|
|
|
parse func(data []byte, path string) (model.LabelSet, error) |
|
|
|
@ -155,7 +156,9 @@ func NewDiscovery( |
|
|
|
|
logger: logger, |
|
|
|
|
} |
|
|
|
|
for _, path := range paths { |
|
|
|
|
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates, logger)) |
|
|
|
|
pathUpdate := make(chan treecache.ZookeeperTreeCacheEvent) |
|
|
|
|
sd.pathUpdates = append(sd.pathUpdates, pathUpdate) |
|
|
|
|
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, pathUpdate, logger)) |
|
|
|
|
} |
|
|
|
|
return sd, nil |
|
|
|
|
} |
|
|
|
@ -166,12 +169,26 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { |
|
|
|
|
for _, tc := range d.treeCaches { |
|
|
|
|
tc.Stop() |
|
|
|
|
} |
|
|
|
|
for _, pathUpdate := range d.pathUpdates { |
|
|
|
|
// Drain event channel in case the treecache leaks goroutines otherwise.
|
|
|
|
|
for range d.updates { |
|
|
|
|
for range pathUpdate { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
d.conn.Close() |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
for _, pathUpdate := range d.pathUpdates { |
|
|
|
|
go func(update chan treecache.ZookeeperTreeCacheEvent) { |
|
|
|
|
for event := range update { |
|
|
|
|
select { |
|
|
|
|
case d.updates <- event: |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}(pathUpdate) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|