parent
d0ffe2ed3e
commit
0544bdd6af
@ -0,0 +1,161 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"net/url" |
||||
"testing" |
||||
"time" |
||||
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/prometheus/common/config" |
||||
"github.com/stretchr/testify/require" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
) |
||||
|
||||
var ( |
||||
httpResourceConf = &HTTPResourceClientConfig{ |
||||
HTTPClientConfig: config.HTTPClientConfig{ |
||||
TLSConfig: config.TLSConfig{InsecureSkipVerify: true}, |
||||
}, |
||||
ResourceType: "monitoring", |
||||
// Some known type.
|
||||
ResourceTypeURL: "type.googleapis.com/envoy.service.discovery.v3.DiscoveryRequest", |
||||
Server: "http://localhost", |
||||
ClientID: "test-id", |
||||
} |
||||
) |
||||
|
||||
func urlMustParse(str string) *url.URL { |
||||
parsed, err := url.Parse(str) |
||||
|
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
return parsed |
||||
} |
||||
|
||||
func TestMakeXDSResourceHttpEndpointEmptyServerURLScheme(t *testing.T) { |
||||
endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("127.0.0.1"), "monitoring") |
||||
|
||||
require.Empty(t, endpointURL) |
||||
require.Error(t, err) |
||||
require.Equal(t, err.Error(), "invalid xDS server URL") |
||||
} |
||||
|
||||
func TestMakeXDSResourceHttpEndpointEmptyServerURLHost(t *testing.T) { |
||||
endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("grpc://127.0.0.1"), "monitoring") |
||||
|
||||
require.Empty(t, endpointURL) |
||||
require.NotNil(t, err) |
||||
require.Contains(t, err.Error(), "must be either 'http' or 'https'") |
||||
} |
||||
|
||||
func TestMakeXDSResourceHttpEndpoint(t *testing.T) { |
||||
endpointURL, err := makeXDSResourceHTTPEndpointURL(ProtocolV3, urlMustParse("http://127.0.0.1:5000"), "monitoring") |
||||
|
||||
require.NoError(t, err) |
||||
require.Equal(t, endpointURL.String(), "http://127.0.0.1:5000/v3/discovery:monitoring") |
||||
} |
||||
|
||||
func TestCreateNewHTTPResourceClient(t *testing.T) { |
||||
c := &HTTPResourceClientConfig{ |
||||
HTTPClientConfig: sdConf.HTTPClientConfig, |
||||
Name: "test", |
||||
ExtraQueryParams: url.Values{ |
||||
"param1": {"v1"}, |
||||
}, |
||||
Timeout: 1 * time.Minute, |
||||
ResourceType: "monitoring", |
||||
ResourceTypeURL: "type.googleapis.com/envoy.service.discovery.v3.DiscoveryRequest", |
||||
Server: "http://127.0.0.1:5000", |
||||
ClientID: "client", |
||||
} |
||||
|
||||
client, err := NewHTTPResourceClient(c, ProtocolV3) |
||||
|
||||
require.NoError(t, err) |
||||
|
||||
require.Equal(t, client.endpoint, "http://127.0.0.1:5000/v3/discovery:monitoring?param1=v1") |
||||
require.Equal(t, client.client.Timeout, 1*time.Minute) |
||||
|
||||
} |
||||
|
||||
func createTestHTTPResourceClient(t *testing.T, conf *HTTPResourceClientConfig, protocolVersion ProtocolVersion, responder discoveryResponder) (*HTTPResourceClient, func()) { |
||||
s := createTestHTTPServer(t, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
require.Equal(t, conf.ResourceTypeURL, request.TypeUrl) |
||||
require.Equal(t, conf.ClientID, request.Node.Id) |
||||
return responder(request) |
||||
}) |
||||
|
||||
conf.Server = s.URL |
||||
client, err := NewHTTPResourceClient(conf, protocolVersion) |
||||
require.NoError(t, err) |
||||
|
||||
return client, s.Close |
||||
} |
||||
|
||||
func TestHTTPResourceClientFetchEmptyResponse(t *testing.T) { |
||||
client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
return nil, nil |
||||
}) |
||||
defer cleanup() |
||||
|
||||
res, err := client.Fetch(context.Background()) |
||||
require.Nil(t, res) |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func TestHTTPResourceClientFetchFullResponse(t *testing.T) { |
||||
client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
if request.VersionInfo == "1" { |
||||
return nil, nil |
||||
} |
||||
|
||||
return &v3.DiscoveryResponse{ |
||||
TypeUrl: request.TypeUrl, |
||||
VersionInfo: "1", |
||||
Nonce: "abc", |
||||
Resources: []*anypb.Any{}, |
||||
}, nil |
||||
}) |
||||
defer cleanup() |
||||
|
||||
res, err := client.Fetch(context.Background()) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, res) |
||||
|
||||
require.Equal(t, client.ResourceTypeURL(), res.TypeUrl) |
||||
require.Len(t, res.Resources, 0) |
||||
require.Equal(t, "abc", client.latestNonce, "Nonce not cached") |
||||
require.Equal(t, "1", client.latestVersion, "Version not cached") |
||||
|
||||
res, err = client.Fetch(context.Background()) |
||||
require.Nil(t, res, "Update not expected") |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func TestHTTPResourceClientServerError(t *testing.T) { |
||||
client, cleanup := createTestHTTPResourceClient(t, httpResourceConf, ProtocolV3, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
return nil, errors.New("server error") |
||||
}) |
||||
defer cleanup() |
||||
|
||||
res, err := client.Fetch(context.Background()) |
||||
require.Nil(t, res) |
||||
require.Error(t, err) |
||||
} |
||||
@ -0,0 +1,340 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"testing" |
||||
"time" |
||||
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
"google.golang.org/protobuf/proto" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
"gopkg.in/yaml.v2" |
||||
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup" |
||||
) |
||||
|
||||
var ( |
||||
kumaConf KumaSDConfig = sdConf |
||||
|
||||
testKumaMadsV1Resources = []*MonitoringAssignment{ |
||||
{ |
||||
Mesh: "metrics", |
||||
Service: "prometheus", |
||||
Targets: []*MonitoringAssignment_Target{ |
||||
{ |
||||
Name: "prometheus-01", |
||||
Scheme: "http", |
||||
Address: "10.1.4.32:9090", |
||||
MetricsPath: "/custom-metrics", |
||||
Labels: map[string]string{ |
||||
"commit_hash": "620506a88", |
||||
}, |
||||
}, |
||||
{ |
||||
Name: "prometheus-02", |
||||
Scheme: "http", |
||||
Address: "10.1.4.33:9090", |
||||
Labels: map[string]string{ |
||||
"commit_hash": "3513bba00", |
||||
}, |
||||
}, |
||||
}, |
||||
Labels: map[string]string{ |
||||
"kuma.io/zone": "us-east-1", |
||||
"team": "infra", |
||||
}, |
||||
}, |
||||
{ |
||||
Mesh: "metrics", |
||||
Service: "grafana", |
||||
Targets: []*MonitoringAssignment_Target{}, |
||||
Labels: map[string]string{ |
||||
"kuma.io/zone": "us-east-1", |
||||
"team": "infra", |
||||
}, |
||||
}, |
||||
{ |
||||
Mesh: "data", |
||||
Service: "elasticsearch", |
||||
Targets: []*MonitoringAssignment_Target{ |
||||
{ |
||||
Name: "elasticsearch-01", |
||||
Scheme: "http", |
||||
Address: "10.1.1.1", |
||||
Labels: map[string]string{ |
||||
"role": "ml", |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
) |
||||
|
||||
func getKumaMadsV1DiscoveryResponse(resources ...*MonitoringAssignment) (*v3.DiscoveryResponse, error) { |
||||
serialized := make([]*anypb.Any, len(resources)) |
||||
for i, res := range resources { |
||||
data, err := proto.Marshal(res) |
||||
|
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
serialized[i] = &anypb.Any{ |
||||
TypeUrl: KumaMadsV1ResourceTypeURL, |
||||
Value: data, |
||||
} |
||||
} |
||||
return &v3.DiscoveryResponse{ |
||||
TypeUrl: KumaMadsV1ResourceTypeURL, |
||||
Resources: serialized, |
||||
}, nil |
||||
} |
||||
|
||||
func newKumaTestHTTPDiscovery(c KumaSDConfig) (*fetchDiscovery, error) { |
||||
kd, err := NewKumaHTTPDiscovery(&c, nopLogger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
pd, ok := kd.(*fetchDiscovery) |
||||
if !ok { |
||||
return nil, errors.New("not a fetchDiscovery") |
||||
} |
||||
return pd, nil |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserInvalidTypeURL(t *testing.T) { |
||||
resources := make([]*anypb.Any, 0) |
||||
groups, err := kumaMadsV1ResourceParser(resources, "type.googleapis.com/some.api.v1.Monitoring") |
||||
require.Nil(t, groups) |
||||
require.Error(t, err) |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserEmptySlice(t *testing.T) { |
||||
resources := make([]*anypb.Any, 0) |
||||
groups, err := kumaMadsV1ResourceParser(resources, KumaMadsV1ResourceTypeURL) |
||||
require.Len(t, groups, 0) |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserValidResources(t *testing.T) { |
||||
res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...) |
||||
require.NoError(t, err) |
||||
|
||||
groups, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL) |
||||
require.NoError(t, err) |
||||
require.Len(t, groups, 3) |
||||
|
||||
expectedGroup1 := &targetgroup.Group{ |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.4.32:9090", |
||||
"__meta_kuma_label_commit_hash": "620506a88", |
||||
"__meta_kuma_dataplane": "prometheus-01", |
||||
"__metrics_path__": "/custom-metrics", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-01", |
||||
}, |
||||
{ |
||||
"__address__": "10.1.4.33:9090", |
||||
"__meta_kuma_label_commit_hash": "3513bba00", |
||||
"__meta_kuma_dataplane": "prometheus-02", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-02", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "prometheus", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup1, groups[0]) |
||||
|
||||
expectedGroup2 := &targetgroup.Group{ |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "grafana", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup2, groups[1]) |
||||
|
||||
expectedGroup3 := &targetgroup.Group{ |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.1.1", |
||||
"__meta_kuma_label_role": "ml", |
||||
"__meta_kuma_dataplane": "elasticsearch-01", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "elasticsearch-01", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "data", |
||||
"__meta_kuma_service": "elasticsearch", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup3, groups[2]) |
||||
} |
||||
|
||||
func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) { |
||||
data, err := protoJSONMarshalOptions.Marshal(&MonitoringAssignment_Target{}) |
||||
require.NoError(t, err) |
||||
|
||||
resources := []*anypb.Any{{ |
||||
TypeUrl: KumaMadsV1ResourceTypeURL, |
||||
Value: data, |
||||
}} |
||||
groups, err := kumaMadsV1ResourceParser(resources, KumaMadsV1ResourceTypeURL) |
||||
require.Nil(t, groups) |
||||
require.Error(t, err) |
||||
|
||||
require.Contains(t, err.Error(), "cannot parse") |
||||
} |
||||
|
||||
func TestNewKumaHTTPDiscovery(t *testing.T) { |
||||
kd, err := newKumaTestHTTPDiscovery(kumaConf) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, kd) |
||||
|
||||
resClient, ok := kd.client.(*HTTPResourceClient) |
||||
require.True(t, ok) |
||||
require.Equal(t, kumaConf.Server, resClient.Server()) |
||||
require.Equal(t, KumaMadsV1ResourceTypeURL, resClient.ResourceTypeURL()) |
||||
require.NotEmpty(t, resClient.ID()) |
||||
require.Equal(t, KumaMadsV1ResourceType, resClient.config.ResourceType) |
||||
} |
||||
|
||||
func TestKumaHTTPDiscoveryRefresh(t *testing.T) { |
||||
s := createTestHTTPServer(t, func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) { |
||||
if request.VersionInfo == "1" { |
||||
return nil, nil |
||||
} |
||||
|
||||
res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...) |
||||
require.NoError(t, err) |
||||
|
||||
res.VersionInfo = "1" |
||||
res.Nonce = "abc" |
||||
|
||||
return res, nil |
||||
}) |
||||
defer s.Close() |
||||
|
||||
cfgString := fmt.Sprintf(` |
||||
--- |
||||
server: %s |
||||
refresh_interval: 10s |
||||
tls_config: |
||||
insecure_skip_verify: true |
||||
`, s.URL) |
||||
|
||||
var cfg KumaSDConfig |
||||
require.NoError(t, yaml.Unmarshal([]byte(cfgString), &cfg)) |
||||
|
||||
kd, err := newKumaTestHTTPDiscovery(cfg) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, kd) |
||||
|
||||
ch := make(chan []*targetgroup.Group, 1) |
||||
kd.poll(context.Background(), ch) |
||||
|
||||
groups := <-ch |
||||
require.Len(t, groups, 3) |
||||
|
||||
expectedGroup1 := &targetgroup.Group{ |
||||
Source: "kuma", |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.4.32:9090", |
||||
"__meta_kuma_label_commit_hash": "620506a88", |
||||
"__meta_kuma_dataplane": "prometheus-01", |
||||
"__metrics_path__": "/custom-metrics", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-01", |
||||
}, |
||||
{ |
||||
"__address__": "10.1.4.33:9090", |
||||
"__meta_kuma_label_commit_hash": "3513bba00", |
||||
"__meta_kuma_dataplane": "prometheus-02", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "prometheus-02", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "prometheus", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup1, groups[0]) |
||||
|
||||
expectedGroup2 := &targetgroup.Group{ |
||||
Source: "kuma", |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "metrics", |
||||
"__meta_kuma_service": "grafana", |
||||
"__meta_kuma_label_team": "infra", |
||||
"__meta_kuma_label_kuma_io_zone": "us-east-1", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup2, groups[1]) |
||||
|
||||
expectedGroup3 := &targetgroup.Group{ |
||||
Source: "kuma", |
||||
Targets: []model.LabelSet{ |
||||
{ |
||||
"__address__": "10.1.1.1", |
||||
"__meta_kuma_label_role": "ml", |
||||
"__meta_kuma_dataplane": "elasticsearch-01", |
||||
"__metrics_path__": "", |
||||
"__scheme__": "http", |
||||
"instance": "elasticsearch-01", |
||||
}, |
||||
}, |
||||
Labels: model.LabelSet{ |
||||
"__meta_kuma_mesh": "data", |
||||
"__meta_kuma_service": "elasticsearch", |
||||
}, |
||||
} |
||||
require.Equal(t, expectedGroup3, groups[2]) |
||||
|
||||
// Should skip the next update.
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
go func() { |
||||
time.Sleep(1 * time.Second) |
||||
cancel() |
||||
}() |
||||
|
||||
kd.poll(ctx, ch) |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case <-ch: |
||||
require.Fail(t, "no update expected") |
||||
} |
||||
} |
||||
@ -0,0 +1,201 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package xds |
||||
|
||||
import ( |
||||
"context" |
||||
"io" |
||||
"io/ioutil" |
||||
"net/http" |
||||
"net/http/httptest" |
||||
"testing" |
||||
"time" |
||||
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
||||
"github.com/go-kit/log" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
"go.uber.org/goleak" |
||||
"google.golang.org/protobuf/types/known/anypb" |
||||
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup" |
||||
) |
||||
|
||||
var ( |
||||
sdConf = SDConfig{ |
||||
Server: "http://127.0.0.1", |
||||
RefreshInterval: model.Duration(10 * time.Second), |
||||
} |
||||
|
||||
testFetchFailuresCount = prometheus.NewCounter( |
||||
prometheus.CounterOpts{}) |
||||
testFetchSkipUpdateCount = prometheus.NewCounter( |
||||
prometheus.CounterOpts{}) |
||||
testFetchDuration = prometheus.NewSummary( |
||||
prometheus.SummaryOpts{}, |
||||
) |
||||
) |
||||
|
||||
func TestMain(m *testing.M) { |
||||
goleak.VerifyTestMain(m) |
||||
} |
||||
|
||||
type discoveryResponder func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) |
||||
|
||||
func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.Server { |
||||
return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
||||
// Validate req MIME types.
|
||||
require.Equal(t, "application/json", r.Header.Get("Content-Type")) |
||||
require.Equal(t, "application/json", r.Header.Get("Accept")) |
||||
|
||||
body, err := ioutil.ReadAll(r.Body) |
||||
defer func() { |
||||
_, _ = io.Copy(ioutil.Discard, r.Body) |
||||
_ = r.Body.Close() |
||||
}() |
||||
require.NotEmpty(t, body) |
||||
require.NoError(t, err) |
||||
|
||||
// Validate discovery request.
|
||||
discoveryReq := &v3.DiscoveryRequest{} |
||||
err = protoJSONUnmarshalOptions.Unmarshal(body, discoveryReq) |
||||
require.NoError(t, err) |
||||
|
||||
discoveryRes, err := responder(discoveryReq) |
||||
if err != nil { |
||||
w.WriteHeader(500) |
||||
return |
||||
} |
||||
|
||||
if discoveryRes == nil { |
||||
w.WriteHeader(304) |
||||
return |
||||
} |
||||
|
||||
w.WriteHeader(200) |
||||
data, err := protoJSONMarshalOptions.Marshal(discoveryRes) |
||||
require.NoError(t, err) |
||||
|
||||
_, err = w.Write(data) |
||||
require.NoError(t, err) |
||||
})) |
||||
} |
||||
|
||||
func constantResourceParser(groups []*targetgroup.Group, err error) resourceParser { |
||||
return func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) { |
||||
return groups, err |
||||
} |
||||
} |
||||
|
||||
var nopLogger = log.NewNopLogger() |
||||
|
||||
type testResourceClient struct { |
||||
resourceTypeURL string |
||||
server string |
||||
protocolVersion ProtocolVersion |
||||
fetch func(ctx context.Context) (*v3.DiscoveryResponse, error) |
||||
} |
||||
|
||||
func (rc testResourceClient) ResourceTypeURL() string { |
||||
return rc.resourceTypeURL |
||||
} |
||||
|
||||
func (rc testResourceClient) Server() string { |
||||
return rc.server |
||||
} |
||||
|
||||
func (rc testResourceClient) Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) { |
||||
return rc.fetch(ctx) |
||||
} |
||||
|
||||
func (rc testResourceClient) ID() string { |
||||
return "test-client" |
||||
} |
||||
|
||||
func (rc testResourceClient) Close() { |
||||
} |
||||
|
||||
func TestPollingRefreshSkipUpdate(t *testing.T) { |
||||
rc := &testResourceClient{ |
||||
fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { |
||||
return nil, nil |
||||
}, |
||||
} |
||||
pd := &fetchDiscovery{ |
||||
client: rc, |
||||
logger: nopLogger, |
||||
fetchDuration: testFetchDuration, |
||||
fetchFailuresCount: testFetchFailuresCount, |
||||
fetchSkipUpdateCount: testFetchSkipUpdateCount, |
||||
} |
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
go func() { |
||||
time.Sleep(1 * time.Second) |
||||
cancel() |
||||
}() |
||||
|
||||
ch := make(chan []*targetgroup.Group, 1) |
||||
pd.poll(ctx, ch) |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case <-ch: |
||||
require.Fail(t, "no update expected") |
||||
} |
||||
} |
||||
|
||||
func TestPollingRefreshAttachesGroupMetadata(t *testing.T) { |
||||
server := "http://198.161.2.0" |
||||
source := "test" |
||||
rc := &testResourceClient{ |
||||
server: server, |
||||
protocolVersion: ProtocolV3, |
||||
fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { |
||||
return &v3.DiscoveryResponse{}, nil |
||||
}, |
||||
} |
||||
pd := &fetchDiscovery{ |
||||
source: source, |
||||
client: rc, |
||||
logger: nopLogger, |
||||
fetchDuration: testFetchDuration, |
||||
fetchFailuresCount: testFetchFailuresCount, |
||||
fetchSkipUpdateCount: testFetchSkipUpdateCount, |
||||
parseResources: constantResourceParser([]*targetgroup.Group{ |
||||
{}, |
||||
{ |
||||
Source: "a-custom-source", |
||||
Labels: model.LabelSet{ |
||||
"__meta_custom_xds_label": "a-value", |
||||
}, |
||||
}, |
||||
}, nil), |
||||
} |
||||
ch := make(chan []*targetgroup.Group, 1) |
||||
pd.poll(context.Background(), ch) |
||||
groups := <-ch |
||||
require.NotNil(t, groups) |
||||
|
||||
require.Len(t, groups, 2) |
||||
|
||||
for _, group := range groups { |
||||
require.Equal(t, source, group.Source) |
||||
} |
||||
|
||||
group2 := groups[1] |
||||
require.Contains(t, group2.Labels, model.LabelName("__meta_custom_xds_label")) |
||||
require.Equal(t, model.LabelValue("a-value"), group2.Labels["__meta_custom_xds_label"]) |
||||
} |
||||
Loading…
Reference in new issue