mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
203 lines
6.3 KiB
203 lines
6.3 KiB
|
4 years ago
|
package client
|
||
|
4 years ago
|
|
||
|
|
import (
|
||
|
|
"bytes"
|
||
|
|
"context"
|
||
|
|
"encoding/base64"
|
||
|
4 years ago
|
"io"
|
||
|
4 years ago
|
"strings"
|
||
|
4 years ago
|
"time"
|
||
|
4 years ago
|
|
||
|
|
"github.com/pkg/errors"
|
||
|
|
|
||
|
|
"github.com/grafana/loki/pkg/storage/chunk"
|
||
|
4 years ago
|
"github.com/grafana/loki/pkg/storage/chunk/client/util"
|
||
|
|
"github.com/grafana/loki/pkg/storage/config"
|
||
|
4 years ago
|
)
|
||
|
|
|
||
|
4 years ago
|
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
|
||
|
|
type ObjectClient interface {
|
||
|
|
PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
|
||
|
|
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.
|
||
|
|
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
|
||
|
|
|
||
|
|
// List objects with given prefix.
|
||
|
|
//
|
||
|
|
// If delimiter is empty, all objects are returned, even if they are in nested in "subdirectories".
|
||
|
|
// If delimiter is not empty, it is used to compute common prefixes ("subdirectories"),
|
||
|
|
// and objects containing delimiter in the name will not be returned in the result.
|
||
|
|
//
|
||
|
|
// For example, if the prefix is "notes/" and the delimiter is a slash (/) as in "notes/summer/july", the common prefix is "notes/summer/".
|
||
|
|
// Common prefixes will always end with passed delimiter.
|
||
|
|
//
|
||
|
|
// Keys of returned storage objects have given prefix.
|
||
|
|
List(ctx context.Context, prefix string, delimiter string) ([]StorageObject, []StorageCommonPrefix, error)
|
||
|
|
DeleteObject(ctx context.Context, objectKey string) error
|
||
|
|
IsObjectNotFoundErr(err error) bool
|
||
|
|
Stop()
|
||
|
|
}
|
||
|
|
|
||
|
|
// StorageObject represents an object being stored in an Object Store
|
||
|
|
type StorageObject struct {
|
||
|
|
Key string
|
||
|
|
ModifiedAt time.Time
|
||
|
|
}
|
||
|
|
|
||
|
|
// StorageCommonPrefix represents a common prefix aka a synthetic directory in Object Store.
|
||
|
|
// It is guaranteed to always end with delimiter passed to List method.
|
||
|
|
type StorageCommonPrefix string
|
||
|
|
|
||
|
4 years ago
|
// KeyEncoder is used to encode chunk keys before writing/retrieving chunks
|
||
|
|
// from the underlying ObjectClient
|
||
|
4 years ago
|
// Schema/Chunk are passed as arguments to allow this to improve over revisions
|
||
|
4 years ago
|
type KeyEncoder func(schema config.SchemaConfig, chk chunk.Chunk) string
|
||
|
4 years ago
|
|
||
|
4 years ago
|
// base64Encoder is used to encode chunk keys in base64 before storing/retrieving
|
||
|
4 years ago
|
// them from the ObjectClient
|
||
|
4 years ago
|
var base64Encoder = func(key string) string {
|
||
|
4 years ago
|
return base64.StdEncoding.EncodeToString([]byte(key))
|
||
|
|
}
|
||
|
|
|
||
|
4 years ago
|
var FSEncoder = func(schema config.SchemaConfig, chk chunk.Chunk) string {
|
||
|
4 years ago
|
// Filesystem encoder pre-v12 encodes the chunk as one base64 string.
|
||
|
|
// This has the downside of making them opaque and storing all chunks in a single
|
||
|
|
// directory, hurting performance at scale and discoverability.
|
||
|
|
// Post v12, we respect the directory structure imposed by chunk keys.
|
||
|
4 years ago
|
key := schema.ExternalKey(chk.ChunkRef)
|
||
|
|
if schema.VersionForChunk(chk.ChunkRef) > 11 {
|
||
|
4 years ago
|
split := strings.LastIndexByte(key, '/')
|
||
|
|
encodedTail := base64Encoder(key[split+1:])
|
||
|
|
return strings.Join([]string{key[:split], encodedTail}, "/")
|
||
|
|
|
||
|
|
}
|
||
|
|
return base64Encoder(key)
|
||
|
|
}
|
||
|
|
|
||
|
4 years ago
|
const defaultMaxParallel = 150
|
||
|
|
|
||
|
4 years ago
|
// client is used to store chunks in object store backends
|
||
|
|
type client struct {
|
||
|
|
store ObjectClient
|
||
|
4 years ago
|
keyEncoder KeyEncoder
|
||
|
|
getChunkMaxParallel int
|
||
|
4 years ago
|
schema config.SchemaConfig
|
||
|
4 years ago
|
}
|
||
|
|
|
||
|
|
// NewClient wraps the provided ObjectClient with a chunk.Client implementation
|
||
|
4 years ago
|
func NewClient(store ObjectClient, encoder KeyEncoder, schema config.SchemaConfig) Client {
|
||
|
4 years ago
|
return NewClientWithMaxParallel(store, encoder, defaultMaxParallel, schema)
|
||
|
4 years ago
|
}
|
||
|
|
|
||
|
4 years ago
|
func NewClientWithMaxParallel(store ObjectClient, encoder KeyEncoder, maxParallel int, schema config.SchemaConfig) Client {
|
||
|
|
return &client{
|
||
|
4 years ago
|
store: store,
|
||
|
|
keyEncoder: encoder,
|
||
|
|
getChunkMaxParallel: maxParallel,
|
||
|
4 years ago
|
schema: schema,
|
||
|
4 years ago
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Stop shuts down the object store and any underlying clients
|
||
|
4 years ago
|
func (o *client) Stop() {
|
||
|
4 years ago
|
o.store.Stop()
|
||
|
|
}
|
||
|
|
|
||
|
|
// PutChunks stores the provided chunks in the configured backend. If multiple errors are
|
||
|
|
// returned, the last one sequentially will be propagated up.
|
||
|
4 years ago
|
func (o *client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
|
||
|
4 years ago
|
var (
|
||
|
|
chunkKeys []string
|
||
|
|
chunkBufs [][]byte
|
||
|
|
)
|
||
|
|
|
||
|
|
for i := range chunks {
|
||
|
|
buf, err := chunks[i].Encoded()
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
4 years ago
|
|
||
|
4 years ago
|
var key string
|
||
|
4 years ago
|
if o.keyEncoder != nil {
|
||
|
4 years ago
|
key = o.keyEncoder(o.schema, chunks[i])
|
||
|
|
} else {
|
||
|
4 years ago
|
key = o.schema.ExternalKey(chunks[i].ChunkRef)
|
||
|
4 years ago
|
}
|
||
|
|
|
||
|
|
chunkKeys = append(chunkKeys, key)
|
||
|
|
chunkBufs = append(chunkBufs, buf)
|
||
|
|
}
|
||
|
|
|
||
|
|
incomingErrors := make(chan error)
|
||
|
|
for i := range chunkBufs {
|
||
|
|
go func(i int) {
|
||
|
|
incomingErrors <- o.store.PutObject(ctx, chunkKeys[i], bytes.NewReader(chunkBufs[i]))
|
||
|
|
}(i)
|
||
|
|
}
|
||
|
|
|
||
|
|
var lastErr error
|
||
|
|
for range chunkKeys {
|
||
|
|
err := <-incomingErrors
|
||
|
|
if err != nil {
|
||
|
|
lastErr = err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return lastErr
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetChunks retrieves the specified chunks from the configured backend
|
||
|
4 years ago
|
func (o *client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
|
||
|
4 years ago
|
getChunkMaxParallel := o.getChunkMaxParallel
|
||
|
|
if getChunkMaxParallel == 0 {
|
||
|
|
getChunkMaxParallel = defaultMaxParallel
|
||
|
|
}
|
||
|
|
return util.GetParallelChunks(ctx, getChunkMaxParallel, chunks, o.getChunk)
|
||
|
4 years ago
|
}
|
||
|
|
|
||
|
4 years ago
|
func (o *client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
|
||
|
4 years ago
|
if ctx.Err() != nil {
|
||
|
|
return chunk.Chunk{}, ctx.Err()
|
||
|
|
}
|
||
|
|
|
||
|
4 years ago
|
key := o.schema.ExternalKey(c.ChunkRef)
|
||
|
4 years ago
|
if o.keyEncoder != nil {
|
||
|
4 years ago
|
key = o.keyEncoder(o.schema, c)
|
||
|
4 years ago
|
}
|
||
|
|
|
||
|
4 years ago
|
readCloser, size, err := o.store.GetObject(ctx, key)
|
||
|
4 years ago
|
if err != nil {
|
||
|
|
return chunk.Chunk{}, errors.WithStack(err)
|
||
|
|
}
|
||
|
|
|
||
|
|
defer readCloser.Close()
|
||
|
|
|
||
|
4 years ago
|
// adds bytes.MinRead to avoid allocations when the size is known.
|
||
|
|
// This is because ReadFrom reads bytes.MinRead by bytes.MinRead.
|
||
|
|
buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
|
||
|
|
_, err = buf.ReadFrom(readCloser)
|
||
|
4 years ago
|
if err != nil {
|
||
|
|
return chunk.Chunk{}, errors.WithStack(err)
|
||
|
|
}
|
||
|
|
|
||
|
4 years ago
|
if err := c.Decode(decodeContext, buf.Bytes()); err != nil {
|
||
|
4 years ago
|
return chunk.Chunk{}, errors.WithStack(err)
|
||
|
|
}
|
||
|
|
return c, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// GetChunks retrieves the specified chunks from the configured backend
|
||
|
4 years ago
|
func (o *client) DeleteChunk(ctx context.Context, userID, chunkID string) error {
|
||
|
4 years ago
|
key := chunkID
|
||
|
|
if o.keyEncoder != nil {
|
||
|
4 years ago
|
c, err := chunk.ParseExternalKey(userID, key)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
key = o.keyEncoder(o.schema, c)
|
||
|
4 years ago
|
}
|
||
|
|
return o.store.DeleteObject(ctx, key)
|
||
|
|
}
|
||
|
4 years ago
|
|
||
|
4 years ago
|
func (o *client) IsChunkNotFoundErr(err error) bool {
|
||
|
4 years ago
|
return o.store.IsObjectNotFoundErr(err)
|
||
|
|
}
|