diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 45042ded97..8d8a9f8fef 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -207,8 +207,8 @@ func init() { "The name of the database to use for storing samples in InfluxDB.", ) cfg.fs.StringVar( - &cfg.remote.Address, "experimental.storage.remote.address", "", - "The address of the remote server to send samples to. None, if empty. EXPERIMENTAL.", + &cfg.remote.URL, "experimental.storage.remote.url", "", + "The URL of the remote endpoint to send samples to. None, if empty. EXPERIMENTAL.", ) cfg.fs.DurationVar( diff --git a/documentation/examples/remote_storage/server.go b/documentation/examples/remote_storage/server.go index 2f5bda9eca..ea1e1ea169 100644 --- a/documentation/examples/remote_storage/server.go +++ b/documentation/examples/remote_storage/server.go @@ -15,16 +15,14 @@ package main import ( "fmt" - "io" "io/ioutil" - "log" - "net" - - "golang.org/x/net/context" - "google.golang.org/grpc" + "net/http" + "github.com/golang/protobuf/proto" "github.com/golang/snappy" "github.com/prometheus/common/model" + "golang.org/x/net/context" + "github.com/prometheus/prometheus/storage/remote" ) @@ -46,23 +44,34 @@ func (server *server) Write(ctx context.Context, req *remote.WriteRequest) (*rem return &remote.WriteResponse{}, nil } -type snappyDecompressor struct{} - -func (d *snappyDecompressor) Do(r io.Reader) ([]byte, error) { - sr := snappy.NewReader(r) - return ioutil.ReadAll(sr) +func main() { + http.Handle("/push", AppenderHandler(&server{})) + http.ListenAndServe(":1234", nil) } -func (d *snappyDecompressor) Type() string { - return "snappy" +type WriteServer interface { + Write(context.Context, *remote.WriteRequest) (*remote.WriteResponse, error) } -func main() { - lis, err := net.Listen("tcp", ":1234") - if err != nil { - log.Fatalf("Failed to listen: %v", err) - } - s := grpc.NewServer(grpc.RPCDecompressor(&snappyDecompressor{})) - remote.RegisterWriteServer(s, &server{}) - s.Serve(lis) +// AppenderHandler returns a http.Handler that accepts proto encoded samples. +func AppenderHandler(s WriteServer) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) + if err != nil { + return + } + + var req remote.WriteRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if _, err := s.Write(context.Background(), &req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + }) } diff --git a/storage/remote/client.go b/storage/remote/client.go index 563f47c003..758706a205 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -14,38 +14,29 @@ package remote import ( + "bytes" + "fmt" + "net/http" "time" - "golang.org/x/net/context" - "google.golang.org/grpc" - + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" "github.com/prometheus/common/model" ) // Client allows sending batches of Prometheus samples to an HTTP endpoint. type Client struct { - client WriteClient - timeout time.Duration + url string + client http.Client } // NewClient creates a new Client. -func NewClient(address string, timeout time.Duration) (*Client, error) { - conn, err := grpc.Dial( - address, - grpc.WithInsecure(), - grpc.WithTimeout(timeout), - grpc.WithCompressor(&snappyCompressor{}), - ) - if err != nil { - // grpc.Dial() returns immediately and doesn't error when the server is - // unreachable when not passing in the WithBlock() option. The client then - // will continuously try to (re)establish the connection in the background. - // So this will only return here if some other uncommon error occurred. - return nil, err - } +func NewClient(url string, timeout time.Duration) (*Client, error) { return &Client{ - client: NewWriteClient(conn), - timeout: timeout, + url: url, + client: http.Client{ + Timeout: timeout, + }, }, nil } @@ -74,13 +65,29 @@ func (c *Client) Store(samples model.Samples) error { req.Timeseries = append(req.Timeseries, ts) } - ctxt, cancel := context.WithTimeout(context.TODO(), c.timeout) - defer cancel() + data, err := proto.Marshal(req) + if err != nil { + return err + } - _, err := c.client.Write(ctxt, req) + buf := bytes.Buffer{} + if _, err := snappy.NewWriter(&buf).Write(data); err != nil { + return err + } + + httpReq, err := http.NewRequest("POST", c.url, &buf) + if err != nil { + return err + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpResp, err := c.client.Do(httpReq) if err != nil { return err } + defer httpResp.Body.Close() + if httpResp.StatusCode/100 != 2 { + return fmt.Errorf("server returned HTTP status %s", httpResp.Status) + } return nil } diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 91c25c886d..ede1d1963e 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -69,8 +69,8 @@ func New(o *Options) (*Storage, error) { prometheus.MustRegister(c) s.queues = append(s.queues, NewStorageQueueManager(c, nil)) } - if o.Address != "" { - c, err := NewClient(o.Address, o.StorageTimeout) + if o.URL != "" { + c, err := NewClient(o.URL, o.StorageTimeout) if err != nil { return nil, err } @@ -94,9 +94,9 @@ type Options struct { GraphiteAddress string GraphiteTransport string GraphitePrefix string - // TODO: This just being called "Address" will make more sense once the + // TODO: This just being called "URL" will make more sense once the // other remote storage mechanisms are removed. - Address string + URL string } // Run starts the background processing of the storage queues. diff --git a/storage/remote/remote.pb.go b/storage/remote/remote.pb.go index 2d67efabc4..afb55fdfac 100644 --- a/storage/remote/remote.pb.go +++ b/storage/remote/remote.pb.go @@ -21,11 +21,6 @@ import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) - // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf @@ -114,78 +109,6 @@ func init() { proto.RegisterType((*WriteResponse)(nil), "remote.WriteResponse") } -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion3 - -// Client API for Write service - -type WriteClient interface { - Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) -} - -type writeClient struct { - cc *grpc.ClientConn -} - -func NewWriteClient(cc *grpc.ClientConn) WriteClient { - return &writeClient{cc} -} - -func (c *writeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) { - out := new(WriteResponse) - err := grpc.Invoke(ctx, "/remote.Write/Write", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// Server API for Write service - -type WriteServer interface { - Write(context.Context, *WriteRequest) (*WriteResponse, error) -} - -func RegisterWriteServer(s *grpc.Server, srv WriteServer) { - s.RegisterService(&_Write_serviceDesc, srv) -} - -func _Write_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(WriteRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(WriteServer).Write(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/remote.Write/Write", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(WriteServer).Write(ctx, req.(*WriteRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Write_serviceDesc = grpc.ServiceDesc{ - ServiceName: "remote.Write", - HandlerType: (*WriteServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Write", - Handler: _Write_Write_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: fileDescriptor0, -} - func init() { proto.RegisterFile("remote.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ diff --git a/storage/remote/snappy.go b/storage/remote/snappy.go deleted file mode 100644 index 5823a1a521..0000000000 --- a/storage/remote/snappy.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package remote - -import ( - "io" - - "github.com/golang/snappy" -) - -type snappyCompressor struct{} - -func (c *snappyCompressor) Do(w io.Writer, p []byte) error { - sw := snappy.NewWriter(w) - if _, err := sw.Write(p); err != nil { - return err - } - return sw.Close() -} - -func (c *snappyCompressor) Type() string { - return "snappy" -}