|
|
|
|
@ -50,7 +50,15 @@ func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error { |
|
|
|
|
func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error { |
|
|
|
|
logger := h.log.FromContext(ctx) |
|
|
|
|
streams := h.statesToStreams(rule, states, logger) |
|
|
|
|
return h.recordStreamsAsync(ctx, streams, logger) |
|
|
|
|
errCh := make(chan error, 1) |
|
|
|
|
go func() { |
|
|
|
|
defer close(errCh) |
|
|
|
|
if err := h.recordStreams(ctx, streams, logger); err != nil { |
|
|
|
|
logger.Error("Failed to save alert state history batch", "error", err) |
|
|
|
|
errCh <- fmt.Errorf("failed to save alert state history batch: %w", err) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
return errCh |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) { |
|
|
|
|
@ -106,18 +114,6 @@ func (h *RemoteLokiBackend) statesToStreams(rule history_model.RuleMeta, states |
|
|
|
|
return result |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (h *RemoteLokiBackend) recordStreamsAsync(ctx context.Context, streams []stream, logger log.Logger) <-chan error { |
|
|
|
|
errCh := make(chan error, 1) |
|
|
|
|
go func() { |
|
|
|
|
defer close(errCh) |
|
|
|
|
if err := h.recordStreams(ctx, streams, logger); err != nil { |
|
|
|
|
logger.Error("Failed to save alert state history batch", "error", err) |
|
|
|
|
errCh <- fmt.Errorf("failed to save alert state history batch: %w", err) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
return errCh |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error { |
|
|
|
|
if err := h.client.push(ctx, streams); err != nil { |
|
|
|
|
return err |
|
|
|
|
|