@ -222,18 +222,28 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil , err
}
// Return early if request does not contain any streams
if len ( req . Streams ) == 0 {
return & logproto . PushResponse { } , nil
}
// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make ( [ ] streamTracker , 0 , len ( req . Streams ) )
keys := make ( [ ] uint32 , 0 , len ( req . Streams ) )
var validationErr error
validatedSamplesSize := 0
validatedSamplesCount := 0
var validationErr error
validationContext := d . validator . getValidationContextForTime ( time . Now ( ) , userID )
for _ , stream := range req . Streams {
// Return early if stream does not contain any entries
if len ( stream . Entries ) == 0 {
continue
}
// Truncate first so subsequent steps have consistent line lengths
d . truncateLines ( validationContext , & stream )
@ -262,16 +272,11 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
stream . Entries = stream . Entries [ : n ]
if len ( stream . Entries ) == 0 {
continue
}
keys = append ( keys , util . TokenFor ( userID , stream . Labels ) )
streams = append ( streams , streamTracker {
stream : stream ,
} )
streams = append ( streams , streamTracker { stream : stream } )
}
// Return early if none of the streams contained entries
if len ( streams ) == 0 {
return & logproto . PushResponse { } , validationErr
}