|
|
|
@ -3,6 +3,7 @@ package ingester |
|
|
|
|
import ( |
|
|
|
|
"fmt" |
|
|
|
|
"net/http" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"golang.org/x/net/context" |
|
|
|
@ -221,7 +222,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat |
|
|
|
|
ctx := user.InjectOrgID(context.Background(), userID) |
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) |
|
|
|
|
defer cancel() |
|
|
|
|
err := i.flushChunks(ctx, fp, labels, chunks) |
|
|
|
|
err := i.flushChunks(ctx, fp, labels, chunks, &instance.streamsMtx) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -306,7 +307,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error { |
|
|
|
|
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, streamsMtx *sync.RWMutex) error { |
|
|
|
|
userID, err := user.ExtractOrgID(ctx) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
@ -327,7 +328,10 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
start := time.Now() |
|
|
|
|
if err := c.Encode(); err != nil { |
|
|
|
|
streamsMtx.Lock() |
|
|
|
|
err := c.Encode() |
|
|
|
|
streamsMtx.Unlock() |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
chunkEncodeTime.Observe(time.Since(start).Seconds()) |
|
|
|
|