Promtail: Add handler timeout for GCP Logs Push target (#7401)

**What this PR does / why we need it**:
After running the GCP Logs Push target in real workloads, we've
noticed that GCP sends a lot of traffic and adjusts that send rate
according to the rate at which the receiver can process it. To perform that
adjustment, GCP treats [non-2xx HTTP
responses](https://cloud.google.com/pubsub/docs/push#receive_push) as NACKs.

This PR adds a target handler timeout (which also covers the
`api.Entry` channel send) so users can configure this maximum
processing time, thereby giving GCP notice that the sending rate is
too high.

This new timeout is optional, and disabled by default.
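
For example, a push-type `gcplog` target could enable it like this (the timeout value here is illustrative, not a default):

```yaml
gcplog:
  subscription_type: push
  # Abort requests that take longer than 5s to process and reply with a 503,
  # so GCP backs off its sending rate.
  push_timeout: 5s
```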

**Which issue(s) this PR fixes**:
Related to https://github.com/grafana/cloud-onboarding/issues/2067

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the `CONTRIBUTING.md` guide
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
Files changed:
1. CHANGELOG.md (1 line changed)
2. clients/pkg/promtail/scrapeconfig/scrapeconfig.go (3 lines changed)
3. clients/pkg/promtail/targets/gcplog/push_target.go (32 lines changed)
4. clients/pkg/promtail/targets/gcplog/push_target_test.go (64 lines changed)
5. docs/sources/clients/promtail/configuration.md (5 lines changed)

@@ -51,6 +51,7 @@
* [5977](https://github.com/grafana/loki/pull/5977) **juissi-t** lambda-promtail: Add support for Kinesis data stream events
* [6828](https://github.com/grafana/loki/pull/6828) **alexandre1984rj** Add the BotScore and BotScoreSrc fields once the Cloudflare API returns those two fields on the list of all available log fields.
* [6656](https://github.com/grafana/loki/pull/6656) **carlospeon**: Allow promtail to add matches to the journal reader
* [7401](https://github.com/grafana/loki/pull/7401) **thepalbi**: Add timeout to GCP Logs push target
##### Fixes
* [6766](https://github.com/grafana/loki/pull/6766) **kavirajk**: fix(logql): Make `LabelSampleExtractor` ignore processing the line if it doesn't contain that specific label. Fixes unwrap behavior explained in the issue https://github.com/grafana/loki/issues/6713

@@ -367,6 +367,9 @@ type GcplogTargetConfig struct {
// Defaults to `pull` for backwards compatibility reasons.
SubscriptionType string `yaml:"subscription_type"`
// PushTimeout is used to set a maximum processing time for each incoming GCP Logs entry. Used just for `push` subscription type.
PushTimeout time.Duration `yaml:"push_timeout"`
// Server is the weaveworks server config for listening connections. Used just for `push` subscription type.
Server server.Config `yaml:"server"`
}

@@ -1,6 +1,7 @@
package gcplog
import (
"context"
"encoding/json"
"fmt"
"io"
@@ -51,6 +52,8 @@ func newPushTarget(metrics *Metrics, logger log.Logger, handler api.EntryHandler
return nil, fmt.Errorf("failed to parse configs and override defaults when configuring gcp push target: %w", err)
}
config.Server = mergedServerConfigs
// Avoid logging entire received request on failures
config.Server.ExcludeRequestInLog = true
err = ht.run()
if err != nil {
@@ -83,8 +86,8 @@ func (h *pushTarget) run() error {
if err != nil {
return err
}
h.server = srv
h.server.HTTP.Path("/gcp/api/v1/push").Methods("POST").Handler(http.HandlerFunc(h.push))
go func() {
@@ -100,6 +103,14 @@ func (h *pushTarget) run() error {
func (h *pushTarget) push(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
// Default to the request context and a no-op cancel func, so the logic below is the same whether or not a push timeout is configured
ctx := r.Context()
cancel := context.CancelFunc(func() {})
if h.config.PushTimeout != 0 {
ctx, cancel = context.WithTimeout(r.Context(), h.config.PushTimeout)
}
defer cancel()
pushMessage := PushMessage{}
bs, err := io.ReadAll(r.Body)
if err != nil {
@@ -132,11 +143,28 @@ func (h *pushTarget) push(w http.ResponseWriter, r *http.Request) {
level.Debug(h.logger).Log("msg", fmt.Sprintf("Received line: %s", entry.Line))
if err := h.doSendEntry(ctx, entry); err != nil {
// NOTE: timeout errors can be tracked through the metrics exposed by the spun-up weave-works server, under the 503 status code:
// promtail_gcp_push_target_{job name}_request_duration_seconds_count{status_code="503"}
level.Warn(h.logger).Log("msg", "error sending log entry", "err", err.Error())
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
h.metrics.gcpPushEntries.WithLabelValues().Inc()
w.WriteHeader(http.StatusNoContent)
}
func (h *pushTarget) doSendEntry(ctx context.Context, entry api.Entry) error {
select {
// Timeout the api.Entry channel send operation, which is the only blocking operation in the handler
case <-ctx.Done():
return fmt.Errorf("timeout exceeded: %w", ctx.Err())
case h.entries <- entry:
return nil
}
}
func (h *pushTarget) Type() target.TargetType {
return target.GcplogTargetType
}

@@ -7,9 +7,12 @@ import (
"net/http"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -371,6 +374,67 @@ func TestPushTarget_ErroneousPayloadsAreRejected(t *testing.T) {
}
}
// blockingEntryHandler implements an api.EntryHandler that has no space in its receive channel, blocking when an api.Entry
// is sent down the pipe.
type blockingEntryHandler struct {
ch chan api.Entry
once sync.Once
}
func newBlockingEntryHandler() *blockingEntryHandler {
filledChannel := make(chan api.Entry)
return &blockingEntryHandler{ch: filledChannel}
}
func (t *blockingEntryHandler) Chan() chan<- api.Entry {
return t.ch
}
func (t *blockingEntryHandler) Stop() {
t.once.Do(func() { close(t.ch) })
}
func TestPushTarget_UsePushTimeout(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
eh := newBlockingEntryHandler()
defer eh.Stop()
serverConfig, port, err := getServerConfigWithAvailablePort()
require.NoError(t, err, "error generating server config or finding open port")
config := &scrapeconfig.GcplogTargetConfig{
Server: serverConfig,
Labels: nil,
UseIncomingTimestamp: true,
SubscriptionType: "push",
PushTimeout: time.Second,
}
prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer)
tenantIDRelabelConfig := []*relabel.Config{
{
SourceLabels: model.LabelNames{"__tenant_id__"},
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "$1",
TargetLabel: "tenant_id",
Action: relabel.Replace,
},
}
pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, tenantIDRelabelConfig, t.Name()+"_test_job", config)
require.NoError(t, err)
defer func() {
_ = pt.Stop()
}()
req, err := makeGCPPushRequest(fmt.Sprintf("http://%s:%d", localhost, port), testPayload)
require.NoError(t, err, "expected request to be created successfully")
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusServiceUnavailable, res.StatusCode, "expected timeout response")
}
func waitForMessages(eh *fake.Client) {
countdown := 1000
for len(eh.Received()) != 1 && countdown > 0 {

@@ -964,6 +964,11 @@ When using the `push` subscription type, keep in mind:
# timestamp to the log when it was processed.
[use_incoming_timestamp: <boolean> | default = false]
# If the subscription_type is push, configures an HTTP handler timeout. If processing an incoming GCP Logs request,
# that is, parsing it and sending the entry down the processing pipeline, takes longer than the configured duration,
# the server aborts the request and responds with a 503 HTTP status code.
[push_timeout: <duration> | default = 0 (no timeout)]
# Label map to add to every log message.
labels:
[ <labelname>: <labelvalue> ... ]
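
As a sketch, a complete push-type scrape config using this option could look like the following (the job name, port, timeout, and label values are illustrative, not defaults):

```yaml
scrape_configs:
  - job_name: gcp_push
    gcplog:
      subscription_type: push
      use_incoming_timestamp: true
      # Respond with a 503 if a single push request takes longer than this to process.
      push_timeout: 10s
      labels:
        job: gcp_push
      server:
        http_listen_address: 0.0.0.0
        http_listen_port: 8080
```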
