chore: Refactor storage interface for rf1 (#13415)

Branch: pull/13423/head
Cyril Tovena authored 2 years ago, committed by GitHub
Parent: 7fc926e36e
Commit: 0076bbdb42
  1. pkg/ingester-rf1/flush.go (50)
  2. pkg/ingester-rf1/ingester.go (142)
  3. pkg/ingester-rf1/instance.go (15)
  4. pkg/ingester-rf1/objstore/storage.go (132)
  5. pkg/ingester-rf1/stream.go (38)
  6. pkg/ingester/flush_test.go (5)
  7. pkg/ingester/ingester_test.go (5)
  8. pkg/loki/modules.go (2)
  9. pkg/querier/querier_mock_test.go (4)
  10. pkg/storage/chunk/client/alibaba/oss_object_client.go (4)
  11. pkg/storage/chunk/client/aws/dynamodb_storage_client.go (5)
  12. pkg/storage/chunk/client/aws/s3_storage_client.go (14)
  13. pkg/storage/chunk/client/azure/blob_storage_client.go (3)
  14. pkg/storage/chunk/client/baidubce/bos_storage_client.go (2)
  15. pkg/storage/chunk/client/cassandra/storage_client.go (6)
  16. pkg/storage/chunk/client/client.go (2)
  17. pkg/storage/chunk/client/congestion/controller.go (10)
  18. pkg/storage/chunk/client/congestion/controller_test.go (2)
  19. pkg/storage/chunk/client/gcp/bigtable_object_client.go (5)
  20. pkg/storage/chunk/client/gcp/gcs_object_client.go (2)
  21. pkg/storage/chunk/client/grpc/storage_client.go (5)
  22. pkg/storage/chunk/client/ibmcloud/cos_object_client.go (13)
  23. pkg/storage/chunk/client/local/fs_object_client.go (2)
  24. pkg/storage/chunk/client/metrics.go (5)
  25. pkg/storage/chunk/client/object_client.go (18)
  26. pkg/storage/chunk/client/openstack/swift_object_client.go (2)
  27. pkg/storage/chunk/client/prefixed_object_client.go (2)
  28. pkg/storage/chunk/client/testutils/inmemory_storage_client.go (2)
  29. pkg/storage/chunk/client/util/reader.go (17)
  30. pkg/storage/store.go (5)
  31. pkg/storage/stores/composite_store.go (17)
  32. pkg/storage/stores/composite_store_test.go (6)
  33. pkg/storage/stores/series_store_write.go (5)
  34. pkg/storage/stores/series_store_write_test.go (9)
  35. pkg/storage/util_test.go (5)
  36. pkg/storage/wal/segment.go (57)
  37. pkg/storage/wal/segment_test.go (8)

@ -1,6 +1,7 @@
package ingesterrf1
import (
"crypto/rand"
"fmt"
"net/http"
"time"
@ -9,14 +10,14 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
const (
@ -140,7 +141,12 @@ func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
if err := i.store.PutWal(ctx, ch); err != nil {
reader := ch.Reader()
defer runutil.CloseWithLogOnErr(util_log.Logger, reader, "flushSegment")
newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+newUlid.String()), reader); err != nil {
i.metrics.chunksFlushFailures.Inc()
return fmt.Errorf("store put chunk: %w", err)
}
@ -148,39 +154,3 @@ func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) erro
// TODO: report some flush metrics
return nil
}
// reportFlushedChunkStatistics calculates overall statistics of flushed chunks without compromising the flush process.
func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
byt, err := ch.Encoded()
if err != nil {
level.Error(i.logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
return
}
i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)
compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)
if ok && compressedSize > 0 {
i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}
utilization := ch.Data.Utilization()
i.metrics.chunkUtilization.Observe(utilization)
numEntries := desc.chunk.Size()
i.metrics.chunkEntries.Observe(float64(numEntries))
i.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()
boundsFrom, boundsTo := desc.chunk.Bounds()
i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds())
i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())
i.metrics.flushedChunksBytesStats.Record(compressedSize)
i.metrics.flushedChunksLinesStats.Record(float64(numEntries))
i.metrics.flushedChunksUtilizationStats.Record(utilization)
i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Seconds())
}
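Reviewer note: for context, here is a minimal, self-contained sketch of the new flush path (illustrative only, not code from this commit). Each WAL segment is streamed to object storage under a ULID-based key, so keys under the prefix sort lexicographically by flush time. The putObjectFunc stand-in and main function are hypothetical; only the key scheme and the ulid calls mirror the diff above.

package main

import (
	"bytes"
	"context"
	"crypto/rand"
	"fmt"
	"io"
	"time"

	"github.com/oklog/ulid"
)

// putObjectFunc is a hypothetical stand-in for the ingester's Storage interface.
type putObjectFunc func(ctx context.Context, key string, r io.Reader) error

func flushSegment(ctx context.Context, put putObjectFunc, segment io.Reader) error {
	// A ULID embeds a millisecond timestamp in its high bits, so object
	// listings under this prefix come back in flush order.
	key := "loki-v2/wal/anon/" + ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
	return put(ctx, key, segment)
}

func main() {
	put := func(_ context.Context, key string, r io.Reader) error {
		n, _ := io.Copy(io.Discard, r)
		fmt.Printf("stored %d bytes at %s\n", n, key)
		return nil
	}
	_ = flushSegment(context.Background(), put, bytes.NewReader([]byte("segment data")))
}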

@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"os"
@ -16,9 +17,10 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
lokilog "github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/storage/wal"
util_log "github.com/grafana/loki/v3/pkg/util/log"
@ -35,20 +37,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/health/grpc_health_v1"
server_util "github.com/grafana/loki/v3/pkg/util/server"
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
indexstore "github.com/grafana/loki/v3/pkg/storage/stores/index"
"github.com/grafana/loki/v3/pkg/util"
)
@ -121,7 +116,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester-rf1.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester-rf1.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
f.DurationVar(&cfg.RetainPeriod, "ingester-rf1.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
//f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
// f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester-rf1.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block. When this threshold is exceeded, the head block will be cut and compressed inside the chunk.")
f.IntVar(&cfg.TargetChunkSize, "ingester-rf1.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB
f.StringVar(&cfg.ChunkEncoding, "ingester-rf1.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", chunkenc.SupportedEncoding()))
@ -159,13 +154,10 @@ type Wrapper interface {
Wrap(wrapped Interface) Interface
}
// Store is the store interface we need on the ingester.
type Store interface {
stores.ChunkWriter
stores.ChunkFetcher
storage.SelectStore
storage.SchemaConfigProvider
indexstore.StatsReader
// Storage is the store interface we need on the ingester.
type Storage interface {
PutObject(ctx context.Context, objectKey string, object io.Reader) error
Stop()
}
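Reviewer note: since the ingester now needs only PutObject and Stop from its store, a test double is trivial. A minimal sketch (hypothetical, assuming the Storage interface above and that context and io are imported):

// discardStorage is a hypothetical no-op Storage for tests.
type discardStorage struct{}

func (discardStorage) PutObject(_ context.Context, _ string, object io.Reader) error {
	// Drain the reader so callers that stream a segment can complete.
	_, err := io.Copy(io.Discard, object)
	return err
}

func (discardStorage) Stop() {}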
// Interface is an interface for the Ingester
@ -174,8 +166,6 @@ type Interface interface {
http.Handler
logproto.PusherServer
//logproto.QuerierServer
//logproto.StreamDataServer
CheckReady(ctx context.Context) error
FlushHandler(w http.ResponseWriter, _ *http.Request)
@ -218,7 +208,7 @@ type Ingester struct {
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher
store Store
store Storage
periodicConfigs []config.PeriodConfig
loopDone sync.WaitGroup
@ -240,14 +230,10 @@ type Ingester struct {
terminateOnShutdown bool
// Only used by WAL & flusher to coordinate backpressure during replay.
//replayController *replayController
// replayController *replayController
metrics *ingesterMetrics
chunkFilter chunk.RequestChunkFilterer
extractorWrapper lokilog.SampleExtractorWrapper
pipelineWrapper lokilog.PipelineWrapper
streamRateCalculator *StreamRateCalculator
writeLogManager *writefailures.Manager
@ -256,11 +242,25 @@ type Ingester struct {
// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
//recalculateOwnedStreams *recalculateOwnedStreams
// recalculateOwnedStreams *recalculateOwnedStreams
}
// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config,
periodConfigs []config.PeriodConfig,
storageConfig storage.Config,
clientMetrics storage.ClientMetrics,
limits Limits, configs *runtime.TenantConfigs,
registerer prometheus.Registerer,
writeFailuresCfg writefailures.Cfg,
metricsNamespace string,
logger log.Logger,
customStreamsTracker push.UsageTracker, readRing ring.ReadRing,
) (*Ingester, error) {
storage, err := objstore.New(periodConfigs, storageConfig, clientMetrics)
if err != nil {
return nil, err
}
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
@ -279,13 +279,13 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
store: storage,
periodicConfigs: periodConfigs,
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
//flushOnShutdownSwitch: &OnceSwitch{},
// flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
@ -298,7 +298,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
segmentWriter: segmentWriter,
},
}
//i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})
// TODO: change flush on shutdown
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester-rf1", "ingester-rf1-ring", true, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer))
@ -334,18 +333,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
return i, nil
}
func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
i.chunkFilter = chunkFilter
}
func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper) {
i.extractorWrapper = wrapper
}
func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper) {
i.pipelineWrapper = wrapper
}
// setupAutoForget looks for ring status if `AutoForgetUnhealthy` is enabled
// when enabled, unhealthy ingesters that reach `ring.kvstore.heartbeat_timeout` are removed from the ring every `HeartbeatPeriod`
func (i *Ingester) setupAutoForget() {
@ -490,7 +477,7 @@ func (i *Ingester) running(ctx context.Context) error {
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs util.MultiError
//errs.Add(i.wal.Stop())
// errs.Add(i.wal.Stop())
//if i.flushOnShutdownSwitch.Get() {
// i.lifecycler.SetFlushOnShutdown(true)
@ -502,7 +489,7 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()
//i.streamRateCalculator.Stop()
// i.streamRateCalculator.Stop()
// In case the flag to terminate on shutdown is set or this instance is marked to release its resources,
// we need to mark the ingester service as "failed", so Loki will shut down entirely.
@ -511,6 +498,7 @@ func (i *Ingester) stopping(_ error) error {
i.removeShutdownMarkerFile()
return modules.ErrStopProcess
}
i.store.Stop()
return errs.Err()
}
@ -581,7 +569,7 @@ func (i *Ingester) loop() {
func (i *Ingester) doFlushTick() {
i.flushCtx.lock.Lock()
//i.logger.Log("msg", "starting periodic flush")
// i.logger.Log("msg", "starting periodic flush")
// Stop new chunks being written while we swap destinations - we'll never unlock as this flushctx can no longer be used.
currentFlushCtx := i.flushCtx
@ -708,7 +696,7 @@ func createShutdownMarker(p string) error {
return err
}
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
if err != nil {
return err
}
@ -725,7 +713,7 @@ func removeShutdownMarker(p string) error {
return err
}
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0o777)
if err != nil {
return err
}
@ -811,7 +799,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
// Fetch a flush context and try to acquire the RLock
// The only time the Write Lock is held is when this context is no longer usable and a new one is being created.
// In this case, we need to re-read i.flushCtx in order to fetch the new one as soon as it's available.
//The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
// The newCtxAvailable chan is closed as soon as the new one is available to avoid a busy loop.
currentFlushCtx := i.flushCtx
for !currentFlushCtx.lock.TryRLock() {
select {
@ -863,7 +851,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.metrics, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker)
if err != nil {
return nil, err
}
@ -894,62 +882,6 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration {
return maxLookBack
}
// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
gcr, err := i.getChunkIDs(ctx, req)
err = server_util.ClientGrpcStatusAndError(err)
return gcr, err
}
// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
orgID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
// Set profiling tags
defer pprof.SetGoroutineLabels(ctx)
ctx = pprof.WithLabels(ctx, pprof.Labels("path", "read", "type", "chunkIDs", "tenant", orgID))
pprof.SetGoroutineLabels(ctx)
asyncStoreMaxLookBack := i.asyncStoreMaxLookBack()
if asyncStoreMaxLookBack == 0 {
return &logproto.GetChunkIDsResponse{}, nil
}
reqStart := req.Start
reqStart = adjustQueryStartTime(asyncStoreMaxLookBack, reqStart, time.Now())
// parse the request
start, end := util.RoundToMilliseconds(reqStart, req.End)
matchers, err := syntax.ParseMatchers(req.Matchers, true)
if err != nil {
return nil, err
}
// get chunk references
chunksGroups, _, err := i.store.GetChunks(ctx, orgID, start, end, chunk.NewPredicate(matchers, nil), nil)
if err != nil {
return nil, err
}
// todo (Callum) ingester should maybe store the whole schema config?
s := config.SchemaConfig{
Configs: i.periodicConfigs,
}
// build the response
resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}}
for _, chunks := range chunksGroups {
for _, chk := range chunks {
resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk.ChunkRef))
}
}
return &resp, nil
}
// Watch implements grpc_health_v1.HealthCheck.
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error {
return nil

@ -20,10 +20,8 @@ import (
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
@ -69,7 +67,7 @@ type instance struct {
streamsCreatedTotal prometheus.Counter
streamsRemovedTotal prometheus.Counter
//tailers map[uint32]*tailer
// tailers map[uint32]*tailer
tailerMtx sync.RWMutex
limiter *Limiter
@ -80,9 +78,6 @@ type instance struct {
metrics *ingesterMetrics
chunkFilter chunk.RequestChunkFilterer
pipelineWrapper log.PipelineWrapper
extractorWrapper log.SampleExtractorWrapper
streamRateCalculator *StreamRateCalculator
writeFailures *writefailures.Manager
@ -123,9 +118,6 @@ func newInstance(
limiter *Limiter,
configs *runtime.TenantConfigs,
metrics *ingesterMetrics,
chunkFilter chunk.RequestChunkFilterer,
pipelineWrapper log.PipelineWrapper,
extractorWrapper log.SampleExtractorWrapper,
streamRateCalculator *StreamRateCalculator,
writeFailures *writefailures.Manager,
customStreamsTracker push.UsageTracker,
@ -154,9 +146,6 @@ func newInstance(
ownedStreamsSvc: ownedStreamsSvc,
configs: configs,
metrics: metrics,
chunkFilter: chunkFilter,
pipelineWrapper: pipelineWrapper,
extractorWrapper: extractorWrapper,
streamRateCalculator: streamRateCalculator,
@ -286,7 +275,7 @@ func (i *instance) onStreamCreated(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Inc()
memoryStreamsLabelsBytes.Add(float64(len(s.labels.String())))
i.streamsCreatedTotal.Inc()
//i.addTailersToNewStream(s)
// i.addTailersToNewStream(s)
streamsCountStats.Add(1)
i.ownedStreamsSvc.incOwnedStreamCount()
if i.configs.LogStreamCreation(i.instanceID) {

@ -0,0 +1,132 @@
package objstore
import (
"context"
"fmt"
"io"
"sort"
"github.com/prometheus/common/model"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
)
type Multi struct {
stores []*storeEntry
storageConfig storage.Config
}
type storeEntry struct {
start model.Time
cfg config.PeriodConfig
objectClient client.ObjectClient
}
var _ client.ObjectClient = (*Multi)(nil)
func New(
periodicConfigs []config.PeriodConfig,
storageConfig storage.Config,
clientMetrics storage.ClientMetrics,
) (*Multi, error) {
store := &Multi{
storageConfig: storageConfig,
}
// sort by From time
sort.Slice(periodicConfigs, func(i, j int) bool {
return periodicConfigs[i].From.Time.Before(periodicConfigs[j].From.Time)
})
for _, periodicConfig := range periodicConfigs {
objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics)
if err != nil {
return nil, fmt.Errorf("creating object client for period %s: %w ", periodicConfig.From, err)
}
store.stores = append(store.stores, &storeEntry{
start: periodicConfig.From.Time,
cfg: periodicConfig,
objectClient: objectClient,
})
}
return store, nil
}
func (m *Multi) GetStoreFor(ts model.Time) (client.ObjectClient, error) {
// find the schema with the lowest start _after_ ts
j := sort.Search(len(m.stores), func(j int) bool {
return m.stores[j].start > ts
})
// reduce it by 1 because we want a schema with start <= ts
j--
if 0 <= j && j < len(m.stores) {
return m.stores[j].objectClient, nil
}
// should in theory never happen
return nil, fmt.Errorf("no store found for timestamp %s", ts)
}
func (m *Multi) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return false, err
}
return s.ObjectExists(ctx, objectKey)
}
func (m *Multi) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return err
}
return s.PutObject(ctx, objectKey, object)
}
func (m *Multi) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return nil, 0, err
}
return s.GetObject(ctx, objectKey)
}
func (m *Multi) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return nil, nil, err
}
return s.List(ctx, prefix, delimiter)
}
func (m *Multi) DeleteObject(ctx context.Context, objectKey string) error {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return err
}
return s.DeleteObject(ctx, objectKey)
}
func (m *Multi) IsObjectNotFoundErr(err error) bool {
s, _ := m.GetStoreFor(model.Now())
if s == nil {
return false
}
return s.IsObjectNotFoundErr(err)
}
func (m *Multi) IsRetryableErr(err error) bool {
s, _ := m.GetStoreFor(model.Now())
if s == nil {
return false
}
return s.IsRetryableErr(err)
}
func (m *Multi) Stop() {
for _, s := range m.stores {
s.objectClient.Stop()
}
}
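Reviewer note: the period lookup in GetStoreFor mirrors the lookup above; sort.Search returns the index of the first period whose start is after ts, and stepping back one lands on the active period. Note that all the ObjectClient methods resolve the store with model.Now(), so only the current period's client ever serves requests. A standalone sketch with illustrative timestamps (activePeriod is hypothetical):

package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/common/model"
)

// activePeriod returns the index of the period covering ts, given period
// start times sorted ascending, or false if ts precedes the first period.
func activePeriod(starts []model.Time, ts model.Time) (int, bool) {
	j := sort.Search(len(starts), func(j int) bool { return starts[j] > ts })
	j-- // step back to the period with start <= ts
	if j >= 0 && j < len(starts) {
		return j, true
	}
	return 0, false
}

func main() {
	starts := []model.Time{0, 1000, 2000} // one entry per schema period
	idx, ok := activePeriod(starts, model.Time(1500))
	fmt.Println(idx, ok) // 1 true: the period starting at 1000 is active
}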

@ -52,8 +52,8 @@ type stream struct {
metrics *ingesterMetrics
//tailers map[uint32]*tailer
//tailerMtx sync.RWMutex
// tailers map[uint32]*tailer
// tailerMtx sync.RWMutex
// entryCt is a counter which is incremented on each accepted entry.
// This allows us to discard WAL entries during replays which were
@ -63,7 +63,7 @@ type stream struct {
entryCt int64
unorderedWrites bool
//streamRateCalculator *StreamRateCalculator
// streamRateCalculator *StreamRateCalculator
writeFailures *writefailures.Manager
@ -95,11 +95,11 @@ func newStream(
fp model.Fingerprint,
labels labels.Labels,
unorderedWrites bool,
//streamRateCalculator *StreamRateCalculator,
// streamRateCalculator *StreamRateCalculator,
metrics *ingesterMetrics,
writeFailures *writefailures.Manager,
) *stream {
//hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
// hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
return &stream{
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
cfg: cfg,
@ -107,11 +107,11 @@ func newStream(
labels: labels,
labelsString: labels.String(),
labelHash: labels.Hash(),
//labelHashNoShard: hashNoShard,
//tailers: map[uint32]*tailer{},
// labelHashNoShard: hashNoShard,
// tailers: map[uint32]*tailer{},
metrics: metrics,
tenant: tenant,
//streamRateCalculator: streamRateCalculator,
// streamRateCalculator: streamRateCalculator,
unorderedWrites: unorderedWrites,
writeFailures: writeFailures,
@ -137,13 +137,12 @@ func (s *stream) Push(
usageTracker push.UsageTracker,
flushCtx *flushCtx,
) (int, error) {
toStore, invalid := s.validateEntries(ctx, entries, rateLimitWholeStream, usageTracker)
if rateLimitWholeStream && hasRateLimitErr(invalid) {
return 0, errorForFailedEntries(s, invalid, len(entries))
}
bytesAdded, _ := s.storeEntries(ctx, toStore, usageTracker, flushCtx)
bytesAdded := s.storeEntries(ctx, toStore, usageTracker, flushCtx)
return bytesAdded, errorForFailedEntries(s, invalid, len(entries))
}
@ -196,7 +195,7 @@ func hasRateLimitErr(errs []entryWithError) bool {
return ok
}
func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) (int, []*logproto.Entry) {
func (s *stream) storeEntries(ctx context.Context, entries []*logproto.Entry, usageTracker push.UsageTracker, flushCtx *flushCtx) int {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("event", "stream started to store entries", "labels", s.labelsString)
defer sp.LogKV("event", "stream finished to store entries")
@ -204,7 +203,6 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa
var bytesAdded, outOfOrderSamples, outOfOrderBytes int
storedEntries := make([]*logproto.Entry, 0, len(entries))
for i := 0; i < len(entries); i++ {
s.entryCt++
s.lastLine.ts = entries[i].Timestamp
@ -214,15 +212,13 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa
}
bytesAdded += len(entries[i].Line)
storedEntries = append(storedEntries, &entries[i])
}
flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, storedEntries)
flushCtx.segmentWriter.Append(s.tenant, s.labels.String(), s.labels, entries)
s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, 0, 0, usageTracker)
return bytesAdded, storedEntries
return bytesAdded
}
func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]logproto.Entry, []entryWithError) {
func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, rateLimitWholeStream bool, usageTracker push.UsageTracker) ([]*logproto.Entry, []entryWithError) {
var (
outOfOrderSamples, outOfOrderBytes int
rateLimitedSamples, rateLimitedBytes int
@ -231,7 +227,7 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
limit = s.limiter.lim.Limit()
lastLine = s.lastLine
highestTs = s.highestTs
toStore = make([]logproto.Entry, 0, len(entries))
toStore = make([]*logproto.Entry, 0, len(entries))
)
for i := range entries {
@ -277,7 +273,7 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
highestTs = entries[i].Timestamp
}
toStore = append(toStore, entries[i])
toStore = append(toStore, &entries[i])
}
// Each successful call to 'AllowN' advances the limiter. With all-or-nothing
@ -289,12 +285,12 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
rateLimitedSamples = len(toStore)
failedEntriesWithError = make([]entryWithError, 0, len(toStore))
for i := 0; i < len(toStore); i++ {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}})
failedEntriesWithError = append(failedEntriesWithError, entryWithError{toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}})
rateLimitedBytes += len(toStore[i].Line)
}
}
//s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes)
// s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes)
s.reportMetrics(ctx, outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes, usageTracker)
return toStore, failedEntriesWithError
}
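Reviewer note: the switch from []logproto.Entry to []*logproto.Entry lets validateEntries hand storeEntries pointers into the original slice instead of copying each entry again for the segment writer. A small illustration of the &entries[i] pattern (types here are hypothetical):

package main

import "fmt"

type entry struct{ line string }

func main() {
	entries := []entry{{"a"}, {"b"}, {"c"}}
	toStore := make([]*entry, 0, len(entries))
	for i := range entries {
		// Index-based &entries[i] points into the backing array, not at a
		// loop variable, so the pointers stay valid after the loop.
		toStore = append(toStore, &entries[i])
	}
	entries[0].line = "A" // visible through the stored pointer
	fmt.Println(toStore[0].line) // prints "A"
}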

@ -37,7 +37,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
walsegment "github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/validation"
)
@ -433,10 +432,6 @@ func defaultIngesterTestConfig(t testing.TB) Config {
return cfg
}
func (s *testStore) PutWal(_ context.Context, _ *walsegment.SegmentWriter) error {
return nil
}
func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.mtx.Lock()
defer s.mtx.Unlock()

@ -48,7 +48,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/validation"
)
@ -436,10 +435,6 @@ type mockStore struct {
chunks map[string][]chunk.Chunk
}
func (s *mockStore) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return nil
}
func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.mtx.Lock()
defer s.mtx.Unlock()

@ -644,7 +644,7 @@ func (t *Loki) initIngesterRF1() (_ services.Service, err error) {
level.Warn(util_log.Logger).Log("msg", "The config setting shutdown marker path is not set. The /ingester/prepare_shutdown endpoint won't work")
}
t.IngesterRF1, err = ingester_rf1.New(t.Cfg.IngesterRF1, t.Cfg.IngesterRF1Client, t.Store, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging, t.Cfg.MetricsNamespace, logger, t.UsageTracker, t.ring)
t.IngesterRF1, err = ingester_rf1.New(t.Cfg.IngesterRF1, t.Cfg.IngesterRF1Client, t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, t.Overrides, t.tenantConfigs, prometheus.DefaultRegisterer, t.Cfg.Distributor.WriteFailuresLogging, t.Cfg.MetricsNamespace, logger, t.UsageTracker, t.ring)
if err != nil {
fmt.Println("Error initializing ingester rf1", err)
return

@ -8,7 +8,6 @@ import (
"time"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/loghttp"
@ -340,9 +339,6 @@ func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through
return args.Get(0).([][]chunk.Chunk), args.Get(0).([]*fetcher.Fetcher), args.Error(2)
}
func (s *storeMock) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return errors.New("storeMock.PutWal() has not been mocked")
}
func (s *storeMock) Put(_ context.Context, _ []chunk.Chunk) error {
return errors.New("storeMock.Put() has not been mocked")
}

@ -106,18 +106,16 @@ func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
return nil, 0, err
}
return resp.Response.Body, int64(size), err
}
// PutObject puts the specified bytes into the configured OSS bucket at the provided key
func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return instrument.CollectedRequest(ctx, "OSS.PutObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
if err := s.defaultBucket.PutObject(objectKey, object); err != nil {
return errors.Wrap(err, "failed to put oss object")
}
return nil
})
}
// List implements chunk.ObjectClient.

@ -33,7 +33,6 @@ import (
client_util "github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/math"
@ -119,10 +118,6 @@ type dynamoDBStorageClient struct {
metrics *dynamoDBMetrics
}
func (a dynamoDBStorageClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return errors.New("not implemented")
}
// NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient.
func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg config.SchemaConfig, reg prometheus.Registerer) (index.Client, error) {
return newDynamoDBStorageClient(cfg, schemaCfg, reg)

@ -30,6 +30,7 @@ import (
bucket_s3 "github.com/grafana/loki/v3/pkg/storage/bucket/s3"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
clientutil "github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
storageawscommon "github.com/grafana/loki/v3/pkg/storage/common/aws"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
@ -309,7 +310,6 @@ func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bo
_, err := a.S3.HeadObject(headObjectInput)
return err
})
if err != nil {
return false, err
}
@ -381,10 +381,14 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
}
// PutObject into the store
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return loki_instrument.TimeRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
readSeeker, err := clientutil.ReadSeeker(object)
if err != nil {
return err
}
putObjectInput := &s3.PutObjectInput{
Body: object,
Body: readSeeker,
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
StorageClass: aws.String(a.cfg.StorageClass),
@ -396,7 +400,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object
putObjectInput.SSEKMSEncryptionContext = a.sseConfig.KMSEncryptionContext
}
_, err := a.S3.PutObjectWithContext(ctx, putObjectInput)
_, err = a.S3.PutObjectWithContext(ctx, putObjectInput)
return err
})
}
@ -405,7 +409,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object
func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
var commonPrefixesSet = make(map[string]bool)
commonPrefixesSet := make(map[string]bool)
for i := range a.bucketNames {
err := loki_instrument.TimeRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {

@ -229,7 +229,6 @@ func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool,
_, err = blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
return err
})
if err != nil {
return false, err
}
@ -278,7 +277,7 @@ func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.Re
return downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: b.cfg.MaxRetries}), downloadResponse.ContentLength(), nil
}
func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return loki_instrument.TimeRequest(ctx, "azure.PutObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {

@ -79,7 +79,7 @@ func NewBOSObjectStorage(cfg *BOSStorageConfig) (*BOSObjectStorage, error) {
}, nil
}
func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return instrument.CollectedRequest(ctx, "BOS.PutObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
body, err := bce.NewBodyFromSizedReader(object, -1)
if err != nil {

@ -23,7 +23,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/wal"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
@ -544,6 +543,7 @@ func NewObjectClient(cfg Config, schemaCfg config.SchemaConfig, registerer prome
}
return client, nil
}
func (s *ObjectClient) reconnectWriteSession() error {
s.writeMtx.Lock()
defer s.writeMtx.Unlock()
@ -568,10 +568,6 @@ func (s *ObjectClient) reconnectReadSession() error {
return nil
}
func (s *ObjectClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return errors.New("not implemented")
}
// PutChunks implements chunk.ObjectClient.
func (s *ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
err := s.putChunks(ctx, chunks)

@ -6,7 +6,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/stores/series/index"
"github.com/grafana/loki/v3/pkg/storage/wal"
)
var (
@ -19,7 +18,6 @@ var (
// Client is for storing and retrieving chunks.
type Client interface {
Stop()
PutWal(ctx context.Context, writer *wal.SegmentWriter) error
PutChunks(ctx context.Context, chunks []chunk.Chunk) error
GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error)
DeleteChunk(ctx context.Context, userID, chunkID string) error

@ -83,7 +83,7 @@ func (a *AIMDController) withLogger(logger log.Logger) Controller {
return a
}
func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return a.inner.PutObject(ctx, objectKey, object)
}
@ -208,11 +208,12 @@ func NewNoopController(Config) *NoopController {
return &NoopController{}
}
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
func (n *NoopController) PutObject(context.Context, string, io.ReadSeeker) error { return nil }
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
func (n *NoopController) PutObject(context.Context, string, io.Reader) error { return nil }
func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) {
return nil, 0, nil
}
func (n *NoopController) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
return nil, nil, nil
}
@ -226,14 +227,17 @@ func (n *NoopController) withLogger(logger log.Logger) Controller {
n.logger = logger
return n
}
func (n *NoopController) withRetrier(r Retrier) Controller {
n.retrier = r
return n
}
func (n *NoopController) withHedger(h Hedger) Controller {
n.hedger = h
return n
}
func (n *NoopController) withMetrics(m *Metrics) Controller {
n.metrics = m
return n

@ -247,7 +247,7 @@ type mockObjectClient struct {
nonRetryableErrs bool
}
func (m *mockObjectClient) PutObject(context.Context, string, io.ReadSeeker) error {
func (m *mockObjectClient) PutObject(context.Context, string, io.Reader) error {
panic("not implemented")
}

@ -12,7 +12,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/math"
)
@ -84,10 +83,6 @@ func (s *bigtableObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chu
return nil
}
func (s *bigtableObjectClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return errors.New("not implemented")
}
func (s *bigtableObjectClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) {
sp, ctx := ot.StartSpanFromContext(ctx, "GetChunks")
defer sp.Finish()

@ -161,7 +161,7 @@ func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc i
}
// PutObject puts the specified bytes into the configured GCS bucket at the provided key
func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
writer := s.defaultBucket.Object(objectKey).NewWriter(ctx)
// Default GCSChunkSize is 8M and for each call, 8M is allocated xD
// By setting it to 0, we just upload the object in a single request

@ -9,7 +9,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
)
type StorageClient struct {
@ -67,10 +66,6 @@ func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
return nil
}
func (s *StorageClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return errors.New("not implemented")
}
func (s *StorageClient) DeleteChunk(ctx context.Context, _, chunkID string) error {
chunkInfo := &ChunkID{ChunkID: chunkID}
_, err := s.client.DeleteChunks(ctx, chunkInfo)

@ -29,6 +29,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/log"
)
@ -327,7 +328,6 @@ func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (b
})
return requestErr
})
if err != nil {
return false, err
}
@ -337,7 +337,6 @@ func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (b
// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
var resp *cos.GetObjectOutput
// Map the key into a bucket
@ -370,15 +369,19 @@ func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
}
// PutObject into the store
func (c *COSObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (c *COSObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return instrument.CollectedRequest(ctx, "COS.PutObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
readSeeker, err := util.ReadSeeker(object)
if err != nil {
return err
}
putObjectInput := &cos.PutObjectInput{
Body: object,
Body: readSeeker,
Bucket: ibm.String(c.bucketFromKey(objectKey)),
Key: ibm.String(objectKey),
}
_, err := c.cos.PutObjectWithContext(ctx, putObjectInput)
_, err = c.cos.PutObjectWithContext(ctx, putObjectInput)
return err
})
}

@ -89,7 +89,7 @@ func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.Read
}
// PutObject into the store
func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
err := util.EnsureDirectory(filepath.Dir(fullPath))
if err != nil {

@ -7,7 +7,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/constants"
)
@ -61,10 +60,6 @@ func (c MetricsChunkClient) Stop() {
c.Client.Stop()
}
func (c MetricsChunkClient) PutWal(ctx context.Context, writer *wal.SegmentWriter) error {
return c.Client.PutWal(ctx, writer)
}
func (c MetricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
if err := c.Client.PutChunks(ctx, chunks); err != nil {
return err

@ -3,26 +3,23 @@ package client
import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"io"
"strings"
"time"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
)
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
type ObjectClient interface {
ObjectExists(ctx context.Context, objectKey string) (bool, error)
PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
PutObject(ctx context.Context, objectKey string, object io.Reader) error
// NOTE: The consumer of GetObject should always call Close when it is done reading; failing to do so leaks resources.
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
@ -108,19 +105,6 @@ func (o *client) Stop() {
o.store.Stop()
}
func (o *client) PutWal(ctx context.Context, segment *wal.SegmentWriter) error {
reader, err := segment.ToReader()
if err != nil {
return err
}
defer func(reader io.ReadSeekCloser) {
_ = reader.Close()
}(reader)
newUlid := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
return o.store.PutObject(ctx, "loki-v2/wal/anon/"+newUlid.String(), reader)
}
// PutChunks stores the provided chunks in the configured backend. If multiple errors are
// returned, the last one sequentially will be propagated up.
func (o *client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {

@ -145,7 +145,7 @@ func (s *SwiftObjectClient) GetObject(_ context.Context, objectKey string) (io.R
}
// PutObject puts the specified bytes into the configured Swift container at the provided key
func (s *SwiftObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
func (s *SwiftObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
_, err := s.conn.ObjectPut(s.cfg.ContainerName, objectKey, object, false, "", "", nil)
return err
}

@ -15,7 +15,7 @@ func NewPrefixedObjectClient(downstreamClient ObjectClient, prefix string) Objec
return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix}
}
func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object)
}

@ -426,7 +426,7 @@ func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (i
}
// PutObject implements client.ObjectClient.
func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
buf, err := io.ReadAll(object)
if err != nil {
return err

@ -0,0 +1,17 @@
package util
import (
"bytes"
"io"
)
func ReadSeeker(r io.Reader) (io.ReadSeeker, error) {
if rs, ok := r.(io.ReadSeeker); ok {
return rs, nil
}
data, err := io.ReadAll(r)
if err != nil {
return nil, err
}
return bytes.NewReader(data), nil
}
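Reviewer note: this helper exists because several backends (S3 and COS above) still need an io.ReadSeeker for signing and retries, while the narrowed ObjectClient interface now accepts any io.Reader. A hypothetical caller, sketching the intended use:

// upload is a hypothetical caller: no copy is made when r already seeks
// (e.g. *os.File, *bytes.Reader); otherwise the reader is buffered once.
func upload(put func(io.ReadSeeker) error, r io.Reader) error {
	rs, err := ReadSeeker(r)
	if err != nil {
		return err
	}
	return put(rs)
}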

@ -7,7 +7,6 @@ import (
"time"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/httpreq"
lokilog "github.com/grafana/loki/v3/pkg/logql/log"
@ -609,7 +608,3 @@ func (f failingChunkWriter) Put(_ context.Context, _ []chunk.Chunk) error {
func (f failingChunkWriter) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return errWritingChunkUnsupported
}
func (f failingChunkWriter) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return errWritingChunkUnsupported
}

@ -15,16 +15,10 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
tsdb_index "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
)
type WalSegmentWriter interface {
PutWal(ctx context.Context, writer *wal.SegmentWriter) error
}
type ChunkWriter interface {
WalSegmentWriter
Put(ctx context.Context, chunks []chunk.Chunk) error
PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error
}
@ -51,7 +45,6 @@ type Store interface {
ChunkWriter
ChunkFetcher
ChunkFetcherProvider
WalSegmentWriter
Stop()
}
@ -95,12 +88,6 @@ func (c *CompositeStore) Stores() []Store {
return stores
}
func (c CompositeStore) PutWal(ctx context.Context, writer *wal.SegmentWriter) error {
// TODO: Understand how to use the forStores method to correctly pick a store for this
err := c.stores[0].PutWal(ctx, writer)
return err
}
func (c CompositeStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
for _, chunk := range chunks {
err := c.forStores(ctx, chunk.From, chunk.Through, func(innerCtx context.Context, from, through model.Time, store Store) error {
@ -210,10 +197,8 @@ func (c CompositeStore) Stats(ctx context.Context, userID string, from, through
xs = append(xs, x)
return err
})
if err != nil {
return nil, err
}
res := stats.MergeStats(xs...)
return &res, err
@ -226,7 +211,6 @@ func (c CompositeStore) Volume(ctx context.Context, userID string, from, through
volumes = append(volumes, volume)
return err
})
if err != nil {
return nil, err
}
@ -254,7 +238,6 @@ func (c CompositeStore) GetShards(
groups = append(groups, shards)
return nil
})
if err != nil {
return nil, err
}

@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/dskit/test"
"github.com/prometheus/common/model"
@ -24,10 +23,6 @@ import (
type mockStore int
func (m mockStore) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return nil
}
func (m mockStore) Put(_ context.Context, _ []chunk.Chunk) error {
return nil
}
@ -357,7 +352,6 @@ func TestVolume(t *testing.T) {
require.Error(t, err, "something bad")
require.Nil(t, volumes)
})
}
func TestFilterForTimeRange(t *testing.T) {

@ -13,7 +13,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)
@ -66,10 +65,6 @@ func (c *Writer) Put(ctx context.Context, chunks []chunk.Chunk) error {
return nil
}
func (c *Writer) PutWal(ctx context.Context, segment *wal.SegmentWriter) error {
return c.fetcher.Client().PutWal(ctx, segment)
}
// PutOne implements Store
func (c *Writer) PutOne(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.PutOne")

@ -13,7 +13,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/wal"
)
type mockCache struct {
@ -56,10 +55,6 @@ type mockChunksClient struct {
called int
}
func (m *mockChunksClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error {
return nil
}
func (m *mockChunksClient) PutChunks(_ context.Context, _ []chunk.Chunk) error {
m.called++
return nil
@ -67,15 +62,19 @@ func (m *mockChunksClient) PutChunks(_ context.Context, _ []chunk.Chunk) error {
func (m *mockChunksClient) Stop() {
}
func (m *mockChunksClient) GetChunks(_ context.Context, _ []chunk.Chunk) ([]chunk.Chunk, error) {
panic("GetChunks not implemented")
}
func (m *mockChunksClient) DeleteChunk(_ context.Context, _, _ string) error {
panic("DeleteChunk not implemented")
}
func (m *mockChunksClient) IsChunkNotFoundErr(_ error) bool {
panic("IsChunkNotFoundErr not implemented")
}
func (m *mockChunksClient) IsRetryableErr(_ error) bool {
panic("IsRetryableErr not implemented")
}

@ -26,7 +26,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores"
index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/v3/pkg/storage/wal"
loki_util "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
@ -186,8 +185,7 @@ func newMockChunkStore(chunkFormat byte, headfmt chunkenc.HeadBlockFmt, streams
return &mockChunkStore{schemas: config.SchemaConfig{}, chunks: chunks, client: &mockChunkStoreClient{chunks: chunks, scfg: config.SchemaConfig{}}}
}
func (m *mockChunkStore) PutWal(_ context.Context, _ *wal.SegmentWriter) error { return nil }
func (m *mockChunkStore) Put(_ context.Context, _ []chunk.Chunk) error { return nil }
func (m *mockChunkStore) Put(_ context.Context, _ []chunk.Chunk) error { return nil }
func (m *mockChunkStore) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return nil
}
@ -294,7 +292,6 @@ func (m mockChunkStoreClient) Stop() {
panic("implement me")
}
func (m mockChunkStoreClient) PutWal(_ context.Context, _ *wal.SegmentWriter) error { return nil }
func (m mockChunkStoreClient) PutChunks(_ context.Context, _ []chunk.Chunk) error {
return nil
}

@ -20,7 +20,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
"github.com/grafana/loki/v3/pkg/storage/wal/index"
"github.com/grafana/loki/v3/pkg/util/encoding"
"github.com/grafana/loki/v3/pkg/util/pool"
)
// LOKW is the magic number for the Loki WAL format.
@ -35,10 +34,7 @@ var (
}
},
}
// 512kb - 20 mb
encodedWalSegmentBufferPool = pool.NewBuffer(512*1024, 20*1024*1024, 2)
tenantLabel = "__loki_tenant__"
tenantLabel = "__loki_tenant__"
)
func init() {
@ -276,45 +272,30 @@ func (b *SegmentWriter) Reset() {
b.inputSize.Store(0)
}
func (b *SegmentWriter) ToReader() (io.ReadSeekCloser, error) {
// snappy compression rate is ~5x, but we can not predict it, so we need to allocate bigger buffer to avoid allocations
buffer := encodedWalSegmentBufferPool.Get(int(b.inputSize.Load() / 3))
_, err := b.WriteTo(buffer)
if err != nil {
return nil, fmt.Errorf("failed to write segment to create a reader: %w", err)
}
return NewEncodedSegmentReader(buffer), nil
}
var (
_ io.ReadSeekCloser = &EncodedSegmentReader{}
)
type EncodedSegmentReader struct {
delegate io.ReadSeeker
encodedContent *bytes.Buffer
}
func NewEncodedSegmentReader(encodedContent *bytes.Buffer) *EncodedSegmentReader {
return &EncodedSegmentReader{
encodedContent: encodedContent,
delegate: bytes.NewReader(encodedContent.Bytes()),
}
}
func (e *EncodedSegmentReader) Read(p []byte) (n int, err error) {
return e.delegate.Read(p)
}
func (e *EncodedSegmentReader) Seek(offset int64, whence int) (int64, error) {
return e.delegate.Seek(offset, whence)
}
func (e *EncodedSegmentReader) Close() error {
encodedWalSegmentBufferPool.Put(e.encodedContent)
e.encodedContent = nil
e.delegate = nil
return nil
}
type EncodedSegmentReader struct {
*io.PipeReader
*io.PipeWriter
}
func (e *EncodedSegmentReader) Close() error {
err := e.PipeWriter.Close()
if err != nil {
return err
}
err = e.PipeReader.Close()
if err != nil {
return err
}
return nil
}
func (b *SegmentWriter) Reader() io.ReadCloser {
pr, pw := io.Pipe()
go func() {
_, err := b.WriteTo(pw)
pw.CloseWithError(err)
}()
return &EncodedSegmentReader{PipeReader: pr, PipeWriter: pw}
}
// InputSize returns the total size of the input data written to the writer.
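Reviewer note: the new Reader replaces the pooled-buffer ToReader with io.Pipe, so segment encoding streams straight into the upload without materializing the compressed segment in memory, and CloseWithError hands any encoding failure to the reading side. A self-contained sketch of that pattern (streamingReader is hypothetical):

package main

import (
	"fmt"
	"io"
	"strings"
)

func streamingReader(writeTo func(io.Writer) error) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		// The reader side sees io.EOF on success, or this error otherwise;
		// CloseWithError(nil) is equivalent to Close.
		pw.CloseWithError(writeTo(pw))
	}()
	return pr
}

func main() {
	rc := streamingReader(func(w io.Writer) error {
		_, err := io.Copy(w, strings.NewReader("encoded segment bytes"))
		return err
	})
	defer rc.Close()
	data, _ := io.ReadAll(rc)
	fmt.Println(string(data))
}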

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"sort"
"sync"
"testing"
@ -510,16 +511,15 @@ func BenchmarkWrites(b *testing.B) {
}
})
bytesBuf := make([]byte, inputSize)
bytesBuf := make([]byte, encodedLength)
b.Run("Reader", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
var err error
reader, err := writer.ToReader()
require.NoError(b, err)
reader := writer.Reader()
n, err := reader.Read(bytesBuf)
n, err := io.ReadFull(reader, bytesBuf)
require.NoError(b, err)
require.EqualValues(b, encodedLength, n)
require.NoError(b, reader.Close())
