Require unique client configs when there are multiple clients. (#5688)

* Require unique client configs when there are multiple clients.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fixing linter error

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>

Co-authored-by: Danny Kopping <danny.kopping@grafana.com>
pull/5700/head
Callum Styan 3 years ago committed by GitHub
parent cb757a4a3a
commit 2ebbecfcbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      clients/cmd/fluent-bit/dque.go
  2. 5
      clients/pkg/promtail/client/client.go
  3. 4
      clients/pkg/promtail/client/fake/client.go
  4. 4
      clients/pkg/promtail/client/logger.go
  5. 25
      clients/pkg/promtail/client/multi.go
  6. 39
      clients/pkg/promtail/client/multi_test.go

@ -150,3 +150,7 @@ func (c *dqueClient) enqueuer() {
}
}
}
func (c *dqueClient) Name() string {
return ""
}

@ -134,6 +134,7 @@ type Client interface {
api.EntryHandler
// Stop goroutine sending batch of entries without retries.
StopNow()
Name() string
}
// Client for pushing logs in snappy-compressed protos over HTTP.
@ -454,3 +455,7 @@ func (c *client) UnregisterLatencyMetric(labels prometheus.Labels) {
labels[HostLabel] = c.cfg.URL.Host
c.metrics.streamLag.Delete(labels)
}
func (c *client) Name() string {
return c.name
}

@ -56,3 +56,7 @@ func (c *Client) Received() []api.Entry {
func (c *Client) StopNow() {
c.Stop()
}
func (c *Client) Name() string {
return "fake"
}

@ -81,3 +81,7 @@ func (l *logger) run() {
}
}
func (l *logger) StopNow() { l.Stop() }
func (l *logger) Name() string {
return ""
}

@ -2,6 +2,8 @@ package client
import (
"errors"
"fmt"
"strings"
"sync"
"github.com/go-kit/log"
@ -20,16 +22,25 @@ type MultiClient struct {
// NewMulti creates a new client
func NewMulti(metrics *Metrics, streamLagLabels []string, logger log.Logger, cfgs ...Config) (Client, error) {
var fake struct{}
if len(cfgs) == 0 {
return nil, errors.New("at least one client config should be provided")
}
clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(cfgs))
for _, cfg := range cfgs {
client, err := New(metrics, cfg, streamLagLabels, logger)
if err != nil {
return nil, err
}
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
if _, ok := clientsCheck[client.Name()]; ok {
return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for URL: %s", cfg.URL)
}
clientsCheck[client.Name()] = fake
clients = append(clients, client)
}
multi := &MultiClient{
@ -71,3 +82,15 @@ func (m *MultiClient) StopNow() {
c.StopNow()
}
}
func (m *MultiClient) Name() string {
var sb strings.Builder
sb.WriteString("multi:")
for i, c := range m.clients {
sb.WriteString(c.Name())
if i != len(m.clients)-1 {
sb.WriteString(",")
}
}
return sb.String()
}

@ -68,6 +68,45 @@ func TestNewMulti(t *testing.T) {
}
}
func TestNewMulti_BlockDuplicates(t *testing.T) {
_, err := NewMulti(nilMetrics, nil, util_log.Logger, []Config{}...)
if err == nil {
t.Fatal("expected err but got nil")
}
host1, _ := url.Parse("http://localhost:3100")
cc1 := Config{
BatchSize: 20,
BatchWait: 1 * time.Second,
URL: flagext.URLValue{URL: host1},
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "yaml"}},
}
cc1Copy := cc1
_, err = NewMulti(metrics, nil, util_log.Logger, cc1, cc1Copy)
require.Error(t, err, "expected NewMulti to reject duplicate client configs")
cc1Copy.Name = "copy"
clients, err := NewMulti(metrics, nil, util_log.Logger, cc1, cc1Copy)
require.NoError(t, err, "expected NewMulti to reject duplicate client configs")
multi := clients.(*MultiClient)
if len(multi.clients) != 2 {
t.Fatalf("expected client: 2 got:%d", len(multi.clients))
}
actualCfg1 := clients.(*MultiClient).clients[0].(*client).cfg
// Yaml should overridden the command line so 'order: yaml' should be expected
expectedCfg1 := Config{
BatchSize: 20,
BatchWait: 1 * time.Second,
URL: flagext.URLValue{URL: host1},
ExternalLabels: lokiflag.LabelSet{LabelSet: model.LabelSet{"order": "yaml"}},
}
if !reflect.DeepEqual(actualCfg1, expectedCfg1) {
t.Fatalf("expected cfg: %v got:%v", expectedCfg1, actualCfg1)
}
}
func TestMultiClient_Stop(t *testing.T) {
var stopped int

Loading…
Cancel
Save