loki/pkg/storage/chunk/client/object_client.go

package client

import (
	"bytes"
	"context"
	"encoding/base64"
	"io"
	"strings"
	"time"

	"github.com/pkg/errors"

	"github.com/grafana/loki/pkg/storage/chunk"
	"github.com/grafana/loki/pkg/storage/chunk/client/util"
	"github.com/grafana/loki/pkg/storage/config"
)

// ObjectClient is used to store arbitrary data in an Object Store (S3/GCS/Azure/...).
type ObjectClient interface {
	PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error

	// GetObject returns a reader for the object at the given key, along with
	// its size. NOTE: the consumer of GetObject must always call Close on the
	// returned ReadCloser when done reading; otherwise resources may leak.
	GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)

	// List objects with the given prefix.
	//
	// If delimiter is empty, all objects are returned, even those nested in
	// "subdirectories".
	// If delimiter is not empty, it is used to compute common prefixes
	// ("subdirectories"), and objects whose names contain the delimiter after
	// the prefix are not returned in the result.
	//
	// For example, with prefix "notes/" and delimiter "/", the object
	// "notes/summer/july" is reported via the common prefix "notes/summer/".
	// Common prefixes always end with the passed delimiter.
	//
	// Keys of returned storage objects have the given prefix.
	List(ctx context.Context, prefix string, delimiter string) ([]StorageObject, []StorageCommonPrefix, error)

	DeleteObject(ctx context.Context, objectKey string) error
	IsObjectNotFoundErr(err error) bool
	Stop()
}
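
// The helper below is an illustrative sketch, not part of the original file:
// it shows how a caller might use List with a delimiter to walk one level of
// "directories". The oc argument is an assumed, already-constructed
// ObjectClient; the "notes/" prefix is hypothetical.
func listTopLevel(ctx context.Context, oc ObjectClient) ([]string, error) {
	// With delimiter "/", objects directly under "notes/" come back in
	// objects, while deeper keys are folded into common prefixes such as
	// "notes/summer/".
	objects, commonPrefixes, err := oc.List(ctx, "notes/", "/")
	if err != nil {
		return nil, err
	}
	entries := make([]string, 0, len(objects)+len(commonPrefixes))
	for _, o := range objects {
		entries = append(entries, o.Key)
	}
	for _, p := range commonPrefixes {
		entries = append(entries, string(p))
	}
	return entries, nil
}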

// StorageObject represents an object stored in an Object Store.
type StorageObject struct {
	Key        string
	ModifiedAt time.Time
}

// StorageCommonPrefix represents a common prefix, i.e. a synthetic directory
// in an Object Store. It is guaranteed to always end with the delimiter
// passed to the List method.
type StorageCommonPrefix string

// KeyEncoder is used to encode chunk keys before writing to / retrieving from
// the underlying ObjectClient. The schema and chunk are passed as arguments so
// that the encoding can evolve across schema revisions.
type KeyEncoder func(schema config.SchemaConfig, chk chunk.Chunk) string

// base64Encoder encodes chunk keys in base64 before storing/retrieving
// them from the ObjectClient.
var base64Encoder = func(key string) string {
	return base64.StdEncoding.EncodeToString([]byte(key))
}

// FSEncoder is a KeyEncoder for filesystem object stores.
//
// Pre-v12 schemas encode the whole chunk key as one base64 string. This has
// the downside of making keys opaque and of storing all chunks in a single
// directory, hurting both performance at scale and discoverability.
// From schema v12 onwards, we respect the directory structure imposed by
// chunk keys and base64-encode only the final path segment.
var FSEncoder = func(schema config.SchemaConfig, chk chunk.Chunk) string {
	key := schema.ExternalKey(chk.ChunkRef)
	if schema.VersionForChunk(chk.ChunkRef) > 11 {
		split := strings.LastIndexByte(key, '/')
		encodedTail := base64Encoder(key[split+1:])
		return strings.Join([]string{key[:split], encodedTail}, "/")
	}
	return base64Encoder(key)
}
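
// encodeTailBase64 is an illustrative sketch, not part of the original file:
// it mirrors the post-v12 branch of FSEncoder on a bare key string,
// base64-encoding only the last path segment while leaving the directory
// portion readable. The function name is hypothetical.
func encodeTailBase64(key string) string {
	split := strings.LastIndexByte(key, '/')
	if split < 0 {
		// No directory component: encode the whole key, as pre-v12 does.
		return base64Encoder(key)
	}
	return key[:split] + "/" + base64Encoder(key[split+1:])
}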

const defaultMaxParallel = 150

// client is used to store chunks in object store backends.
type client struct {
	store               ObjectClient
	keyEncoder          KeyEncoder
	getChunkMaxParallel int
	schema              config.SchemaConfig
}

// NewClient wraps the provided ObjectClient with a chunk.Client implementation.
func NewClient(store ObjectClient, encoder KeyEncoder, schema config.SchemaConfig) Client {
	return NewClientWithMaxParallel(store, encoder, defaultMaxParallel, schema)
}

// NewClientWithMaxParallel is like NewClient, but lets the caller bound the
// number of chunks fetched in parallel by GetChunks.
func NewClientWithMaxParallel(store ObjectClient, encoder KeyEncoder, maxParallel int, schema config.SchemaConfig) Client {
	return &client{
		store:               store,
		keyEncoder:          encoder,
		getChunkMaxParallel: maxParallel,
		schema:              schema,
	}
}
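
// Illustrative wiring, not part of the original file: wrapping an assumed
// ObjectClient implementation in a chunk.Client with filesystem-style key
// encoding and the default parallelism. The function name is hypothetical;
// store and schema are supplied by the caller.
func newFSChunkClient(store ObjectClient, schema config.SchemaConfig) Client {
	return NewClient(store, FSEncoder, schema)
}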

// Stop shuts down the object store and any underlying clients.
func (o *client) Stop() {
	o.store.Stop()
}

// PutChunks stores the provided chunks in the configured backend. If multiple
// puts fail, the last error received is the one returned.
func (o *client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
	var (
		chunkKeys []string
		chunkBufs [][]byte
	)

	for i := range chunks {
		buf, err := chunks[i].Encoded()
		if err != nil {
			return err
		}

		var key string
		if o.keyEncoder != nil {
			key = o.keyEncoder(o.schema, chunks[i])
		} else {
			key = o.schema.ExternalKey(chunks[i].ChunkRef)
		}

		chunkKeys = append(chunkKeys, key)
		chunkBufs = append(chunkBufs, buf)
	}

	// Fan out one PutObject call per chunk, then collect every result,
	// keeping the last error seen.
	incomingErrors := make(chan error)
	for i := range chunkBufs {
		go func(i int) {
			incomingErrors <- o.store.PutObject(ctx, chunkKeys[i], bytes.NewReader(chunkBufs[i]))
		}(i)
	}

	var lastErr error
	for range chunkKeys {
		err := <-incomingErrors
		if err != nil {
			lastErr = err
		}
	}
	return lastErr
}
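
// A minimal sketch, not part of the original file: unlike GetChunks below,
// PutChunks above launches one goroutine per chunk with no upper bound. This
// hypothetical variant caps in-flight uploads with a buffered channel used as
// a semaphore; the function name and maxParallel parameter are assumptions.
func putChunksBounded(ctx context.Context, store ObjectClient, keys []string, bufs [][]byte, maxParallel int) error {
	sem := make(chan struct{}, maxParallel)
	errs := make(chan error, len(keys))
	for i := range keys {
		sem <- struct{}{} // acquire a slot before spawning
		go func(i int) {
			defer func() { <-sem }() // release the slot when done
			errs <- store.PutObject(ctx, keys[i], bytes.NewReader(bufs[i]))
		}(i)
	}

	var lastErr error
	for range keys {
		if err := <-errs; err != nil {
			lastErr = err
		}
	}
	return lastErr
}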

// GetChunks retrieves the specified chunks from the configured backend.
func (o *client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
	getChunkMaxParallel := o.getChunkMaxParallel
	if getChunkMaxParallel == 0 {
		getChunkMaxParallel = defaultMaxParallel
	}
	return util.GetParallelChunks(ctx, getChunkMaxParallel, chunks, o.getChunk)
}

func (o *client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
	if ctx.Err() != nil {
		return chunk.Chunk{}, ctx.Err()
	}

	key := o.schema.ExternalKey(c.ChunkRef)
	if o.keyEncoder != nil {
		key = o.keyEncoder(o.schema, c)
	}

	readCloser, size, err := o.store.GetObject(ctx, key)
	if err != nil {
		return chunk.Chunk{}, errors.WithStack(errors.Wrapf(err, "failed to load chunk '%s'", key))
	}
	if readCloser == nil {
		return chunk.Chunk{}, errors.New("object client getChunk failed: object is nil")
	}
	defer readCloser.Close()

	// Pre-allocate size+bytes.MinRead to avoid re-allocation when the size is
	// known: Buffer.ReadFrom grows the buffer by at least bytes.MinRead per
	// read, so an exactly-sized buffer would be grown once more just to
	// detect EOF.
	buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
	_, err = buf.ReadFrom(readCloser)
	if err != nil {
		return chunk.Chunk{}, errors.WithStack(err)
	}

	if err := c.Decode(decodeContext, buf.Bytes()); err != nil {
		return chunk.Chunk{}, errors.WithStack(err)
	}
	return c, nil
}
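
// readAllSized is an illustrative sketch, not part of the original file: the
// same pre-allocation trick used in getChunk above, extracted as a standalone
// helper for reading a reader of known size into memory without buffer
// regrowth. The function name is hypothetical.
func readAllSized(r io.Reader, size int64) ([]byte, error) {
	buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
	if _, err := buf.ReadFrom(r); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}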

// DeleteChunk removes the specified chunk from the configured backend.
func (o *client) DeleteChunk(ctx context.Context, userID, chunkID string) error {
	key := chunkID
	if o.keyEncoder != nil {
		c, err := chunk.ParseExternalKey(userID, key)
		if err != nil {
			return err
		}
		key = o.keyEncoder(o.schema, c)
	}
	return o.store.DeleteObject(ctx, key)
}

// IsChunkNotFoundErr reports whether err corresponds to a missing object in
// the backing store.
func (o *client) IsChunkNotFoundErr(err error) bool {
	return o.store.IsObjectNotFoundErr(err)
}