Tests: Reduce flakiness of GCPLog and integration tests (#7273)

**What this PR does / why we need it**:
- Reduce flakiness of GCPLog tests by reusing context and stopping GCP
topic
- Reduce flakiness of integration tests by:
  - using ports as they're assigned
  - reusing context
- adding timeouts to calls (hence, if something gets stuck, tests fail
earlier)
pull/7285/head
Dylan Guedes 3 years ago committed by GitHub
parent 77c10da81d
commit 2dc5a71a67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      clients/pkg/promtail/targets/gcplog/pull_target_test.go
  2. 56
      integration/client/client.go
  3. 37
      integration/cluster/cluster.go
  4. 31
      integration/loki_micro_services_delete_test.go
  5. 19
      integration/loki_micro_services_test.go
  6. 7
      integration/loki_simple_scalable_test.go
  7. 7
      integration/loki_single_binary_test.go

@ -25,31 +25,32 @@ import (
func TestPullTarget_Run(t *testing.T) {
// Goal: Check message written to pubsub topic is received by the target.
tt, apiclient, pubsubClient, teardown := testPullTarget(t)
ctx := context.Background()
tt, apiclient, pubsubClient, teardown := testPullTarget(ctx, t)
defer teardown()
// seed pubsub
ctx := context.Background()
tp, err := pubsubClient.CreateTopic(ctx, topic)
require.NoError(t, err)
defer tp.Stop()
_, err = pubsubClient.CreateSubscription(ctx, subscription, pubsub.SubscriptionConfig{
Topic: tp,
})
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_ = tt.run()
tt.run() //nolint:errcheck
}()
publishMessage(t, tp)
publishMessage(ctx, t, tp)
// Wait till message is received by the run loop.
// NOTE(kavi): sleep is not ideal. but not other way to confirm if api.Handler received messages
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
err = tt.Stop()
require.NoError(t, err)
@ -57,6 +58,8 @@ func TestPullTarget_Run(t *testing.T) {
// wait till `run` stops.
wg.Wait()
// Sleep one more time before reading from api.Received.
time.Sleep(500 * time.Millisecond)
assert.Equal(t, 1, len(apiclient.Received()))
}
@ -65,7 +68,8 @@ func TestPullTarget_Stop(t *testing.T) {
errs := make(chan error, 1)
tt, _, _, teardown := testPullTarget(t)
ctx := context.Background()
tt, _, _, teardown := testPullTarget(ctx, t)
defer teardown()
var wg sync.WaitGroup
@ -90,30 +94,33 @@ func TestPullTarget_Stop(t *testing.T) {
}
func TestPullTarget_Type(t *testing.T) {
tt, _, _, teardown := testPullTarget(t)
ctx := context.Background()
tt, _, _, teardown := testPullTarget(ctx, t)
defer teardown()
assert.Equal(t, target.TargetType("Gcplog"), tt.Type())
}
func TestPullTarget_Ready(t *testing.T) {
tt, _, _, teardown := testPullTarget(t)
ctx := context.Background()
tt, _, _, teardown := testPullTarget(ctx, t)
defer teardown()
assert.Equal(t, true, tt.Ready())
}
func TestPullTarget_Labels(t *testing.T) {
tt, _, _, teardown := testPullTarget(t)
ctx := context.Background()
tt, _, _, teardown := testPullTarget(ctx, t)
defer teardown()
assert.Equal(t, model.LabelSet{"job": "test-gcplogtarget"}, tt.Labels())
}
func testPullTarget(t *testing.T) (*pullTarget, *fake.Client, *pubsub.Client, func()) {
func testPullTarget(ctx context.Context, t *testing.T) (*pullTarget, *fake.Client, *pubsub.Client, func()) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
mockSvr := pstest.NewServer()
conn, err := grpc.Dial(mockSvr.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
@ -146,11 +153,11 @@ func testPullTarget(t *testing.T) (*pullTarget, *fake.Client, *pubsub.Client, fu
}
}
func publishMessage(t *testing.T, topic *pubsub.Topic) {
func publishMessage(ctx context.Context, t *testing.T, topic *pubsub.Topic) {
t.Helper()
ctx := context.Background()
res := topic.Publish(ctx, &pubsub.Message{Data: []byte(gcpLogEntry)})
_, err := res.Get(ctx) // wait till message is actully published
require.NoError(t, err)
}

@ -2,6 +2,7 @@ package client
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -11,8 +12,12 @@ import (
"strconv"
"strings"
"time"
"github.com/weaveworks/common/user"
)
const requestTimeout = 30 * time.Second
type roundTripper struct {
instanceID string
token string
@ -187,7 +192,7 @@ func (c *Client) Metrics() (string, error) {
// Flush all in-memory chunks held by the ingesters to the backing store
func (c *Client) Flush() error {
req, err := c.request("POST", fmt.Sprintf("%s/flush", c.baseURL))
req, err := c.request(context.Background(), "POST", fmt.Sprintf("%s/flush", c.baseURL))
if err != nil {
return err
}
@ -377,8 +382,11 @@ type Rules struct {
}
// RunRangeQuery runs a query and returns an error if anything went wrong
func (c *Client) RunRangeQuery(query string) (*Response, error) {
buf, statusCode, err := c.run(c.rangeQueryURL(query))
func (c *Client) RunRangeQuery(ctx context.Context, query string) (*Response, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query))
if err != nil {
return nil, err
}
@ -387,7 +395,10 @@ func (c *Client) RunRangeQuery(query string) (*Response, error) {
}
// RunQuery runs a query and returns an error if anything went wrong
func (c *Client) RunQuery(query string) (*Response, error) {
func (c *Client) RunQuery(ctx context.Context, query string) (*Response, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
v := url.Values{}
v.Set("query", query)
v.Set("time", formatTS(c.Now.Add(time.Second)))
@ -399,23 +410,27 @@ func (c *Client) RunQuery(query string) (*Response, error) {
u.Path = "/loki/api/v1/query"
u.RawQuery = v.Encode()
buf, statusCode, err := c.run(u.String())
buf, statusCode, err := c.run(ctx, u.String())
if err != nil {
return nil, err
}
return c.parseResponse(buf, statusCode)
}
// GetRules returns the loki ruler rules
func (c *Client) GetRules() (*RulesResponse, error) {
func (c *Client) GetRules(ctx context.Context) (*RulesResponse, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
u, err := url.Parse(c.baseURL)
if err != nil {
return nil, err
}
u.Path = "/prometheus/api/v1/rules"
buf, _, err := c.run(u.String())
buf, _, err := c.run(ctx, u.String())
if err != nil {
return nil, err
}
@ -458,10 +473,13 @@ func (c *Client) rangeQueryURL(query string) string {
return u.String()
}
func (c *Client) LabelNames() ([]string, error) {
func (c *Client) LabelNames(ctx context.Context) ([]string, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
url := fmt.Sprintf("%s/loki/api/v1/labels", c.baseURL)
req, err := c.request("GET", url)
req, err := c.request(ctx, "GET", url)
if err != nil {
return nil, err
}
@ -473,7 +491,7 @@ func (c *Client) LabelNames() ([]string, error) {
defer res.Body.Close()
if res.StatusCode/100 != 2 {
return nil, fmt.Errorf("Unexpected status code of %d", res.StatusCode)
return nil, fmt.Errorf("unexpected status code of %d", res.StatusCode)
}
var values struct {
@ -487,10 +505,13 @@ func (c *Client) LabelNames() ([]string, error) {
}
// LabelValues return a LabelValues query
func (c *Client) LabelValues(labelName string) ([]string, error) {
func (c *Client) LabelValues(ctx context.Context, labelName string) ([]string, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
url := fmt.Sprintf("%s/loki/api/v1/label/%s/values", c.baseURL, url.PathEscape(labelName))
req, err := c.request("GET", url)
req, err := c.request(ctx, "GET", url)
if err != nil {
return nil, err
}
@ -502,7 +523,7 @@ func (c *Client) LabelValues(labelName string) ([]string, error) {
defer res.Body.Close()
if res.StatusCode/100 != 2 {
return nil, fmt.Errorf("Unexpected status code of %d", res.StatusCode)
return nil, fmt.Errorf("unexpected status code of %d", res.StatusCode)
}
var values struct {
@ -515,8 +536,9 @@ func (c *Client) LabelValues(labelName string) ([]string, error) {
return values.Data, nil
}
func (c *Client) request(method string, url string) (*http.Request, error) {
req, err := http.NewRequest(method, url, nil)
func (c *Client) request(ctx context.Context, method string, url string) (*http.Request, error) {
ctx = user.InjectOrgID(ctx, c.instanceID)
req, err := http.NewRequestWithContext(ctx, method, url, nil)
if err != nil {
return nil, err
}
@ -524,8 +546,8 @@ func (c *Client) request(method string, url string) (*http.Request, error) {
return req, nil
}
func (c *Client) run(u string) ([]byte, int, error) {
req, err := c.request("GET", u)
func (c *Client) run(ctx context.Context, u string) ([]byte, int, error) {
req, err := c.request(ctx, "GET", u)
if err != nil {
return nil, 0, err
}

@ -1,6 +1,7 @@
package cluster
import (
"context"
"errors"
"flag"
"fmt"
@ -165,6 +166,21 @@ func New() *Cluster {
func (c *Cluster) Run() error {
for _, component := range c.components {
if component.running {
continue
}
var err error
component.httpPort, err = getFreePort()
if err != nil {
panic(fmt.Errorf("error allocating HTTP port: %w", err))
}
component.grpcPort, err = getFreePort()
if err != nil {
panic(fmt.Errorf("error allocating GRPC port: %w", err))
}
if err := component.run(); err != nil {
return err
}
@ -172,6 +188,9 @@ func (c *Cluster) Run() error {
return nil
}
func (c *Cluster) Cleanup() error {
_, cancelFunc := context.WithTimeout(context.Background(), time.Second*3)
defer cancelFunc()
var (
files []string
dirs []string
@ -210,17 +229,7 @@ func (c *Cluster) AddComponent(name string, flags ...string) *Component {
name: name,
cluster: c,
flags: flags,
}
var err error
component.httpPort, err = getFreePort()
if err != nil {
panic(fmt.Errorf("error allocating HTTP port: %w", err))
}
component.grpcPort, err = getFreePort()
if err != nil {
panic(fmt.Errorf("error allocating GRPC port: %w", err))
running: false,
}
c.components = append(c.components, component)
@ -242,6 +251,8 @@ type Component struct {
rulesPath string
RulesTenant string
running bool
RemoteWriteUrls []string
}
@ -325,6 +336,8 @@ func (c *Component) writeConfig() error {
}
func (c *Component) run() error {
c.running = true
if err := c.writeConfig(); err != nil {
return err
}
@ -359,7 +372,7 @@ func (c *Component) run() error {
go func() {
for {
time.Sleep(time.Millisecond * 200)
if c.loki.Server.HTTP == nil {
if c.loki == nil || c.loki.Server == nil || c.loki.Server.HTTP == nil {
continue
}

@ -1,6 +1,7 @@
package integration
import (
"context"
"net/http"
"testing"
"time"
@ -18,6 +19,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
assert.NoError(t, clu.Cleanup())
}()
// initially, run only compactor, index-gateway and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
@ -36,6 +38,11 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only ingester and query-scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
@ -46,6 +53,11 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-target=query-scheduler",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
@ -61,6 +73,12 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
"-common.compactor-address="+tCompactor.HTTPURL().String(),
)
)
require.NoError(t, clu.Run())
remoteCalled := []bool{false, false}
var (
tRuler = clu.AddComponent(
"ruler",
"-target=ruler",
@ -68,8 +86,6 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
)
)
remoteCalled := []bool{false, false}
handler1 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v1/write" {
t.Errorf("Expected to request '/api/v1/write', got: %s", r.URL.Path)
@ -79,6 +95,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
server1 := cluster.NewRemoteWriteServer(&handler1)
defer server1.Close()
handler2 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v1/write" {
@ -89,15 +106,13 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
server2 := cluster.NewRemoteWriteServer(&handler2)
defer server1.Close()
defer server2.Close()
tRuler.RemoteWriteUrls = []string{
server1.URL,
server2.URL,
}
// initialize only the ruler now.
require.NoError(t, clu.Run())
tenantID := randStringRunes()
@ -131,7 +146,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
})
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(`{job="fake"}`)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
@ -177,7 +192,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
// Query lines
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(`{job="fake"}`)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
@ -193,7 +208,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
t.Run("ruler", func(t *testing.T) {
// Check rules are read correctly.
resp, err := cliRuler.GetRules()
resp, err := cliRuler.GetRules(context.Background())
require.NoError(t, err)
require.NotNil(t, resp)

@ -1,6 +1,7 @@
package integration
import (
"context"
"testing"
"time"
@ -17,6 +18,7 @@ func TestMicroServicesIngestQuery(t *testing.T) {
assert.NoError(t, clu.Cleanup())
}()
// run initially the compactor, indexgateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
@ -35,6 +37,11 @@ func TestMicroServicesIngestQuery(t *testing.T) {
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())
// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
@ -45,6 +52,11 @@ func TestMicroServicesIngestQuery(t *testing.T) {
"-target=query-scheduler",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
)
)
require.NoError(t, clu.Run())
// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
@ -60,7 +72,6 @@ func TestMicroServicesIngestQuery(t *testing.T) {
"-common.compactor-address="+tCompactor.HTTPURL().String(),
)
)
require.NoError(t, clu.Run())
tenantID := randStringRunes()
@ -90,7 +101,7 @@ func TestMicroServicesIngestQuery(t *testing.T) {
})
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(`{job="fake"}`)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
@ -104,13 +115,13 @@ func TestMicroServicesIngestQuery(t *testing.T) {
})
t.Run("label-names", func(t *testing.T) {
resp, err := cliQueryFrontend.LabelNames()
resp, err := cliQueryFrontend.LabelNames(context.Background())
require.NoError(t, err)
assert.ElementsMatch(t, []string{"job"}, resp)
})
t.Run("label-values", func(t *testing.T) {
resp, err := cliQueryFrontend.LabelValues("job")
resp, err := cliQueryFrontend.LabelValues(context.Background(), "job")
require.NoError(t, err)
assert.ElementsMatch(t, []string{"fake"}, resp)
})

@ -1,6 +1,7 @@
package integration
import (
"context"
"testing"
"time"
@ -48,7 +49,7 @@ func TestSimpleScalableIngestQuery(t *testing.T) {
})
t.Run("query", func(t *testing.T) {
resp, err := cliRead.RunRangeQuery(`{job="fake"}`)
resp, err := cliRead.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
@ -62,13 +63,13 @@ func TestSimpleScalableIngestQuery(t *testing.T) {
})
t.Run("label-names", func(t *testing.T) {
resp, err := cliRead.LabelNames()
resp, err := cliRead.LabelNames(context.Background())
require.NoError(t, err)
assert.ElementsMatch(t, []string{"job"}, resp)
})
t.Run("label-values", func(t *testing.T) {
resp, err := cliRead.LabelValues("job")
resp, err := cliRead.LabelValues(context.Background(), "job")
require.NoError(t, err)
assert.ElementsMatch(t, []string{"fake"}, resp)
})

@ -1,6 +1,7 @@
package integration
import (
"context"
"testing"
"time"
@ -46,7 +47,7 @@ func TestSingleBinaryIngestQuery(t *testing.T) {
})
t.Run("query", func(t *testing.T) {
resp, err := cli.RunRangeQuery(`{job="fake"}`)
resp, err := cli.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
@ -60,13 +61,13 @@ func TestSingleBinaryIngestQuery(t *testing.T) {
})
t.Run("label-names", func(t *testing.T) {
resp, err := cli.LabelNames()
resp, err := cli.LabelNames(context.Background())
require.NoError(t, err)
assert.ElementsMatch(t, []string{"job"}, resp)
})
t.Run("label-values", func(t *testing.T) {
resp, err := cli.LabelValues("job")
resp, err := cli.LabelValues(context.Background(), "job")
require.NoError(t, err)
assert.ElementsMatch(t, []string{"fake"}, resp)
})

Loading…
Cancel
Save