Add scaffold and distributor implementation.

pull/1/head
Tom Wilkie 8 years ago
parent a1d66f42c0
commit ca6dd2c8d5
  1. 3
      .gitignore
  2. 130
      Makefile
  3. 23
      build-image/Dockerfile
  4. 17
      build-image/build.sh
  5. 5
      cmd/distributor/main.go
  6. 5
      cmd/ingester/main.go
  7. 201
      pkg/distributor/distributor.go
  8. 42
      pkg/ingester/client/client.go
  9. 14
      pkg/ingester/ingester.go
  10. 28
      pkg/logproto/logproto.proto
  11. 9
      tools/image-tag

3
.gitignore vendored

@ -0,0 +1,3 @@
# Sentinel files created by the Makefile to mark up-to-date Docker images.
.uptodate
# Generated protobuf Go sources (built by the Makefile's %.pb.go rule).
*.pb.go
# Host-side Go package cache, bind-mounted into the build container as /go/pkg.
.pkg

@ -0,0 +1,130 @@
# Targets that are commands, not files. (lint, shell and
# configs-integration-test were previously missing, so stray files with
# those names would have silently broken them.)
.PHONY: all test clean images protos lint shell configs-integration-test
.DEFAULT_GOAL := all

# Boiler plate for building Docker containers.
# All this must go at top of file I'm afraid.
IMAGE_PREFIX ?= quay.io/grafana/logish-
IMAGE_TAG := $(shell ./tools/image-tag)
UPTODATE := .uptodate

# Building Docker images is now automated. The convention is every directory
# with a Dockerfile in it builds an image called quay.io/grafana/logish-<dirname>.
# Dependencies (i.e. things that go in the image) still need to be explicitly
# declared.
#
# The per-directory $(UPTODATE) sentinel records the last successful build.
# (SUDO is defined further down the file; that is fine because recipes are
# expanded at execution time, after the whole Makefile has been parsed.)
%/$(UPTODATE): %/Dockerfile
	$(SUDO) docker build -t $(IMAGE_PREFIX)$(shell basename $(@D)) $(@D)/
	$(SUDO) docker tag $(IMAGE_PREFIX)$(shell basename $(@D)) $(IMAGE_PREFIX)$(shell basename $(@D)):$(IMAGE_TAG)
	touch $@

# Get a list of directories containing Dockerfiles
DOCKERFILES := $(shell find . -name tools -prune -o -name vendor -prune -o -type f -name 'Dockerfile' -print)
UPTODATE_FILES := $(patsubst %/Dockerfile,%/$(UPTODATE),$(DOCKERFILES))
DOCKER_IMAGE_DIRS := $(patsubst %/Dockerfile,%,$(DOCKERFILES))
IMAGE_NAMES := $(foreach dir,$(DOCKER_IMAGE_DIRS),$(patsubst %,$(IMAGE_PREFIX)%,$(shell basename $(dir))))

# Print the image names this Makefile knows how to build.
images:
	$(info $(IMAGE_NAMES))
	@echo > /dev/null
# Generating proto code is automated.
PROTO_DEFS := $(shell find . -name tools -prune -o -name vendor -prune -o -type f -name '*.proto' -print)
PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) vendor/github.com/weaveworks/cortex/pkg/ring/ring.pb.go vendor/github.com/weaveworks/cortex/pkg/ingester/client/cortex.pb.go

# Building binaries is now automated. The convention is to build a binary
# for every directory with main.go in it, in the ./cmd directory.
MAIN_GO := $(shell find . -name tools -prune -o -name vendor -prune -o -type f -name 'main.go' -print)
EXES := $(foreach exe, $(patsubst ./cmd/%/main.go, %, $(MAIN_GO)), ./cmd/$(exe)/$(exe))
GO_FILES := $(shell find . -name tools -prune -o -name vendor -prune -o -name cmd -prune -o -type f -name '*.go' -print)

# dep_exe wires one executable to its main.go plus the shared Go/proto
# sources, and wires the matching Docker image sentinel to the executable.
# Note $(dir ...) keeps its trailing slash, so no extra "/" is inserted
# (the previous "$(dir $(1))/main.go" produced a doubled slash).
define dep_exe
$(1): $(dir $(1))main.go $(GO_FILES) $(PROTO_GOS)
$(dir $(1))$(UPTODATE): $(1)
endef
$(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))

# Manually declared dependencies, and what goes into each exe
pkg/logproto/logproto.pb.go: pkg/logproto/logproto.proto
vendor/github.com/weaveworks/cortex/pkg/ring/ring.pb.go: vendor/github.com/weaveworks/cortex/pkg/ring/ring.proto

all: $(UPTODATE_FILES)
test: $(PROTO_GOS)
protos: $(PROTO_GOS)

# And now what goes into each image
build-image/$(UPTODATE): build-image/*
# All the boiler plate for building golang follows:
# Run docker via sudo only when the current user cannot talk to the daemon.
SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E")
BUILD_IN_CONTAINER := true
# RM is parameterized to allow CircleCI to run builds, as it
# currently disallows `docker run --rm`. This value is overridden
# in circle.yml
RM := --rm
# TTY is parameterized to allow Google Cloud Builder to run builds,
# as it currently disallows TTY devices. This value needs to be overridden
# in any custom cloudbuild.yaml files
TTY := --tty
# Static linking flags; -tags netgo selects the pure-Go network stack so the
# result does not need cgo/libc at runtime.
GO_FLAGS := -ldflags "-extldflags \"-static\" -linkmode=external -s -w" -tags netgo -i
# NETGO_CHECK verifies a built binary really used the netgo stdlib (by
# looking for cgo_stub.go in its strings) and deletes it otherwise. It must
# be recursively expanded (=, not :=) because it references $@.
# NOTE(review): the `echo "\n..."` relies on echo interpreting escapes,
# which POSIX sh does not guarantee — confirm the build shell does.
NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \
rm $@; \
echo "\nYour go standard library was built without the 'netgo' build tag."; \
echo "To fix that, run"; \
echo "    sudo go clean -i net"; \
echo "    sudo go install -tags netgo std"; \
false; \
}
ifeq ($(BUILD_IN_CONTAINER),true)

# Proxy targets: re-invoke make for the same goal inside the build
# container, with the source tree and a persistent Go package cache
# bind-mounted in. ($(CURDIR) replaces repeated $(shell pwd) forks;
# both are the absolute working directory of this make invocation.)
$(EXES) $(PROTO_GOS) lint test shell: build-image/$(UPTODATE)
	@mkdir -p $(CURDIR)/.pkg
	$(SUDO) time docker run $(RM) $(TTY) -i \
		-v $(CURDIR)/.pkg:/go/pkg \
		-v $(CURDIR):/go/src/github.com/grafana/logish \
		$(IMAGE_PREFIX)build-image $@;

# Spins up a throwaway postgres container for the duration of the test and
# tears it down afterwards (except on CircleCI, which forbids docker rm).
# The exit status of the inner docker run is preserved via $$status.
configs-integration-test: build-image/$(UPTODATE)
	@mkdir -p $(CURDIR)/.pkg
	DB_CONTAINER="$$(docker run -d -e 'POSTGRES_DB=configs_test' postgres:9.6)"; \
	$(SUDO) docker run $(RM) $(TTY) -i \
		-v $(CURDIR)/.pkg:/go/pkg \
		-v $(CURDIR):/go/src/github.com/grafana/logish \
		-v $(CURDIR)/cmd/configs/migrations:/migrations \
		--workdir /go/src/github.com/grafana/logish \
		--link "$$DB_CONTAINER":configs-db.cortex.local \
		$(IMAGE_PREFIX)build-image $@; \
	status=$$?; \
	test -n "$(CIRCLECI)" || docker rm -f "$$DB_CONTAINER"; \
	exit $$status

else

# Inside the build container: do the real work.
$(EXES): build-image/$(UPTODATE)
	go build $(GO_FLAGS) -o $@ ./$(@D)
	$(NETGO_CHECK)

# Vendored protos are generated into ./vendor; our own protos are
# generated next to their .proto file (with Timestamp mapped to gogo types).
%.pb.go: build-image/$(UPTODATE)
	case "$@" in \
	vendor*) \
		protoc -I ./vendor:./$(@D) --gogoslick_out=plugins=grpc:./vendor ./$(patsubst %.pb.go,%.proto,$@); \
		;; \
	*) \
		protoc -I ./vendor:./$(@D) --gogoslick_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@); \
		;; \
	esac

lint: build-image/$(UPTODATE)
	./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers .

test: build-image/$(UPTODATE)
	./tools/test -netgo

shell: build-image/$(UPTODATE)
	bash

configs-integration-test:
	/bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/... ./pkg/ruler/..."

endif
# Remove built images, sentinel files, binaries and generated proto code.
# docker rmi failures (e.g. image never built) are deliberately ignored.
clean:
	$(SUDO) docker rmi $(IMAGE_NAMES) >/dev/null 2>&1 || true
	rm -rf $(UPTODATE_FILES) $(EXES) $(PROTO_GOS)
	go clean ./...

@ -0,0 +1,23 @@
# Build image for logish: Go toolchain plus protobuf, shell and lint tooling.
FROM golang:1.10.1-stretch
# Protobuf compiler and scripting helpers; clear apt caches to keep the image small.
RUN apt-get update && apt-get install -y python-requests python-yaml file jq unzip protobuf-compiler libprotobuf-dev && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Rebuild the standard library with the netgo tag so binaries can be fully
# statically linked (the Makefile's NETGO_CHECK depends on this).
RUN go clean -i net && \
go install -tags netgo std && \
go install -race -tags netgo std
# Pinned shfmt binary, verified against a known sha256 before install.
RUN curl -fsSLo shfmt https://github.com/mvdan/sh/releases/download/v1.3.0/shfmt_v1.3.0_linux_amd64 && \
	echo "b1925c2c405458811f0c227266402cf1868b4de529f114722c2e3a5af4ac7bb2  shfmt" | sha256sum -c && \
	chmod +x shfmt && \
	mv shfmt /usr/bin
# Lint/codegen tools; remove sources and package cache afterwards so only
# the installed binaries remain in the image.
RUN go get -tags netgo \
		github.com/fzipp/gocyclo \
		github.com/golang/lint/golint \
		github.com/kisielk/errcheck \
		github.com/mjibson/esc \
		github.com/client9/misspell/cmd/misspell \
		github.com/jteeuwen/go-bindata/go-bindata \
		github.com/golang/protobuf/protoc-gen-go \
		github.com/gogo/protobuf/protoc-gen-gogoslick \
		github.com/gogo/protobuf/gogoproto && \
	rm -rf /go/pkg /go/src
COPY build.sh /
ENTRYPOINT ["/build.sh"]

@ -0,0 +1,17 @@
#!/bin/bash
# Entrypoint of the build image: run `make` against the bind-mounted source
# tree as a user matching the tree's owner, so files created on the mount
# do not end up owned by root on the host.
set -eu

SRC_PATH=$GOPATH/src/github.com/grafana/logish

# If we run make directly, any files created on the bind mount
# will have awkward ownership. So we switch to a user with the
# same user and group IDs as source directory. We have to set a
# few things up so that sudo works without complaining later on.
# ("$SRC_PATH" is quoted in case GOPATH ever contains spaces.)
uid=$(stat --format="%u" "$SRC_PATH")
gid=$(stat --format="%g" "$SRC_PATH")
echo "grafana:x:$uid:$gid::$SRC_PATH:/bin/bash" >>/etc/passwd
echo "grafana:*:::::::" >>/etc/shadow
echo "grafana ALL=(ALL) NOPASSWD: ALL" >>/etc/sudoers

# Re-run make in-container ($* carries the goals the outer make proxied in).
su grafana -c "PATH=$PATH make -C $SRC_PATH BUILD_IN_CONTAINER=false $*"

@ -0,0 +1,5 @@
package main

// main is a stub entry point for the distributor binary; the distributor
// implementation lives in pkg/distributor and is not wired up here yet.
func main() {
}

@ -0,0 +1,5 @@
package main

// main is a stub entry point for the ingester binary; the ingester
// scaffolding lives in pkg/ingester and is not wired up here yet.
func main() {
}

@ -0,0 +1,201 @@
package distributor
import (
"context"
"hash/fnv"
"sync/atomic"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
cortex "github.com/weaveworks/cortex/pkg/distributor"
cortex_client "github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/ring"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/logish/pkg/ingester/client"
"github.com/grafana/logish/pkg/logproto"
)
var (
	// sendDuration observes how long each replicated batch send takes,
	// labelled by method and status code (filled in by
	// instrument.TimeRequestHistogram in sendSamplesErr).
	sendDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "logish",
		Name:      "distributor_send_duration_seconds",
		Help:      "Time spent sending a sample batch to multiple replicated ingesters.",
		Buckets:   []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5, 1},
	}, []string{"method", "status_code"})

	// The two counters below used Namespace "cortex" (copy-paste from the
	// cortex distributor) while the histogram above used "logish"; use
	// "logish" consistently so all of this binary's metrics share one
	// namespace.
	ingesterAppends = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "logish",
		Name:      "distributor_ingester_appends_total",
		Help:      "The total number of batch appends sent to ingesters.",
	}, []string{"ingester"})
	ingesterAppendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "logish",
		Name:      "distributor_ingester_append_failures_total",
		Help:      "The total number of failed batch appends sent to ingesters.",
	}, []string{"ingester"})

	// NOTE(review): these collectors are not registered anywhere in this
	// file — confirm a prometheus.MustRegister happens elsewhere.
)
// Config for a Distributor.
//
// It embeds the upstream cortex distributor Config (which supplies, among
// other fields, RemoteTimeout used by New and Push) and adds the gRPC
// client configuration used to dial ingesters.
type Config struct {
	cortex.Config
	ClientConfig client.Config
}
// Distributor coordinates replication and distribution of log streams:
// it hashes streams onto the ring and pushes them to the resulting
// replication set of ingesters via a pooled client per ingester.
type Distributor struct {
	cfg  Config
	ring ring.ReadRing               // used to map stream tokens to ingesters
	pool *cortex_client.IngesterPool // caches one gRPC client per ingester address
}
// New creates a Distributor that distributes streams over the given ring,
// dialling ingesters lazily through a shared client pool.
func New(cfg Config, ring ring.ReadRing) (*Distributor, error) {
	// Factory handed to the pool: dial one ingester address with our
	// client configuration.
	newClient := func(addr string) (grpc_health_v1.HealthClient, error) {
		return client.New(cfg.ClientConfig, addr)
	}
	d := &Distributor{
		cfg:  cfg,
		ring: ring,
		pool: cortex_client.NewIngesterPool(newClient, cfg.RemoteTimeout),
	}
	return d, nil
}
// streamTracker tracks the fate of a single stream as it is pushed, in
// parallel, to every ingester in its replication set.
type streamTracker struct {
	stream *logproto.Stream
	// Derived from the stream's replication set in Push:
	// minSuccess = len(ingesters) - MaxErrors, maxFailures = MaxErrors.
	minSuccess  int
	maxFailures int
	// Updated atomically by concurrent sendSamples goroutines.
	succeeded int32
	failed    int32
}
// pushTracker aggregates the outcome of one Push call across all of its
// per-ingester sendSamples goroutines.
type pushTracker struct {
	samplesPending int32 // streams not yet replicated to minSuccess ingesters; atomic
	samplesFailed  int32 // streams that exceeded maxFailures; atomic
	done           chan struct{} // signalled once when samplesPending reaches zero
	err            chan error    // signalled once with the first fatal error
}
// Push a set of streams.
//
// Each stream is hashed (over tenant ID and label string) onto the ring to
// pick a replication set of ingesters, then sent to every ingester in that
// set in parallel. The call returns as soon as the outcome is known:
// success once every stream reached its minSuccess ingesters, or the first
// error once any stream exceeded its maxFailures budget.
func (d *Distributor) Push(ctx context.Context, req *logproto.WriteRequest) (*logproto.WriteResponse, error) {
	userID, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil, err
	}

	if len(req.Streams) == 0 {
		return &logproto.WriteResponse{}, nil
	}

	// Flatten the request into one tracker per stream, computing each
	// stream's ring token at the same time.
	streams := make([]streamTracker, len(req.Streams))
	keys := make([]uint32, 0, len(req.Streams))
	for i, stream := range req.Streams {
		keys = append(keys, tokenFor(userID, stream.Labels))
		streams[i].stream = stream
	}

	replicationSets, err := d.ring.BatchGet(keys, ring.Write)
	if err != nil {
		return nil, err
	}

	// Group streams by the ingesters they must be sent to.
	samplesByIngester := map[*ring.IngesterDesc][]*streamTracker{}
	for i, replicationSet := range replicationSets {
		streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
		streams[i].maxFailures = replicationSet.MaxErrors
		for _, ingester := range replicationSet.Ingesters {
			samplesByIngester[ingester] = append(samplesByIngester[ingester], &streams[i])
		}
	}

	pushTracker := pushTracker{
		samplesPending: int32(len(streams)),
		// BUG FIX: these channels were unbuffered. If we returned early on
		// err, a later goroutine that drove samplesPending to zero would
		// block forever on `done <-` (goroutine leak). Each channel has
		// exactly one guarded producer, so capacity 1 makes the send
		// non-blocking and safe.
		done: make(chan struct{}, 1),
		err:  make(chan error, 1),
	}
	for ingester, samples := range samplesByIngester {
		go func(ingester *ring.IngesterDesc, samples []*streamTracker) {
			// Use a background context to make sure all ingesters get samples
			// even if we return early.
			localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
			defer cancel()
			localCtx = user.InjectOrgID(localCtx, userID)
			if sp := opentracing.SpanFromContext(ctx); sp != nil {
				localCtx = opentracing.ContextWithSpan(localCtx, sp)
			}
			d.sendSamples(localCtx, ingester, samples, &pushTracker)
		}(ingester, samples)
	}

	select {
	case err := <-pushTracker.err:
		return nil, err
	case <-pushTracker.done:
		return &logproto.WriteResponse{}, nil
	}
}
// sendSamples pushes one batch of streams to a single ingester, then folds
// the result into the per-stream counters and the overall push tracker.
func (d *Distributor) sendSamples(ctx context.Context, ingester *ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
	err := d.sendSamplesErr(ctx, ingester, streamTrackers)

	// If we succeed, decrement each sample's pending count by one.  If we reach
	// the required number of successful puts on this sample, then decrement the
	// number of pending samples by one.  If we successfully push all samples to
	// min success ingesters, wake up the waiting rpc so it can return early.
	// Similarly, track the number of errors, and if it exceeds maxFailures
	// shortcut the waiting rpc.
	//
	// The use of atomic increments here guarantees only a single sendSamples
	// goroutine will write to either channel.
	for i := range streamTrackers {
		if err != nil {
			// One more failure for this stream; only the goroutine that pushes
			// it past maxFailures falls through to report the error.
			if atomic.AddInt32(&streamTrackers[i].failed, 1) <= int32(streamTrackers[i].maxFailures) {
				continue
			}
			// samplesFailed going 0 -> 1 elects exactly one error reporter.
			if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
				pushTracker.err <- err
			}
		} else {
			// Only the goroutine that brings this stream to exactly minSuccess
			// successes decrements the pending count.
			if atomic.AddInt32(&streamTrackers[i].succeeded, 1) != int32(streamTrackers[i].minSuccess) {
				continue
			}
			// Last pending stream replicated: wake the waiting Push.
			if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 {
				pushTracker.done <- struct{}{}
			}
		}
	}
}
// sendSamplesErr builds a WriteRequest from the trackers' streams and
// pushes it to one ingester, recording the append and failure counters.
func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.IngesterDesc, streams []*streamTracker) error {
	c, err := d.pool.GetClientFor(ingester.Addr)
	if err != nil {
		return err
	}

	// BUG FIX: the slice was previously created with length 0 (only
	// capacity len(streams)), so the indexed assignment below panicked
	// with index-out-of-range on the first stream. Allocate full length.
	req := &logproto.WriteRequest{
		Streams: make([]*logproto.Stream, len(streams)),
	}
	for i, s := range streams {
		req.Streams[i] = s.stream
	}

	err = instrument.TimeRequestHistogram(ctx, "Distributor.sendSamples", sendDuration, func(ctx context.Context) error {
		// The pool hands back a HealthClient; it is the Aggregator client
		// underneath (see client.New), so this assertion holds.
		_, err := c.(logproto.AggregatorClient).Push(ctx, req)
		return err
	})
	ingesterAppends.WithLabelValues(ingester.Addr).Inc()
	if err != nil {
		ingesterAppendFailures.WithLabelValues(ingester.Addr).Inc()
	}
	return err
}
func tokenFor(userID, labels string) uint32 {
h := fnv.New32()
h.Write([]byte(userID))
h.Write([]byte(labels))
return h.Sum32()
}

@ -0,0 +1,42 @@
package client
import (
"io"
"github.com/grafana/logish/pkg/logproto"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
opentracing "github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)
// Config for an ingester client.
type Config struct {
	// Maximum gRPC message size this client will accept, in bytes
	// (passed to grpc.MaxCallRecvMsgSize in New).
	// NOTE(review): no default is applied here — confirm callers always
	// set a sensible value (a zero limit looks unusable).
	MaxRecvMsgSize int
}
// New dials addr and returns an Aggregator client wrapped as a gRPC
// health-check client, bundled with an io.Closer for the underlying
// connection (so a pool can close it when evicting the entry).
func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) {
	opts := []grpc.DialOption{
		grpc.WithInsecure(),
		// Chain tracing and user/org header propagation onto every unary call.
		grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
			otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
			middleware.ClientUserHeaderInterceptor,
		)),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(cfg.MaxRecvMsgSize),
			// NOTE(review): UseCompressor("gzip") requires the gzip
			// compressor to be registered (normally via a blank import of
			// google.golang.org/grpc/encoding/gzip), which this file does
			// not do — confirm some other file in the binary imports it.
			grpc.UseCompressor("gzip"),
		),
	}
	conn, err := grpc.Dial(addr, opts...)
	if err != nil {
		return nil, err
	}
	// The Aggregator service declares a Check RPC (see logproto.proto), so
	// its generated client satisfies grpc_health_v1.HealthClient.
	return struct {
		grpc_health_v1.HealthClient
		io.Closer
	}{
		HealthClient: logproto.NewAggregatorClient(conn),
		Closer:       conn,
	}, nil
}

@ -0,0 +1,14 @@
package ingester
// Config configures an Ingester. It carries no options yet.
type Config struct {
}

// Ingester is the scaffolding for the ingester component; for now it only
// records its configuration.
type Ingester struct {
	cfg Config
}

// New constructs an Ingester from the given Config. It cannot currently
// fail; the error return is kept so the signature can grow real
// initialisation later without breaking callers.
func New(cfg Config) (*Ingester, error) {
	ing := &Ingester{cfg: cfg}
	return ing, nil
}

@ -0,0 +1,28 @@
syntax = "proto3";

package logproto;

import "google/protobuf/timestamp.proto";
import "google.golang.org/grpc/health/grpc_health_v1/health.proto";

// Aggregator receives pushed batches of log streams and also exposes the
// standard gRPC health-check Check method, so its client can double as a
// grpc_health_v1.HealthClient.
service Aggregator {
  rpc Push(WriteRequest) returns (WriteResponse) {};
  rpc Check(grpc.health.v1.HealthCheckRequest) returns (grpc.health.v1.HealthCheckResponse);
}

// WriteRequest carries one batch of log streams.
message WriteRequest {
  repeated Stream streams = 1;
}

// WriteResponse is intentionally empty.
message WriteResponse {
}

// Stream is a set of log entries sharing a single label set.
message Stream {
  // String form of the stream's label set (also used as the ring hash input).
  string labels = 1;
  repeated Entry entries = 2;
}

// Entry is a single log line with its timestamp.
message Entry {
  google.protobuf.Timestamp timestamp = 1;
  string line = 2;
}

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Print a Docker image tag of the form <branch>-<short-sha>[-WIP], where
# -WIP is appended when the working tree has staged/modified (non-untracked)
# changes according to `git status --porcelain`.
set -o errexit
set -o nounset
set -o pipefail

# BUG FIX: the previous pattern used PCRE constructs — a non-capturing
# group `(?:...)` and `\s` — with grep -E (ERE). `(?:` is invalid ERE, so
# grep failed and the WIP suffix was never applied. Use a plain ERE group
# and the portable [[:space:]] class instead.
WORKING_SUFFIX=$(if git status --porcelain | grep -qE '^([^?][^ ]|[^ ][^?])[[:space:]]'; then echo "-WIP"; else echo ""; fi)
BRANCH_PREFIX=$(git rev-parse --abbrev-ref HEAD)
# Branch names may contain slashes; replace them so the tag stays valid.
echo "${BRANCH_PREFIX//\//-}-$(git rev-parse --short HEAD)$WORKING_SUFFIX"
Loading…
Cancel
Save