|
|
|
|
@ -165,10 +165,8 @@ func (i *Ingester) transferOut(ctx context.Context) error { |
|
|
|
|
|
|
|
|
|
ctx = user.InjectOrgID(ctx, "-1") |
|
|
|
|
stream, err := ic.TransferChunks(ctx) |
|
|
|
|
|
|
|
|
|
_, err = stream.CloseAndRecv() |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "CloseAndRecv") |
|
|
|
|
return errors.Wrap(err, "TransferChunks") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for instanceID, inst := range i.instances { |
|
|
|
|
@ -191,17 +189,26 @@ func (i *Ingester) transferOut(ctx context.Context) error { |
|
|
|
|
lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
stream.Send(&logproto.TimeSeriesChunk{ |
|
|
|
|
err := stream.Send(&logproto.TimeSeriesChunk{ |
|
|
|
|
Chunks: chunks, |
|
|
|
|
UserId: instanceID, |
|
|
|
|
Labels: lbls, |
|
|
|
|
FromIngesterId: i.lifecycler.ID, |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sentChunks.Add(float64(len(chunks))) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
_, err = stream.CloseAndRecv() |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "CloseAndRecv") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, flushQueue := range i.flushQueues { |
|
|
|
|
flushQueue.DiscardAndClose() |
|
|
|
|
} |
|
|
|
|
|