Loki: Push support for multi-tenancy mode (#60866)

* Loki: Add multitenancy support for the HTTP client

* Loki: Support to push with multitenancy mode

* Apply feedback suggestions
pull/61931/head
Joan López de la Franca Beltran 3 years ago committed by GitHub
parent 35c7bbef57
commit afef0c926c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 96
      pkg/components/loki/lokigrpc/client.go
  2. 15
      pkg/components/loki/lokigrpc/config.go
  3. 30
      pkg/components/loki/lokigrpc/tenant.go
  4. 19
      pkg/components/loki/lokihttp/client.go
  5. 2
      pkg/components/loki/lokihttp/config.go

@ -0,0 +1,96 @@
package lokigrpc
import (
"context"
"crypto/tls"
"errors"
grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"github.com/grafana/grafana/pkg/components/loki/logproto"
)
// Client is a gRPC-based Loki client implementation.
type Client struct {
client logproto.PusherClient
conn *grpc.ClientConn
opts []grpc.DialOption
cfg Config
}
// NewClient instantiates a new Client.
func NewClient(cfg Config, opts ...grpc.DialOption) (*Client, error) {
w := &Client{
opts: opts,
cfg: cfg,
}
return w, w.init()
}
// Write pushes a new request with the given streams through
// the attached gRPC connection.
func (c *Client) Write(streams []logproto.Stream) (err error) {
pushRequest := &logproto.PushRequest{
Streams: streams,
}
ctx, cancel := c.timeoutCtx()
defer cancel()
if len(c.cfg.TenantID) > 0 {
ctx = injectOrgID(ctx, c.cfg.TenantID)
}
_, err = c.client.Push(ctx, pushRequest)
return err
}
// Close closes the attached gRPC connection.
func (c *Client) Close() error {
return c.conn.Close()
}
func (c *Client) init() error {
if len(c.cfg.URL) == 0 {
return errors.New("cfg must have Loki url")
}
opts := append(c.opts, c.grpcTLSOption(), c.grpcRetryOption())
conn, err := grpc.Dial(c.cfg.URL, opts...)
if err != nil {
return err
}
c.conn = conn
c.client = logproto.NewPusherClient(conn)
return nil
}
func (c *Client) grpcTLSOption() grpc.DialOption {
if c.cfg.TLSDisabled {
return grpc.WithTransportCredentials(insecure.NewCredentials())
}
config := &tls.Config{InsecureSkipVerify: false}
return grpc.WithTransportCredentials(credentials.NewTLS(config))
}
func (c *Client) grpcRetryOption() grpc.DialOption {
return grpc.WithUnaryInterceptor(
grpcretry.UnaryClientInterceptor(
grpcretry.WithMax(c.cfg.Retries),
),
)
}
func (c *Client) timeoutCtx() (context.Context, context.CancelFunc) {
if c.cfg.Timeout > 0 {
return context.WithTimeout(context.Background(), c.cfg.Timeout)
}
return context.WithCancel(context.Background())
}

@ -0,0 +1,15 @@
package lokigrpc
import "time"
// Config describes configuration for a gRPC pusher client.
type Config struct {
URL string
Retries uint
Timeout time.Duration
TLSDisabled bool
TenantID string
}

@ -0,0 +1,30 @@
package lokigrpc
import (
"context"
"errors"
"google.golang.org/grpc/metadata"
)
const (
lowerOrgIDHeaderName = "x-scope-orgid"
)
var (
ErrDifferentOrgIDPresent = errors.New("different org ID already present")
ErrTooManyOrgIDs = errors.New("multiple org IDs present")
)
func injectOrgID(ctx context.Context, tenantID string) context.Context {
md, ok := metadata.FromOutgoingContext(ctx)
if ok {
md = md.Copy()
} else {
md = metadata.New(map[string]string{})
}
md[lowerOrgIDHeaderName] = []string{tenantID}
newCtx := metadata.NewOutgoingContext(ctx, md)
return newCtx
}

@ -15,6 +15,7 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
@ -231,7 +232,7 @@ func (c *client) run() {
if !ok {
return
}
tenantID := ""
tenantID := c.getTenantID(e.Labels)
batch, ok := batches[tenantID]
// If the batch doesn't exist yet, we create a new one with the entry
@ -266,6 +267,22 @@ func (c *client) run() {
}
}
func (c *client) getTenantID(labels model.LabelSet) string {
// Check if it has been overridden while processing the pipeline stages
if value, ok := labels[ReservedLabelTenantID]; ok {
return string(value)
}
// Check if has been specified in the config
if c.cfg.TenantID != "" {
return c.cfg.TenantID
}
// Defaults to an empty string, which means the X-Scope-OrgID header
// will not be sent
return ""
}
func (c *client) Chan() chan<- Entry {
return c.entries
}

@ -18,4 +18,6 @@ type Config struct {
BackoffConfig backoff.Config
Timeout time.Duration
TenantID string
}

Loading…
Cancel
Save