live: pipeline remote write buffer (#39586)

pull/39603/head
Alexander Emelin 4 years ago committed by GitHub
parent c2604e04ab
commit 914ae81026
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/services/live/pipeline/devdata.go
  2. 65
      pkg/services/live/pipeline/output_remote_write.go

@ -107,8 +107,10 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}), Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}),
Outputter: NewMultipleOutput( Outputter: NewMultipleOutput(
NewManagedStreamOutput(f.ManagedStream), NewManagedStreamOutput(f.ManagedStream),
NewRedirectOutput(RedirectOutputConfig{ NewRemoteWriteOutput(RemoteWriteConfig{
Channel: "stream/testdata/random-20Hz-stream", Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
}), }),
), ),
}, },

@ -4,11 +4,14 @@ import (
"bytes" "bytes"
"context" "context"
"errors" "errors"
"fmt"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/services/live/remotewrite" "github.com/grafana/grafana/pkg/services/live/remotewrite"
"github.com/prometheus/prometheus/prompb"
) )
type RemoteWriteConfig struct { type RemoteWriteConfig struct {
@ -21,15 +24,21 @@ type RemoteWriteConfig struct {
} }
type RemoteWriteOutput struct { type RemoteWriteOutput struct {
mu sync.Mutex
config RemoteWriteConfig config RemoteWriteConfig
httpClient *http.Client httpClient *http.Client
buffer []prompb.TimeSeries
} }
func NewRemoteWriteOutput(config RemoteWriteConfig) *RemoteWriteOutput { func NewRemoteWriteOutput(config RemoteWriteConfig) *RemoteWriteOutput {
return &RemoteWriteOutput{ out := &RemoteWriteOutput{
config: config, config: config,
httpClient: &http.Client{Timeout: 2 * time.Second}, httpClient: &http.Client{Timeout: 2 * time.Second},
} }
if config.Endpoint != "" {
go out.flushPeriodically()
}
return out
} }
const OutputTypeRemoteWrite = "remoteWrite" const OutputTypeRemoteWrite = "remoteWrite"
@ -38,24 +47,39 @@ func (out *RemoteWriteOutput) Type() string {
return OutputTypeRemoteWrite return OutputTypeRemoteWrite
} }
func (out *RemoteWriteOutput) Output(_ context.Context, _ OutputVars, frame *data.Frame) ([]*ChannelFrame, error) { func (out *RemoteWriteOutput) flushPeriodically() {
if out.config.Endpoint == "" { for range time.NewTicker(15 * time.Second).C {
logger.Debug("Skip sending to remote write: no url") out.mu.Lock()
return nil, nil if len(out.buffer) == 0 {
out.mu.Unlock()
continue
}
tmpBuffer := make([]prompb.TimeSeries, len(out.buffer))
copy(tmpBuffer, out.buffer)
out.buffer = nil
out.mu.Unlock()
err := out.flush(tmpBuffer)
if err != nil {
logger.Error("Error flush to remote write", "error", err)
out.mu.Lock()
// TODO: drop in case of large buffer size? Make several attempts only?
out.buffer = append(tmpBuffer, out.buffer...)
out.mu.Unlock()
}
} }
}
// Use remote write for a stream. func (out *RemoteWriteOutput) flush(timeSeries []prompb.TimeSeries) error {
remoteWriteData, err := remotewrite.SerializeLabelsColumn(frame) logger.Debug("Remote write flush", "num time series", len(timeSeries))
remoteWriteData, err := remotewrite.TimeSeriesToBytes(timeSeries)
if err != nil { if err != nil {
logger.Error("Error serializing to remote write format", "error", err) return fmt.Errorf("error converting time series to bytes: %v", err)
return nil, err
} }
logger.Debug("Sending to remote write endpoint", "url", out.config.Endpoint, "bodyLength", len(remoteWriteData)) logger.Debug("Sending to remote write endpoint", "url", out.config.Endpoint, "bodyLength", len(remoteWriteData))
req, err := http.NewRequest(http.MethodPost, out.config.Endpoint, bytes.NewReader(remoteWriteData)) req, err := http.NewRequest(http.MethodPost, out.config.Endpoint, bytes.NewReader(remoteWriteData))
if err != nil { if err != nil {
logger.Error("Error constructing remote write request", "error", err) return fmt.Errorf("error constructing remote write request: %w", err)
return nil, err
} }
req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy") req.Header.Set("Content-Encoding", "snappy")
@ -65,14 +89,25 @@ func (out *RemoteWriteOutput) Output(_ context.Context, _ OutputVars, frame *dat
started := time.Now() started := time.Now()
resp, err := out.httpClient.Do(req) resp, err := out.httpClient.Do(req)
if err != nil { if err != nil {
logger.Error("Error sending remote write request", "error", err) return fmt.Errorf("error sending remote write request: %w", err)
return nil, err
} }
_ = resp.Body.Close() _ = resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
logger.Error("Unexpected response code from remote write endpoint", "code", resp.StatusCode) logger.Error("Unexpected response code from remote write endpoint", "code", resp.StatusCode)
return nil, errors.New("unexpected response code from remote write endpoint") return errors.New("unexpected response code from remote write endpoint")
} }
logger.Debug("Successfully sent to remote write endpoint", "url", out.config.Endpoint, "elapsed", time.Since(started)) logger.Debug("Successfully sent to remote write endpoint", "url", out.config.Endpoint, "elapsed", time.Since(started))
return nil
}
func (out *RemoteWriteOutput) Output(_ context.Context, _ OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
if out.config.Endpoint == "" {
logger.Debug("Skip sending to remote write: no url")
return nil, nil
}
ts := remotewrite.TimeSeriesFromFramesLabelsColumn(frame)
out.mu.Lock()
out.buffer = append(out.buffer, ts...)
out.mu.Unlock()
return nil, nil return nil, nil
} }

Loading…
Cancel
Save