|
|
@ -48,7 +48,7 @@ type writer interface { |
|
|
|
Stop() |
|
|
|
Stop() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
type indexClient struct { |
|
|
|
type IndexClient struct { |
|
|
|
cfg Config |
|
|
|
cfg Config |
|
|
|
indexShipper indexshipper.IndexShipper |
|
|
|
indexShipper indexshipper.IndexShipper |
|
|
|
writer writer |
|
|
|
writer writer |
|
|
@ -59,10 +59,10 @@ type indexClient struct { |
|
|
|
stopOnce sync.Once |
|
|
|
stopOnce sync.Once |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// NewShipper creates a shipper for syncing local objects with a store
|
|
|
|
// NewIndexClient creates a shipper for syncing local objects with a store
|
|
|
|
func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, |
|
|
|
func NewIndexClient(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, |
|
|
|
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (series_index.Client, error) { |
|
|
|
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (*IndexClient, error) { |
|
|
|
i := indexClient{ |
|
|
|
i := IndexClient{ |
|
|
|
cfg: cfg, |
|
|
|
cfg: cfg, |
|
|
|
metrics: newMetrics(registerer), |
|
|
|
metrics: newMetrics(registerer), |
|
|
|
logger: logger, |
|
|
|
logger: logger, |
|
|
@ -78,7 +78,7 @@ func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads. |
|
|
|
return &i, nil |
|
|
|
return &i, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.Limits, |
|
|
|
func (i *IndexClient) init(storageClient client.ObjectClient, limits downloads.Limits, |
|
|
|
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer) error { |
|
|
|
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer) error { |
|
|
|
var err error |
|
|
|
var err error |
|
|
|
i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, tenantFilter, |
|
|
|
i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, tenantFilter, |
|
|
@ -110,28 +110,28 @@ func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.L |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (i *indexClient) Stop() { |
|
|
|
func (i *IndexClient) Stop() { |
|
|
|
i.stopOnce.Do(i.stop) |
|
|
|
i.stopOnce.Do(i.stop) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (i *indexClient) stop() { |
|
|
|
func (i *IndexClient) stop() { |
|
|
|
if i.writer != nil { |
|
|
|
if i.writer != nil { |
|
|
|
i.writer.Stop() |
|
|
|
i.writer.Stop() |
|
|
|
} |
|
|
|
} |
|
|
|
i.indexShipper.Stop() |
|
|
|
i.indexShipper.Stop() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (i *indexClient) NewWriteBatch() series_index.WriteBatch { |
|
|
|
func (i *IndexClient) NewWriteBatch() series_index.WriteBatch { |
|
|
|
return local.NewWriteBatch() |
|
|
|
return local.NewWriteBatch() |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (i *indexClient) BatchWrite(ctx context.Context, batch series_index.WriteBatch) error { |
|
|
|
func (i *IndexClient) BatchWrite(ctx context.Context, batch series_index.WriteBatch) error { |
|
|
|
return instrument.CollectedRequest(ctx, "WRITE", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
return instrument.CollectedRequest(ctx, "WRITE", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
return i.writer.BatchWrite(ctx, batch) |
|
|
|
return i.writer.BatchWrite(ctx, batch) |
|
|
|
}) |
|
|
|
}) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (i *indexClient) QueryPages(ctx context.Context, queries []series_index.Query, callback series_index.QueryPagesCallback) error { |
|
|
|
func (i *IndexClient) QueryPages(ctx context.Context, queries []series_index.Query, callback series_index.QueryPagesCallback) error { |
|
|
|
return instrument.CollectedRequest(ctx, "Shipper.Query", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
return instrument.CollectedRequest(ctx, "Shipper.Query", instrument.NewHistogramCollector(i.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { |
|
|
|
return i.querier.QueryPages(ctx, queries, callback) |
|
|
|
return i.querier.QueryPages(ctx, queries, callback) |
|
|
|
}) |
|
|
|
}) |
|
|
|