mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
211 lines
5.1 KiB
211 lines
5.1 KiB
|
7 years ago
|
package ingester
|
||
|
|
|
||
|
|
import (
|
||
|
|
"fmt"
|
||
|
|
"io"
|
||
|
|
"io/ioutil"
|
||
|
|
"sort"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"google.golang.org/grpc"
|
||
|
|
"google.golang.org/grpc/health/grpc_health_v1"
|
||
|
|
|
||
|
|
"github.com/cortexproject/cortex/pkg/ring"
|
||
|
|
"github.com/stretchr/testify/assert"
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
"github.com/weaveworks/common/user"
|
||
|
|
|
||
|
|
"github.com/grafana/loki/pkg/ingester/client"
|
||
|
|
"github.com/grafana/loki/pkg/logproto"
|
||
|
|
"golang.org/x/net/context"
|
||
|
|
)
|
||
|
|
|
||
|
|
func TestTransferOut(t *testing.T) {
|
||
|
|
f := newTestIngesterFactory(t)
|
||
|
|
|
||
|
|
ing := f.getIngester(time.Duration(0))
|
||
|
|
|
||
|
|
// Push some data into our original ingester
|
||
|
|
ctx := user.InjectOrgID(context.Background(), "test")
|
||
|
|
_, err := ing.Push(ctx, &logproto.PushRequest{
|
||
|
|
Streams: []*logproto.Stream{
|
||
|
|
{
|
||
|
|
Entries: []logproto.Entry{
|
||
|
|
{Line: "line 0", Timestamp: time.Unix(0, 0)},
|
||
|
|
{Line: "line 1", Timestamp: time.Unix(1, 0)},
|
||
|
|
},
|
||
|
|
Labels: `{foo="bar",bar="baz1"}`,
|
||
|
|
},
|
||
|
|
{
|
||
|
|
Entries: []logproto.Entry{
|
||
|
|
{Line: "line 2", Timestamp: time.Unix(2, 0)},
|
||
|
|
{Line: "line 3", Timestamp: time.Unix(3, 0)},
|
||
|
|
},
|
||
|
|
Labels: `{foo="bar",bar="baz2"}`,
|
||
|
|
},
|
||
|
|
},
|
||
|
|
})
|
||
|
|
require.NoError(t, err)
|
||
|
|
|
||
|
|
assert.Len(t, ing.instances, 1)
|
||
|
|
if assert.Contains(t, ing.instances, "test") {
|
||
|
|
assert.Len(t, ing.instances["test"].streams, 2)
|
||
|
|
}
|
||
|
|
|
||
|
|
// Create a new ingester and trasfer data to it
|
||
|
|
ing2 := f.getIngester(time.Second * 60)
|
||
|
|
ing.Shutdown()
|
||
|
|
|
||
|
|
assert.Len(t, ing2.instances, 1)
|
||
|
|
if assert.Contains(t, ing2.instances, "test") {
|
||
|
|
assert.Len(t, ing2.instances["test"].streams, 2)
|
||
|
|
|
||
|
|
lines := []string{}
|
||
|
|
|
||
|
|
// Get all the lines back and make sure the blocks transferred successfully
|
||
|
|
for _, stream := range ing2.instances["test"].streams {
|
||
|
|
it, err := stream.Iterator(
|
||
|
|
time.Unix(0, 0),
|
||
|
|
time.Unix(10, 0),
|
||
|
|
logproto.FORWARD,
|
||
|
|
func([]byte) bool { return true },
|
||
|
|
)
|
||
|
|
if !assert.NoError(t, err) {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
for it.Next() {
|
||
|
|
entry := it.Entry()
|
||
|
|
lines = append(lines, entry.Line)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
sort.Strings(lines)
|
||
|
|
|
||
|
|
assert.Equal(
|
||
|
|
t,
|
||
|
|
[]string{"line 0", "line 1", "line 2", "line 3"},
|
||
|
|
lines,
|
||
|
|
)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
type testIngesterFactory struct {
|
||
|
|
t *testing.T
|
||
|
|
store ring.KVClient
|
||
|
|
n int
|
||
|
|
ingesters map[string]*Ingester
|
||
|
|
}
|
||
|
|
|
||
|
|
func newTestIngesterFactory(t *testing.T) *testIngesterFactory {
|
||
|
|
return &testIngesterFactory{
|
||
|
|
t: t,
|
||
|
|
store: ring.NewInMemoryKVClient(ring.ProtoCodec{Factory: ring.ProtoDescFactory}),
|
||
|
|
ingesters: make(map[string]*Ingester),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (f *testIngesterFactory) getIngester(joinAfter time.Duration) *Ingester {
|
||
|
|
f.n++
|
||
|
|
|
||
|
|
cfg := defaultIngesterTestConfig()
|
||
|
|
cfg.MaxTransferRetries = 1
|
||
|
|
cfg.LifecyclerConfig.ClaimOnRollout = true
|
||
|
|
cfg.LifecyclerConfig.ID = fmt.Sprintf("localhost-%d", f.n)
|
||
|
|
cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store
|
||
|
|
cfg.LifecyclerConfig.JoinAfter = joinAfter
|
||
|
|
cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID
|
||
|
|
|
||
|
|
cfg.ingesterClientFactory = func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) {
|
||
|
|
ingester, ok := f.ingesters[addr]
|
||
|
|
if !ok {
|
||
|
|
return nil, fmt.Errorf("no ingester %s", addr)
|
||
|
|
}
|
||
|
|
|
||
|
|
return struct {
|
||
|
|
logproto.PusherClient
|
||
|
|
logproto.QuerierClient
|
||
|
|
logproto.IngesterClient
|
||
|
|
grpc_health_v1.HealthClient
|
||
|
|
io.Closer
|
||
|
|
}{
|
||
|
|
PusherClient: nil,
|
||
|
|
QuerierClient: nil,
|
||
|
|
IngesterClient: &testIngesterClient{t: f.t, i: ingester},
|
||
|
|
Closer: ioutil.NopCloser(nil),
|
||
|
|
}, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
_, ing := newTestStore(f.t, cfg)
|
||
|
|
f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing
|
||
|
|
|
||
|
|
// NB there's some kind of race condition with the in-memory KV client when
|
||
|
|
// we don't give the ingester a little bit of time to initialize. a 100ms
|
||
|
|
// wait time seems effective.
|
||
|
|
time.Sleep(time.Millisecond * 100)
|
||
|
|
return ing
|
||
|
|
}
|
||
|
|
|
||
|
|
type testIngesterClient struct {
|
||
|
|
t *testing.T
|
||
|
|
i *Ingester
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
|
||
|
|
chunkCh := make(chan *logproto.TimeSeriesChunk)
|
||
|
|
respCh := make(chan *logproto.TransferChunksResponse)
|
||
|
|
|
||
|
|
client := &testTransferChunksClient{ch: chunkCh, resp: respCh}
|
||
|
|
go func() {
|
||
|
|
server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
|
||
|
|
err := c.i.TransferChunks(server)
|
||
|
|
require.NoError(c.t, err)
|
||
|
|
}()
|
||
|
|
|
||
|
|
return client, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
type testTransferChunksClient struct {
|
||
|
|
ch chan *logproto.TimeSeriesChunk
|
||
|
|
resp chan *logproto.TransferChunksResponse
|
||
|
|
|
||
|
|
grpc.ClientStream
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
|
||
|
|
c.ch <- chunk
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
|
||
|
|
close(c.ch)
|
||
|
|
resp := <-c.resp
|
||
|
|
close(c.resp)
|
||
|
|
return resp, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
type testTransferChunksServer struct {
|
||
|
|
ch chan *logproto.TimeSeriesChunk
|
||
|
|
resp chan *logproto.TransferChunksResponse
|
||
|
|
|
||
|
|
grpc.ServerStream
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *testTransferChunksServer) Context() context.Context {
|
||
|
|
return context.Background()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *testTransferChunksServer) SendAndClose(resp *logproto.TransferChunksResponse) error {
|
||
|
|
s.resp <- resp
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *testTransferChunksServer) Recv() (*logproto.TimeSeriesChunk, error) {
|
||
|
|
chunk, ok := <-s.ch
|
||
|
|
if !ok {
|
||
|
|
return nil, io.EOF
|
||
|
|
}
|
||
|
|
return chunk, nil
|
||
|
|
}
|