diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go index 1004d0941a..4cc4003b2c 100644 --- a/discovery/consul/consul.go +++ b/discovery/consul/consul.go @@ -499,6 +499,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr WaitTime: watchTimeout, AllowStale: srv.discovery.allowStale, NodeMeta: srv.discovery.watchedNodeMeta, + Filter: srv.discovery.watchedFilter, } t0 := time.Now() diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go index feec5d4747..32394dcc00 100644 --- a/discovery/consul/consul_test.go +++ b/discovery/consul/consul_test.go @@ -240,6 +240,8 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) { response = ServiceTestAnswer case "/v1/health/service/test?wait=120000ms": response = ServiceTestAnswer + case "/v1/health/service/test?filter=NodeMeta.rack_name+%3D%3D+%222304%22&wait=120000ms": + response = ServiceTestAnswer case "/v1/health/service/other?wait=120000ms": response = `[]` case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=120000ms": @@ -392,6 +394,54 @@ func TestFilterOption(t *testing.T) { cancel() } +// TestFilterOnHealthEndpoint verifies that filter is passed to health service endpoint. +func TestFilterOnHealthEndpoint(t *testing.T) { + filterReceived := false + stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := "" + switch r.URL.Path { + case "/v1/agent/self": + response = AgentAnswer + case "/v1/health/service/test": + // Verify filter parameter is present in the query + filter := r.URL.Query().Get("filter") + if filter == `Node.Meta.rack_name == "2304"` { + filterReceived = true + } + response = ServiceTestAnswer + default: + t.Errorf("Unhandled consul call: %s", r.URL) + } + w.Header().Add("X-Consul-Index", "1") + w.Write([]byte(response)) + })) + defer stub.Close() + + stuburl, err := url.Parse(stub.URL) + require.NoError(t, err) + + config := &SDConfig{ + Server: stuburl.Host, + Services: []string{"test"}, + Filter: `Node.Meta.rack_name == "2304"`, + RefreshInterval: model.Duration(1 * time.Second), + } + + d := newDiscovery(t, config) + + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []*targetgroup.Group) + go func() { + d.Run(ctx, ch) + close(ch) + }() + checkOneTarget(t, <-ch) + cancel() + + // Verify the filter was actually sent to the health endpoint + require.True(t, filterReceived, "Filter parameter should be sent to health service endpoint") +} + func TestGetDatacenterShouldReturnError(t *testing.T) { for _, tc := range []struct { handler func(http.ResponseWriter, *http.Request)