Add FIFO queue persistent buffering for fluent bit output plugin (#2142)

* Add FIFO queue persistent buffering for fluent bit output plugin

* Fix configuration names in logging

* Fix typos & suggested improvments
pull/2149/head
Martin Dojcak 6 years ago committed by GitHub
parent a69a83095c
commit b27f92e86b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/fluent-bit/Dockerfile
  2. 28
      cmd/fluent-bit/README.md
  3. 30
      cmd/fluent-bit/buffer.go
  4. 14
      cmd/fluent-bit/client.go
  5. 52
      cmd/fluent-bit/config.go
  6. 132
      cmd/fluent-bit/dque.go
  7. 3
      cmd/fluent-bit/loki.go
  8. 6
      cmd/fluent-bit/out_loki.go
  9. 2
      go.mod
  10. 4
      go.sum
  11. 24
      vendor/github.com/gofrs/flock/.gitignore
  12. 10
      vendor/github.com/gofrs/flock/.travis.yml
  13. 27
      vendor/github.com/gofrs/flock/LICENSE
  14. 41
      vendor/github.com/gofrs/flock/README.md
  15. 25
      vendor/github.com/gofrs/flock/appveyor.yml
  16. 127
      vendor/github.com/gofrs/flock/flock.go
  17. 195
      vendor/github.com/gofrs/flock/flock_unix.go
  18. 76
      vendor/github.com/gofrs/flock/flock_winapi.go
  19. 140
      vendor/github.com/gofrs/flock/flock_windows.go
  20. 21
      vendor/github.com/joncrlsn/dque/LICENSE
  21. 161
      vendor/github.com/joncrlsn/dque/README.md
  22. 11
      vendor/github.com/joncrlsn/dque/go.sum
  23. 556
      vendor/github.com/joncrlsn/dque/queue.go
  24. 425
      vendor/github.com/joncrlsn/dque/segment.go
  25. 23
      vendor/github.com/joncrlsn/dque/util.go
  26. 4
      vendor/modules.txt

@ -3,7 +3,7 @@ COPY . /src/loki
WORKDIR /src/loki
RUN make clean && make BUILD_IN_CONTAINER=false fluent-bit-plugin
FROM fluent/fluent-bit:1.2
FROM fluent/fluent-bit:1.4
COPY --from=build /src/loki/cmd/fluent-bit/out_loki.so /fluent-bit/bin
COPY cmd/fluent-bit/fluent-bit.conf /fluent-bit/etc/fluent-bit.conf
EXPOSE 2020

@ -1,6 +1,6 @@
# Fluent Bit output plugin
[Fluent Bit](https://fluentbit.io/) is a Fast and Lightweight Data Forwarder, it can be configured with the [Loki output plugin](https://fluentbit.io/documentation/0.12/output/) to ship logs to Loki. You can define which log files you want to collect using the [`Tail`](https://fluentbit.io/documentation/0.12/input/tail.html) [input plugin](https://fluentbit.io/documentation/0.12/getting_started/input.html). Additionally Fluent Bit supports multiple `Filter` and `Parser` plugins (`Kubernetes`, `JSON`, etc..) to structure and alter log lines.
[Fluent Bit](https://fluentbit.io/) is a Fast and Lightweight Data Forwarder, it can be configured with the [Loki output plugin](https://fluentbit.io/documentation/0.12/output/) to ship logs to Loki. You can define which log files you want to collect using the [`Tail`](https://fluentbit.io/documentation/0.12/input/tail.html) or [`Stdin`](https://docs.fluentbit.io/manual/pipeline/inputs/standard-input) [input plugin](https://fluentbit.io/documentation/0.12/getting_started/input.html). Additionally Fluent Bit supports multiple `Filter` and `Parser` plugins (`Kubernetes`, `JSON`, etc..) to structure and alter log lines.
This plugin is implemented with [Fluent Bit's Go plugin](https://github.com/fluent/fluent-bit-go) interface. It pushes logs to Loki using a GRPC connection.
@ -22,6 +22,12 @@ This plugin is implemented with [Fluent Bit's Go plugin](https://github.com/flue
| LineFormat | Format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format <key>=<value>. | json |
| DropSingleKey | If set to true and after extracting label_keys a record only has a single key remaining, the log line sent to Loki will just be the value of the record key.| true |
| LabelMapPath | Path to a json file defining how to transform nested records. | none
| Buffer | Enable buffering mechanism | false
| BufferType | Specify the buffering mechanism to use (currently only dque is implemented). | dque
| DqueDir| Path to the directory for queued logs | /tmp/flb-storage/loki
| DqueSegmentSize| Segment size in terms of number of records per segment | 500
| DqueSync| Whether to fsync each queue change | false
| DqueName | Queue name, must be uniq per output | dque
### Labels
@ -75,6 +81,26 @@ The labels extracted will be `{team="x-men", container="promtail", pod="promtail
If you don't want the `kubernetes` and `HOSTNAME` fields to appear in the log line you can use the `RemoveKeys` configuration field. (e.g. `RemoveKeys kubernetes,HOSTNAME`).
### Buffering
Buffering refers to the ability to store the records somewhere, and while they are processed and delivered, still be able to store more. Loki output plugin in certain situation can be blocked by loki client because of its design:
* BatchSize is over limit, output plugin pause receiving new records until the pending batch is sucessfully sent to the server
* Loki server is unreachable (retry 429s, 500s and connection-level errors), output plugin blocks new records until loki server will be avalible again and the pending batch is sucessfully sent to the server or as long as the maximum number of attempts has been reached within configured backoff mechanism
The blocking state with some of the input plugins is not acceptable because it can have a undesirable side effects on the part that generates the logs. Fluent Bit implements buffering mechanism that is based on parallel processing and it cannot send logs in order which is loki requirement (loki logs must be in increasing time order per stream).
Loki output plugin has buffering mechanism based on [`dque`](https://github.com/joncrlsn/dque) which is compatible with loki server strict time ordering and can be set up by configuration flag:
```properties
[Output]
Name loki
Match *
Url http://localhost:3100/loki/api/v1/push
Buffer true
DqueSegmentSize 8096
DqueDir /tmp/flb-storage/buffer
DqueName loki.0
```
### Configuration examples
To configure the Loki output plugin add this section to fluent-bit.conf

@ -0,0 +1,30 @@
package main
import (
"fmt"
"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/promtail/client"
)
type bufferConfig struct {
buffer bool
bufferType string
dqueConfig dqueConfig
}
var defaultBufferConfig = bufferConfig{
buffer: false,
bufferType: "dque",
dqueConfig: defaultDqueConfig,
}
// NewBuffer makes a new buffered Client.
func NewBuffer(cfg *config, logger log.Logger) (client.Client, error) {
switch cfg.bufferConfig.bufferType {
case "dque":
return newDque(cfg, logger)
default:
return nil, fmt.Errorf("failed to parse bufferType: %s", cfg.bufferConfig.bufferType)
}
}

@ -0,0 +1,14 @@
package main
import (
"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/promtail/client"
)
// NewClient creates a new client based on the fluentbit configuration.
func NewClient(cfg *config, logger log.Logger) (client.Client, error) {
if cfg.bufferConfig.buffer {
return NewBuffer(cfg, logger)
}
return client.New(cfg.clientConfig, logger)
}

@ -38,6 +38,7 @@ const (
type config struct {
clientConfig client.Config
bufferConfig bufferConfig
logLevel logging.Level
autoKubernetesLabels bool
removeKeys []string
@ -51,6 +52,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
res := &config{}
res.clientConfig = defaultClientCfg
res.bufferConfig = defaultBufferConfig
url := cfg.Get("URL")
var clientURL flagext.URLValue
@ -159,5 +161,55 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
}
res.labelKeys = nil
}
// enable loki plugin buffering
buffer := cfg.Get("Buffer")
switch buffer {
case "false", "":
res.bufferConfig.buffer = false
case "true":
res.bufferConfig.buffer = true
default:
return nil, fmt.Errorf("invalid boolean Buffer: %v", buffer)
}
// buffering type
bufferType := cfg.Get("BufferType")
if bufferType != "" {
res.bufferConfig.bufferType = bufferType
}
// dque directory
queueDir := cfg.Get("DqueDir")
if queueDir != "" {
res.bufferConfig.dqueConfig.queueDir = queueDir
}
// dque segment size (queueEntry unit)
queueSegmentSize := cfg.Get("DqueSegmentSize")
if queueSegmentSize != "" {
res.bufferConfig.dqueConfig.queueSegmentSize, err = strconv.Atoi(queueSegmentSize)
if err != nil {
return nil, fmt.Errorf("impossible to convert string to integer DqueSegmentSize: %v", queueSegmentSize)
}
}
// dque control file change sync to disk as they happen aka dque.turbo mode
queueSync := cfg.Get("DqueSync")
switch queueSync {
case "normal", "":
res.bufferConfig.dqueConfig.queueSync = false
case "full":
res.bufferConfig.dqueConfig.queueSync = true
default:
return nil, fmt.Errorf("invalid string queueSync: %v", queueSync)
}
// dque name
queueName := cfg.Get("DqueName")
if queueName != "" {
res.bufferConfig.dqueConfig.queueName = queueName
}
return res, nil
}

@ -0,0 +1,132 @@
package main
import (
"fmt"
"os"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/joncrlsn/dque"
"github.com/prometheus/common/model"
)
type dqueConfig struct {
queueDir string
queueSegmentSize int
queueSync bool
queueName string
}
var defaultDqueConfig = dqueConfig{
queueDir: "/tmp/flb-storage/loki",
queueSegmentSize: 500,
queueSync: false,
queueName: "dque",
}
type dqueEntry struct {
Lbs model.LabelSet
Ts time.Time
Line string
}
func dqueEntryBuilder() interface{} {
return &dqueEntry{}
}
type dqueClient struct {
logger log.Logger
queue *dque.DQue
loki client.Client
quit chan struct{}
once sync.Once
wg sync.WaitGroup
}
// New makes a new dque loki client
func newDque(cfg *config, logger log.Logger) (client.Client, error) {
var err error
q := &dqueClient{
logger: log.With(logger, "component", "queue", "name", cfg.bufferConfig.dqueConfig.queueName),
quit: make(chan struct{}),
}
err = os.MkdirAll(cfg.bufferConfig.dqueConfig.queueDir, 0644)
if err != nil {
return nil, fmt.Errorf("cannot create queue directory: %s", err)
}
q.queue, err = dque.NewOrOpen(cfg.bufferConfig.dqueConfig.queueName, cfg.bufferConfig.dqueConfig.queueDir, cfg.bufferConfig.dqueConfig.queueSegmentSize, dqueEntryBuilder)
if err != nil {
return nil, err
}
if !cfg.bufferConfig.dqueConfig.queueSync {
q.queue.TurboOn()
}
q.loki, err = client.New(cfg.clientConfig, logger)
if err != nil {
return nil, err
}
q.wg.Add(1)
go q.dequeuer()
return q, nil
}
func (c *dqueClient) dequeuer() {
defer func() {
if err := c.queue.Close(); err != nil {
level.Error(c.logger).Log("msg", "error closing queue", "err", err)
}
c.wg.Done()
}()
for {
select {
case <-c.quit:
return
default:
}
// Dequeue the next item in the queue
entry, err := c.queue.DequeueBlock()
if err != nil {
level.Error(c.logger).Log("msg", "error dequeuing record", "error", err)
continue
}
// Assert type of the response to an Item pointer so we can work with it
record, ok := entry.(*dqueEntry)
if !ok {
level.Error(c.logger).Log("msg", "error dequeued record is not an valid type", "error")
continue
}
if err := c.loki.Handle(record.Lbs, record.Ts, record.Line); err != nil {
level.Error(c.logger).Log("msg", "error sending record to Loki", "error", err)
}
}
}
// Stop the client
func (c *dqueClient) Stop() {
c.once.Do(func() { close(c.quit) })
c.loki.Stop()
c.wg.Wait()
}
// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error {
if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil {
return fmt.Errorf("cannot enqueue record %s: %s", s, err)
}
return nil
}

@ -26,7 +26,7 @@ type loki struct {
}
func newPlugin(cfg *config, logger log.Logger) (*loki, error) {
client, err := client.New(cfg.clientConfig, logger)
client, err := NewClient(cfg, logger)
if err != nil {
return nil, err
}
@ -70,7 +70,6 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {
// prevent base64-encoding []byte values (default json.Encoder rule) by
// converting them to strings
func toStringSlice(slice []interface{}) []interface{} {
var s []interface{}
for _, v := range slice {

@ -68,7 +68,11 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(paramLogger).Log("LineFormat", conf.lineFormat)
level.Info(paramLogger).Log("DropSingleKey", conf.dropSingleKey)
level.Info(paramLogger).Log("LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))
level.Info(paramLogger).Log("Buffer", conf.bufferConfig.buffer)
level.Info(paramLogger).Log("BufferType", conf.bufferConfig.bufferType)
level.Info(paramLogger).Log("DqueDir", conf.bufferConfig.dqueConfig.queueDir)
level.Info(paramLogger).Log("DqueSegmentSize", conf.bufferConfig.dqueConfig.queueSegmentSize)
level.Info(paramLogger).Log("DqueSync", conf.bufferConfig.dqueConfig.queueSync)
plugin, err := newPlugin(conf, logger)
if err != nil {
level.Error(logger).Log("newPlugin", err)

@ -22,6 +22,7 @@ require (
github.com/frankban/quicktest v1.7.2 // indirect
github.com/go-kit/kit v0.10.0
github.com/go-logfmt/logfmt v0.5.0
github.com/gofrs/flock v0.7.1 // indirect
github.com/gogo/protobuf v1.3.1 // remember to update loki-build-image/Dockerfile too
github.com/golang/snappy v0.0.1
github.com/gorilla/mux v1.7.3
@ -33,6 +34,7 @@ require (
github.com/hpcloud/tail v1.0.0
github.com/influxdata/go-syslog/v3 v3.0.1-0.20200510134747-836dce2cf6da
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible
github.com/json-iterator/go v1.1.9
github.com/klauspost/compress v1.9.4
github.com/mitchellh/mapstructure v1.1.2

@ -352,6 +352,8 @@ github.com/gocql/gocql v0.0.0-20190301043612-f6df8288f9b4/go.mod h1:4Fw1eo5iaEhD
github.com/gocql/gocql v0.0.0-20200121121104-95d072f1b5bb h1:gAHg4RMULuLo7Y3jhY5CHh/QuSwjeTZt4qVdJ9ytcVI=
github.com/gocql/gocql v0.0.0-20200121121104-95d072f1b5bb/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/googleapis v1.1.0 h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@ -542,6 +544,8 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible h1:f4ZGkY12AQ+YvzWDDWMLMGejA4ceg7nIPlqJ9fQ9T4c=
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible/go.mod h1:hDZb8oMj3Kp8MxtbNLg9vrtAUDHjgI1yZvqivT4O8Iw=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=

@ -0,0 +1,24 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof

@ -0,0 +1,10 @@
language: go
go:
- 1.10.x
- 1.11.x
script: go test -v -check.vv -race ./...
sudo: false
notifications:
email:
on_success: never
on_failure: always

@ -0,0 +1,27 @@
Copyright (c) 2015, Tim Heckman
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of linode-netint nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,41 @@
# flock
[![TravisCI Build Status](https://img.shields.io/travis/gofrs/flock/master.svg?style=flat)](https://travis-ci.org/gofrs/flock)
[![GoDoc](https://img.shields.io/badge/godoc-go--flock-blue.svg?style=flat)](https://godoc.org/github.com/gofrs/flock)
[![License](https://img.shields.io/badge/license-BSD_3--Clause-brightgreen.svg?style=flat)](https://github.com/gofrs/flock/blob/master/LICENSE)
[![Go Report Card](https://goreportcard.com/badge/github.com/gofrs/flock)](https://goreportcard.com/report/github.com/gofrs/flock)
`flock` implements a thread-safe sync.Locker interface for file locking. It also
includes a non-blocking TryLock() function to allow locking without blocking execution.
## License
`flock` is released under the BSD 3-Clause License. See the `LICENSE` file for more details.
## Go Compatibility
This package makes use of the `context` package that was introduced in Go 1.7. As such, this
package has an implicit dependency on Go 1.7+.
## Installation
```
go get -u github.com/gofrs/flock
```
## Usage
```Go
import "github.com/gofrs/flock"
fileLock := flock.New("/var/lock/go-lock.lock")
locked, err := fileLock.TryLock()
if err != nil {
// handle locking error
}
if locked {
// do work
fileLock.Unlock()
}
```
For more detailed usage information take a look at the package API docs on
[GoDoc](https://godoc.org/github.com/gofrs/flock).

@ -0,0 +1,25 @@
version: '{build}'
build: false
deploy: false
clone_folder: 'c:\gopath\src\github.com\gofrs\flock'
environment:
GOPATH: 'c:\gopath'
GOVERSION: '1.11'
init:
- git config --global core.autocrlf input
install:
- rmdir c:\go /s /q
- appveyor DownloadFile https://storage.googleapis.com/golang/go%GOVERSION%.windows-amd64.msi
- msiexec /i go%GOVERSION%.windows-amd64.msi /q
- set Path=c:\go\bin;c:\gopath\bin;%Path%
- go version
- go env
test_script:
- go get -t ./...
- go test -race -v ./...

@ -0,0 +1,127 @@
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
// Package flock implements a thread-safe interface for file locking.
// It also includes a non-blocking TryLock() function to allow locking
// without blocking execution.
//
// Package flock is released under the BSD 3-Clause License. See the LICENSE file
// for more details.
//
// While using this library, remember that the locking behaviors are not
// guaranteed to be the same on each platform. For example, some UNIX-like
// operating systems will transparently convert a shared lock to an exclusive
// lock. If you Unlock() the flock from a location where you believe that you
// have the shared lock, you may accidentally drop the exclusive lock.
package flock
import (
"context"
"os"
"sync"
"time"
)
// Flock is the struct type to handle file locking. All fields are unexported,
// with access to some of the fields provided by getter methods (Path() and Locked()).
type Flock struct {
path string
m sync.RWMutex
fh *os.File
l bool
r bool
}
// New returns a new instance of *Flock. The only parameter
// it takes is the path to the desired lockfile.
func New(path string) *Flock {
return &Flock{path: path}
}
// NewFlock returns a new instance of *Flock. The only parameter
// it takes is the path to the desired lockfile.
//
// Deprecated: Use New instead.
func NewFlock(path string) *Flock {
return New(path)
}
// Close is equivalent to calling Unlock.
//
// This will release the lock and close the underlying file descriptor.
// It will not remove the file from disk, that's up to your application.
func (f *Flock) Close() error {
return f.Unlock()
}
// Path returns the path as provided in NewFlock().
func (f *Flock) Path() string {
return f.path
}
// Locked returns the lock state (locked: true, unlocked: false).
//
// Warning: by the time you use the returned value, the state may have changed.
func (f *Flock) Locked() bool {
f.m.RLock()
defer f.m.RUnlock()
return f.l
}
// RLocked returns the read lock state (locked: true, unlocked: false).
//
// Warning: by the time you use the returned value, the state may have changed.
func (f *Flock) RLocked() bool {
f.m.RLock()
defer f.m.RUnlock()
return f.r
}
func (f *Flock) String() string {
return f.path
}
// TryLockContext repeatedly tries to take an exclusive lock until one of the
// conditions is met: TryLock succeeds, TryLock fails with error, or Context
// Done channel is closed.
func (f *Flock) TryLockContext(ctx context.Context, retryDelay time.Duration) (bool, error) {
return tryCtx(ctx, f.TryLock, retryDelay)
}
// TryRLockContext repeatedly tries to take a shared lock until one of the
// conditions is met: TryRLock succeeds, TryRLock fails with error, or Context
// Done channel is closed.
func (f *Flock) TryRLockContext(ctx context.Context, retryDelay time.Duration) (bool, error) {
return tryCtx(ctx, f.TryRLock, retryDelay)
}
func tryCtx(ctx context.Context, fn func() (bool, error), retryDelay time.Duration) (bool, error) {
if ctx.Err() != nil {
return false, ctx.Err()
}
for {
if ok, err := fn(); ok || err != nil {
return ok, err
}
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(retryDelay):
// try again
}
}
}
func (f *Flock) setFh() error {
// open a new os.File instance
// create it if it doesn't exist, and open the file read-only.
fh, err := os.OpenFile(f.path, os.O_CREATE|os.O_RDONLY, os.FileMode(0600))
if err != nil {
return err
}
// set the filehandle on the struct
f.fh = fh
return nil
}

@ -0,0 +1,195 @@
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
// +build !windows
package flock
import (
"os"
"syscall"
)
// Lock is a blocking call to try and take an exclusive file lock. It will wait
// until it is able to obtain the exclusive file lock. It's recommended that
// TryLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already exclusive-locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
//
// If the *Flock has a shared lock (RLock), this may transparently replace the
// shared lock with an exclusive lock on some UNIX-like operating systems. Be
// careful when using exclusive locks in conjunction with shared locks
// (RLock()), because calling Unlock() may accidentally release the exclusive
// lock that was once a shared lock.
func (f *Flock) Lock() error {
return f.lock(&f.l, syscall.LOCK_EX)
}
// RLock is a blocking call to try and take a shared file lock. It will wait
// until it is able to obtain the shared file lock. It's recommended that
// TryRLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already shared-locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
func (f *Flock) RLock() error {
return f.lock(&f.r, syscall.LOCK_SH)
}
func (f *Flock) lock(locked *bool, flag int) error {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return err
}
}
if err := syscall.Flock(int(f.fh.Fd()), flag); err != nil {
shouldRetry, reopenErr := f.reopenFDOnError(err)
if reopenErr != nil {
return reopenErr
}
if !shouldRetry {
return err
}
if err = syscall.Flock(int(f.fh.Fd()), flag); err != nil {
return err
}
}
*locked = true
return nil
}
// Unlock is a function to unlock the file. This file takes a RW-mutex lock, so
// while it is running the Locked() and RLocked() functions will be blocked.
//
// This function short-circuits if we are unlocked already. If not, it calls
// syscall.LOCK_UN on the file and closes the file descriptor. It does not
// remove the file from disk. It's up to your application to do.
//
// Please note, if your shared lock became an exclusive lock this may
// unintentionally drop the exclusive lock if called by the consumer that
// believes they have a shared lock. Please see Lock() for more details.
func (f *Flock) Unlock() error {
f.m.Lock()
defer f.m.Unlock()
// if we aren't locked or if the lockfile instance is nil
// just return a nil error because we are unlocked
if (!f.l && !f.r) || f.fh == nil {
return nil
}
// mark the file as unlocked
if err := syscall.Flock(int(f.fh.Fd()), syscall.LOCK_UN); err != nil {
return err
}
f.fh.Close()
f.l = false
f.r = false
f.fh = nil
return nil
}
// TryLock is the preferred function for taking an exclusive file lock. This
// function takes an RW-mutex lock before it tries to lock the file, so there is
// the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the exclusive
// file lock, the function will return false instead of waiting for the lock. If
// we get the lock, we also set the *Flock instance as being exclusive-locked.
func (f *Flock) TryLock() (bool, error) {
return f.try(&f.l, syscall.LOCK_EX)
}
// TryRLock is the preferred function for taking a shared file lock. This
// function takes an RW-mutex lock before it tries to lock the file, so there is
// the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the shared file
// lock, the function will return false instead of waiting for the lock. If we
// get the lock, we also set the *Flock instance as being share-locked.
func (f *Flock) TryRLock() (bool, error) {
return f.try(&f.r, syscall.LOCK_SH)
}
func (f *Flock) try(locked *bool, flag int) (bool, error) {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return true, nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return false, err
}
}
var retried bool
retry:
err := syscall.Flock(int(f.fh.Fd()), flag|syscall.LOCK_NB)
switch err {
case syscall.EWOULDBLOCK:
return false, nil
case nil:
*locked = true
return true, nil
}
if !retried {
if shouldRetry, reopenErr := f.reopenFDOnError(err); reopenErr != nil {
return false, reopenErr
} else if shouldRetry {
retried = true
goto retry
}
}
return false, err
}
// reopenFDOnError determines whether we should reopen the file handle
// in readwrite mode and try again. This comes from util-linux/sys-utils/flock.c:
// Since Linux 3.4 (commit 55725513)
// Probably NFSv4 where flock() is emulated by fcntl().
func (f *Flock) reopenFDOnError(err error) (bool, error) {
if err != syscall.EIO && err != syscall.EBADF {
return false, nil
}
if st, err := f.fh.Stat(); err == nil {
// if the file is able to be read and written
if st.Mode()&0600 == 0600 {
f.fh.Close()
f.fh = nil
// reopen in read-write mode and set the filehandle
fh, err := os.OpenFile(f.path, os.O_CREATE|os.O_RDWR, os.FileMode(0600))
if err != nil {
return false, err
}
f.fh = fh
return true, nil
}
}
return false, nil
}

@ -0,0 +1,76 @@
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
// +build windows
package flock
import (
"syscall"
"unsafe"
)
var (
kernel32, _ = syscall.LoadLibrary("kernel32.dll")
procLockFileEx, _ = syscall.GetProcAddress(kernel32, "LockFileEx")
procUnlockFileEx, _ = syscall.GetProcAddress(kernel32, "UnlockFileEx")
)
const (
winLockfileFailImmediately = 0x00000001
winLockfileExclusiveLock = 0x00000002
winLockfileSharedLock = 0x00000000
)
// Use of 0x00000000 for the shared lock is a guess based on some the MS Windows
// `LockFileEX` docs, which document the `LOCKFILE_EXCLUSIVE_LOCK` flag as:
//
// > The function requests an exclusive lock. Otherwise, it requests a shared
// > lock.
//
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa365203(v=vs.85).aspx
func lockFileEx(handle syscall.Handle, flags uint32, reserved uint32, numberOfBytesToLockLow uint32, numberOfBytesToLockHigh uint32, offset *syscall.Overlapped) (bool, syscall.Errno) {
r1, _, errNo := syscall.Syscall6(
uintptr(procLockFileEx),
6,
uintptr(handle),
uintptr(flags),
uintptr(reserved),
uintptr(numberOfBytesToLockLow),
uintptr(numberOfBytesToLockHigh),
uintptr(unsafe.Pointer(offset)))
if r1 != 1 {
if errNo == 0 {
return false, syscall.EINVAL
}
return false, errNo
}
return true, 0
}
func unlockFileEx(handle syscall.Handle, reserved uint32, numberOfBytesToLockLow uint32, numberOfBytesToLockHigh uint32, offset *syscall.Overlapped) (bool, syscall.Errno) {
r1, _, errNo := syscall.Syscall6(
uintptr(procUnlockFileEx),
5,
uintptr(handle),
uintptr(reserved),
uintptr(numberOfBytesToLockLow),
uintptr(numberOfBytesToLockHigh),
uintptr(unsafe.Pointer(offset)),
0)
if r1 != 1 {
if errNo == 0 {
return false, syscall.EINVAL
}
return false, errNo
}
return true, 0
}

@ -0,0 +1,140 @@
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
package flock
import (
"syscall"
)
// ErrorLockViolation is the error code returned from the Windows syscall when a
// lock would block and you ask to fail immediately.
const ErrorLockViolation syscall.Errno = 0x21 // 33
// Lock is a blocking call to try and take an exclusive file lock. It will wait
// until it is able to obtain the exclusive file lock. It's recommended that
// TryLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
func (f *Flock) Lock() error {
return f.lock(&f.l, winLockfileExclusiveLock)
}
// RLock is a blocking call to try and take a shared file lock. It will wait
// until it is able to obtain the shared file lock. It's recommended that
// TryRLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
func (f *Flock) RLock() error {
return f.lock(&f.r, winLockfileSharedLock)
}
func (f *Flock) lock(locked *bool, flag uint32) error {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return err
}
}
if _, errNo := lockFileEx(syscall.Handle(f.fh.Fd()), flag, 0, 1, 0, &syscall.Overlapped{}); errNo > 0 {
return errNo
}
*locked = true
return nil
}
// Unlock is a function to unlock the file. This file takes a RW-mutex lock, so
// while it is running the Locked() and RLocked() functions will be blocked.
//
// This function short-circuits if we are unlocked already. If not, it calls
// UnlockFileEx() on the file and closes the file descriptor. It does not remove
// the file from disk. It's up to your application to do.
func (f *Flock) Unlock() error {
f.m.Lock()
defer f.m.Unlock()
// if we aren't locked or if the lockfile instance is nil
// just return a nil error because we are unlocked
if (!f.l && !f.r) || f.fh == nil {
return nil
}
// mark the file as unlocked
if _, errNo := unlockFileEx(syscall.Handle(f.fh.Fd()), 0, 1, 0, &syscall.Overlapped{}); errNo > 0 {
return errNo
}
f.fh.Close()
f.l = false
f.r = false
f.fh = nil
return nil
}
// TryLock is the preferred function for taking an exclusive file lock. This
// function does take a RW-mutex lock before it tries to lock the file, so there
// is the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the exclusive
// file lock, the function will return false instead of waiting for the lock. If
// we get the lock, we also set the *Flock instance as being exclusive-locked.
func (f *Flock) TryLock() (bool, error) {
return f.try(&f.l, winLockfileExclusiveLock)
}
// TryRLock is the preferred function for taking a shared file lock. This
// function does take a RW-mutex lock before it tries to lock the file, so there
// is the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the shared file
// lock, the function will return false instead of waiting for the lock. If we
// get the lock, we also set the *Flock instance as being shared-locked.
func (f *Flock) TryRLock() (bool, error) {
return f.try(&f.r, winLockfileSharedLock)
}
func (f *Flock) try(locked *bool, flag uint32) (bool, error) {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return true, nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return false, err
}
}
_, errNo := lockFileEx(syscall.Handle(f.fh.Fd()), flag|winLockfileFailImmediately, 0, 1, 0, &syscall.Overlapped{})
if errNo > 0 {
if errNo == ErrorLockViolation || errNo == syscall.ERROR_IO_PENDING {
return false, nil
}
return false, errNo
}
*locked = true
return true, nil
}

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 Jon Carlson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,161 @@
# dque - a fast embedded durable queue for Go
[![Go Report Card](https://goreportcard.com/badge/github.com/joncrlsn/dque)](https://goreportcard.com/report/github.com/joncrlsn/dque)
[![GoDoc](https://godoc.org/github.com/joncrlsn/dque?status.svg)](https://godoc.org/github.com/joncrlsn/dque)
dque is:
* persistent -- survives program restarts
* scalable -- not limited by your RAM, but by your disk space
* FIFO -- First In First Out
* embedded -- compiled into your Golang program
* synchronized -- safe for concurrent usage
* fast or safe, you choose -- turbo mode lets the OS decide when to write to disk
* has a liberal license -- allows any use, commercial or personal
I love tools that do one thing well. Hopefully this fits that category.
I am indebted to Gabor Cselle who, years ago, inspired me with an example of an [in-memory persistent queue written in Java](http://www.gaborcselle.com/open_source/java/persistent_queue.html). I was intrigued by the simplicity of his approach, which became the foundation of the "segment" part of this queue which holds the head and the tail of the queue in memory as well as storing the segment files in between.
### performance
There are two performance modes: safe and turbo
##### safe mode
* safe mode is the default
* forces an fsync to disk every time you enqueue or dequeue an item.
* while this is the safest way to use dque with little risk of data loss, it is also the slowest.
##### turbo mode
* can be enabled/disabled with a call to [DQue.TurboOn()](https://godoc.org/github.com/joncrlsn/dque#DQue.TurboOn) or [DQue.TurboOff()](https://godoc.org/github.com/joncrlsn/dque#DQue.TurboOff)
* lets the OS batch up your changes to disk, which makes it a lot faster.
* also allows you to flush changes to disk at opportune times. See [DQue.TurboSync()](https://godoc.org/github.com/joncrlsn/dque#DQue.TurboSync)
* comes with a risk that a power failure could lose changes. By turning on Turbo mode you accept that risk.
* run the benchmark to see the difference on your hardware.
* there is a todo item to force flush changes to disk after a configurable amount of time to limit risk.
### implementation
* The queue is held in segments of a configurable size.
* The queue is protected against re-opening from other processes.
* Each in-memory segment corresponds with a file on disk. Think of the segment files as a bit like rolling log files. The oldest segment files are eventually deleted, not based on time, but whenever their items have all been dequeued.
* Segment files are only appended to until they fill up. At which point a new segment is created. They are never modified (other than being appended to and deleted when each of their items has been dequeued).
* If there is more than one segment, new items are enqueued to the last segment while dequeued items are taken from the first segment.
* Because the encoding/gob package is used to store the struct to disk:
* Only structs can be stored in the queue.
* Only one type of struct can be stored in each queue.
* Only public fields in a struct will be stored.
* A function is required that returns a pointer to a new struct of the type stored in the queue. This function is used when loading segments into memory from disk. I'd love to find a way to avoid this function.
* Queue segment implementation:
* For nice visuals, see [Gabor Cselle's documentation here](http://www.gaborcselle.com/open_source/java/persistent_queue.html). Note that Gabor's implementation kept the entire queue in memory as well as disk. dque keeps only the head and tail segments in memory.
* Enqueueing an item adds it both to the end of the last segment file and to the in-memory item slice for that segment.
* When a segment reaches its maximum size a new segment is created.
* Dequeueing an item removes it from the beginning of the in-memory slice and appends a 4-byte "delete" marker to the end of the segment file. This allows the item to be left in the file until the number of delete markers matches the number of items, at which point the entire file is deleted.
* When a segment is reconstituted from disk, each "delete" marker found in the file causes a removal of the first element of the in-memory slice.
* When each item in the segment has been dequeued, the segment file is deleted and the next segment is loaded into memory.
### example
See the [full example code here](https://raw.githubusercontent.com/joncrlsn/dque/v2/example_test.go)
Or a shortened version here:
```golang
package dque_test
import (
"log"
"github.com/joncrlsn/dque"
)
// Item is what we'll be storing in the queue. It can be any struct
// as long as the fields you want stored are public.
type Item struct {
Name string
Id int
}
// ItemBuilder creates a new item and returns a pointer to it.
// This is used when we load a segment of the queue from disk.
func ItemBuilder() interface{} {
return &Item{}
}
func main() {
ExampleDQue_main()
}
// ExampleQueue_main() show how the queue works
func ExampleDQue_main() {
qName := "item-queue"
qDir := "/tmp"
segmentSize := 50
// Create a new queue with segment size of 50
q, err := dque.New(qName, qDir, segmentSize, ItemBuilder)
...
// Add an item to the queue
err := q.Enqueue(&Item{"Joe", 1})
...
// Properly close a queue
q.Close()
// You can reconsitute the queue from disk at any time
q, err = dque.Open(qName, qDir, segmentSize, ItemBuilder)
...
// Peek at the next item in the queue
var iface interface{}
if iface, err = q.Peek(); err != nil {
if err != dque.ErrEmpty {
log.Fatal("Error peeking at item ", err)
}
}
// Dequeue the next item in the queue
if iface, err = q.Dequeue(); err != nil {
if err != dque.ErrEmpty {
log.Fatal("Error dequeuing item ", err)
}
}
// Dequeue the next item in the queue and block until one is available
if iface, err = q.DequeueBlock(); err != nil {
log.Fatal("Error dequeuing item ", err)
}
// Assert type of the response to an Item pointer so we can work with it
item, ok := iface.(*Item)
if !ok {
log.Fatal("Dequeued object is not an Item pointer")
}
doSomething(item)
}
func doSomething(item *Item) {
log.Println("Dequeued", item)
}
```
### contributors
* [Neil Isaac](https://github.com/neilisaac)
* [Thomas Kriechbaumer](https://github.com/Kriechi)
### todo? (feel free to submit pull requests)
* add option to enable turbo with a timeout that would ensure you would never lose more than n seconds of changes.
* add Lock() and Unlock() methods so you can peek at the first item and then conditionally dequeue it without worrying that another goroutine has grabbed it out from under you. The use case is when you don't want to actually remove it from the queue until you know you were able to successfully handle it.
* store the segment size in a config file inside the queue. Then it only needs to be specified on dque.New(...)
### alternative tools
* [CurlyQ](https://github.com/mcmathja/curlyq) is a bit heavier (requires Redis) but has more background processing features.

@ -0,0 +1,11 @@
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

@ -0,0 +1,556 @@
//
// Package dque is a fast embedded durable queue for Go
//
package dque
//
// Copyright (c) 2018 Jon Carlson. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
//
import (
"strconv"
"sync"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"io/ioutil"
"math"
"os"
"path"
"regexp"
)
const lockFile = "lock.lock"
// ErrQueueClosed is the error returned when a queue is closed.
var ErrQueueClosed = errors.New("queue is closed")
var (
filePattern *regexp.Regexp
// ErrEmpty is returned when attempting to dequeue from an empty queue.
ErrEmpty = errors.New("dque is empty")
)
func init() {
filePattern, _ = regexp.Compile(`^([0-9]+)\.dque$`)
}
type config struct {
ItemsPerSegment int
}
// DQue is the in-memory representation of a queue on disk. You must never have
// two *active* DQue instances pointing at the same path on disk. It is
// acceptable to reconstitute a new instance from disk, but make sure the old
// instance is never enqueued to (or dequeued from) again.
type DQue struct {
Name string
DirPath string
config config
fullPath string
fileLock *flock.Flock
firstSegment *qSegment
lastSegment *qSegment
builder func() interface{} // builds a structure to load via gob
mutex sync.Mutex
emptyCond *sync.Cond
mutexEmptyCond sync.Mutex
turbo bool
}
// New creates a new durable queue
func New(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error) {
// Validation
if len(name) == 0 {
return nil, errors.New("the queue name requires a value")
}
if len(dirPath) == 0 {
return nil, errors.New("the queue directory requires a value")
}
if !dirExists(dirPath) {
return nil, errors.New("the given queue directory is not valid: " + dirPath)
}
fullPath := path.Join(dirPath, name)
if dirExists(fullPath) {
return nil, errors.New("the given queue directory already exists: " + fullPath + ". Use Open instead")
}
if err := os.Mkdir(fullPath, 0755); err != nil {
return nil, errors.Wrap(err, "error creating queue directory "+fullPath)
}
q := DQue{Name: name, DirPath: dirPath}
q.fullPath = fullPath
q.config.ItemsPerSegment = itemsPerSegment
q.builder = builder
q.emptyCond = sync.NewCond(&q.mutexEmptyCond)
if err := q.lock(); err != nil {
return nil, err
}
if err := q.load(); err != nil {
return nil, err
}
return &q, nil
}
// Open opens an existing durable queue.
func Open(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error) {
// Validation
if len(name) == 0 {
return nil, errors.New("the queue name requires a value")
}
if len(dirPath) == 0 {
return nil, errors.New("the queue directory requires a value")
}
if !dirExists(dirPath) {
return nil, errors.New("the given queue directory is not valid (" + dirPath + ")")
}
fullPath := path.Join(dirPath, name)
if !dirExists(fullPath) {
return nil, errors.New("the given queue does not exist (" + fullPath + ")")
}
q := DQue{Name: name, DirPath: dirPath}
q.fullPath = fullPath
q.config.ItemsPerSegment = itemsPerSegment
q.builder = builder
q.emptyCond = sync.NewCond(&q.mutexEmptyCond)
if err := q.lock(); err != nil {
return nil, err
}
if err := q.load(); err != nil {
return nil, err
}
return &q, nil
}
// NewOrOpen either creates a new queue or opens an existing durable queue.
func NewOrOpen(name string, dirPath string, itemsPerSegment int, builder func() interface{}) (*DQue, error) {
// Validation
if len(name) == 0 {
return nil, errors.New("the queue name requires a value")
}
if len(dirPath) == 0 {
return nil, errors.New("the queue directory requires a value")
}
if !dirExists(dirPath) {
return nil, errors.New("the given queue directory is not valid (" + dirPath + ")")
}
fullPath := path.Join(dirPath, name)
if dirExists(fullPath) {
return Open(name, dirPath, itemsPerSegment, builder)
}
return New(name, dirPath, itemsPerSegment, builder)
}
// Close releases the lock on the queue rendering it unusable for further usage by this instance.
// Close will return an error if it has already been called.
func (q *DQue) Close() error {
// only allow Close while no other function is active
q.mutex.Lock()
defer q.mutex.Unlock()
if q.fileLock == nil {
return ErrQueueClosed
}
err := q.fileLock.Close()
if err != nil {
return err
}
// Finally mark this instance as closed to prevent any further access
q.fileLock = nil
// Wake-up any waiting goroutines for blocking queue access - they should get a ErrQueueClosed
q.emptyCond.Broadcast()
// Safe-guard ourself from accidentally using segments after closing the queue
q.firstSegment = nil
q.lastSegment = nil
return nil
}
// Enqueue adds an item to the end of the queue
func (q *DQue) Enqueue(obj interface{}) error {
// This is heavy-handed but its safe
q.mutex.Lock()
defer q.mutex.Unlock()
if q.fileLock == nil {
return ErrQueueClosed
}
// If this segment is full then create a new one
if q.lastSegment.sizeOnDisk() >= q.config.ItemsPerSegment {
// We have filled our last segment to capacity, so create a new one
seg, err := newQueueSegment(q.fullPath, q.lastSegment.number+1, q.turbo, q.builder)
if err != nil {
return errors.Wrapf(err, "error creating new queue segment: %d.", q.lastSegment.number+1)
}
// If the last segment is not the first segment
// then we need to close the file.
if q.firstSegment != q.lastSegment {
var err = q.lastSegment.close()
if err != nil {
return errors.Wrapf(err, "error closing previous segment file #%d.", q.lastSegment.number)
}
}
// Replace the last segment with the new one
q.lastSegment = seg
}
// Add the object to the last segment
if err := q.lastSegment.add(obj); err != nil {
return errors.Wrap(err, "error adding item to the last segment")
}
// Wakeup any goroutine that is currently waiting for an item to be enqueued
q.emptyCond.Broadcast()
return nil
}
// Dequeue removes and returns the first item in the queue.
// When the queue is empty, nil and dque.ErrEmpty are returned.
func (q *DQue) Dequeue() (interface{}, error) {
// This is heavy-handed but its safe
q.mutex.Lock()
defer q.mutex.Unlock()
if q.fileLock == nil {
return nil, ErrQueueClosed
}
// Remove the first object from the first segment
obj, err := q.firstSegment.remove()
if err == errEmptySegment {
return nil, ErrEmpty
}
if err != nil {
return nil, errors.Wrap(err, "error removing item from the first segment")
}
// If this segment is empty and we've reached the max for this segment
// then delete the file and open the next one.
if q.firstSegment.size() == 0 &&
q.firstSegment.sizeOnDisk() >= q.config.ItemsPerSegment {
// Delete the segment file
if err := q.firstSegment.delete(); err != nil {
return obj, errors.Wrap(err, "error deleting queue segment "+q.firstSegment.filePath()+". Queue is in an inconsistent state")
}
// We have only one segment and it's now empty so destroy it and
// create a new one.
if q.firstSegment.number == q.lastSegment.number {
// Create the next segment
seg, err := newQueueSegment(q.fullPath, q.firstSegment.number+1, q.turbo, q.builder)
if err != nil {
return obj, errors.Wrap(err, "error creating new segment. Queue is in an inconsistent state")
}
q.firstSegment = seg
q.lastSegment = seg
} else {
if q.firstSegment.number+1 == q.lastSegment.number {
// We have 2 segments, moving down to 1 shared segment
q.firstSegment = q.lastSegment
} else {
// Open the next segment
seg, err := openQueueSegment(q.fullPath, q.firstSegment.number+1, q.turbo, q.builder)
if err != nil {
return obj, errors.Wrap(err, "error creating new segment. Queue is in an inconsistent state")
}
q.firstSegment = seg
}
}
}
return obj, nil
}
// Peek returns the first item in the queue without dequeueing it.
// When the queue is empty, nil and dque.ErrEmpty are returned.
// Do not use this method with multiple dequeueing threads or you may regret it.
func (q *DQue) Peek() (interface{}, error) {
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()
if q.fileLock == nil {
return nil, ErrQueueClosed
}
// Return the first object from the first segment
obj, err := q.firstSegment.peek()
if err == errEmptySegment {
return nil, ErrEmpty
}
if err != nil {
// In reality this will (i.e. should not) never happen
return nil, errors.Wrap(err, "error getting item from the first segment")
}
return obj, nil
}
// DequeueBlock behaves similar to Dequeue, but is a blocking call until an item is available.
func (q *DQue) DequeueBlock() (interface{}, error) {
q.mutexEmptyCond.Lock()
defer q.mutexEmptyCond.Unlock()
for {
obj, err := q.Dequeue()
if err == ErrEmpty {
q.emptyCond.Wait()
// Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine.
// Receiving the signal does not guarantee an item is available, let's loop and check again.
continue
} else if err != nil {
return nil, err
}
return obj, nil
}
}
// PeekBlock behaves similar to Peek, but is a blocking call until an item is available.
func (q *DQue) PeekBlock() (interface{}, error) {
q.mutexEmptyCond.Lock()
defer q.mutexEmptyCond.Unlock()
for {
obj, err := q.Peek()
if err == ErrEmpty {
q.emptyCond.Wait()
// Wait() atomically unlocks mutexEmptyCond and suspends execution of the calling goroutine.
// Receiving the signal does not guarantee an item is available, let's loop and check again.
continue
} else if err != nil {
return nil, err
}
return obj, nil
}
}
// Size locks things up while calculating so you are guaranteed an accurate
// size... unless you have changed the itemsPerSegment value since the queue
// was last empty. Then it could be wildly inaccurate.
func (q *DQue) Size() int {
if q.fileLock == nil {
return 0
}
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()
return q.SizeUnsafe()
}
// SizeUnsafe returns the approximate number of items in the queue. Use Size() if
// having the exact size is important to your use-case.
//
// The return value could be wildly inaccurate if the itemsPerSegment value has
// changed since the queue was last empty.
// Also, because this method is not synchronized, the size may change after
// entering this method.
func (q *DQue) SizeUnsafe() int {
if q.fileLock == nil {
return 0
}
if q.firstSegment.number == q.lastSegment.number {
return q.firstSegment.size()
}
numSegmentsBetween := q.lastSegment.number - q.firstSegment.number - 1
return q.firstSegment.size() + (numSegmentsBetween * q.config.ItemsPerSegment) + q.lastSegment.size()
}
// SegmentNumbers returns the number of both the first last segmment.
// There is likely no use for this information other than testing.
func (q *DQue) SegmentNumbers() (int, int) {
if q.fileLock == nil {
return 0, 0
}
return q.firstSegment.number, q.lastSegment.number
}
// Turbo returns true if the turbo flag is on. Having turbo on speeds things
// up significantly.
func (q *DQue) Turbo() bool {
return q.turbo
}
// TurboOn allows the filesystem to decide when to sync file changes to disk.
// Throughput is greatly increased by turning turbo on, however there is some
// risk of losing data if a power-loss occurs.
// If turbo is already on an error is returned
func (q *DQue) TurboOn() error {
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()
if q.fileLock == nil {
return ErrQueueClosed
}
if q.turbo {
return errors.New("DQue.TurboOn() is not valid when turbo is on")
}
q.turbo = true
q.firstSegment.turboOn()
q.lastSegment.turboOn()
return nil
}
// TurboOff re-enables the "safety" mode that syncs every file change to disk as
// they happen.
// If turbo is already off an error is returned
func (q *DQue) TurboOff() error {
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()
if q.fileLock == nil {
return ErrQueueClosed
}
if !q.turbo {
return errors.New("DQue.TurboOff() is not valid when turbo is off")
}
if err := q.firstSegment.turboOff(); err != nil {
return err
}
if err := q.lastSegment.turboOff(); err != nil {
return err
}
q.turbo = false
return nil
}
// TurboSync allows you to fsync changes to disk, but only if turbo is on.
// If turbo is off an error is returned
func (q *DQue) TurboSync() error {
// This is heavy-handed but it is safe
q.mutex.Lock()
defer q.mutex.Unlock()
if q.fileLock == nil {
return ErrQueueClosed
}
if !q.turbo {
return errors.New("DQue.TurboSync() is inappropriate when turbo is off")
}
if err := q.firstSegment.turboSync(); err != nil {
return errors.Wrap(err, "unable to sync changes to disk")
}
if err := q.lastSegment.turboSync(); err != nil {
return errors.Wrap(err, "unable to sync changes to disk")
}
return nil
}
// load populates the queue from disk
func (q *DQue) load() error {
// Find all queue files
files, err := ioutil.ReadDir(q.fullPath)
if err != nil {
return errors.Wrap(err, "unable to read files in "+q.fullPath)
}
// Find the smallest and the largest file numbers
minNum := math.MaxInt32
maxNum := 0
for _, f := range files {
if !f.IsDir() && filePattern.MatchString(f.Name()) {
// Extract number out of the filename
fileNumStr := filePattern.FindStringSubmatch(f.Name())[1]
fileNum, _ := strconv.Atoi(fileNumStr)
if fileNum > maxNum {
maxNum = fileNum
}
if fileNum < minNum {
minNum = fileNum
}
}
}
// If files were found, set q.firstSegment and q.lastSegment
if maxNum > 0 {
// We found files
seg, err := openQueueSegment(q.fullPath, minNum, q.turbo, q.builder)
if err != nil {
return errors.Wrap(err, "unable to create queue segment in "+q.fullPath)
}
q.firstSegment = seg
if minNum == maxNum {
// We have only one segment so the
// first and last are the same instance (in this case)
q.lastSegment = q.firstSegment
} else {
// We have multiple segments
seg, err = openQueueSegment(q.fullPath, maxNum, q.turbo, q.builder)
if err != nil {
return errors.Wrap(err, "unable to create segment for "+q.fullPath)
}
q.lastSegment = seg
}
} else {
// We found no files so build a new queue starting with segment 1
seg, err := newQueueSegment(q.fullPath, 1, q.turbo, q.builder)
if err != nil {
return errors.Wrap(err, "unable to create queue segment in "+q.fullPath)
}
// The first and last are the same instance (in this case)
q.firstSegment = seg
q.lastSegment = seg
}
return nil
}
func (q *DQue) lock() error {
l := path.Join(q.DirPath, q.Name, lockFile)
fileLock := flock.New(l)
locked, err := fileLock.TryLock()
if err != nil {
return err
}
if !locked {
return errors.New("failed to acquire flock")
}
q.fileLock = fileLock
return nil
}

@ -0,0 +1,425 @@
package dque
//
// Copyright (c) 2018 Jon Carlson. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
//
//
// This is a segment of a memory-efficient FIFO durable queue. Items in the queue must be of the same type.
//
// Each qSegment instance corresponds to a file on disk.
//
// This segment is both persistent and in-memory so there is a memory limit to the size
// (which is why it is just a segment instead of being used for the entire queue).
//
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"io"
"os"
"path"
"sync"
"github.com/pkg/errors"
)
// ErrCorruptedSegment is returned when a segment file cannot be opened due to inconsistent formatting.
// Recovery may be possible by clearing or deleting the file, then reloading using dque.New().
type ErrCorruptedSegment struct {
Path string
Err error
}
// Error returns a string describing ErrCorruptedSegment
func (e ErrCorruptedSegment) Error() string {
return fmt.Sprintf("segment file %s is corrupted: %s", e.Path, e.Err)
}
// Unwrap returns the wrapped error
func (e ErrCorruptedSegment) Unwrap() error {
return e.Err
}
// ErrUnableToDecode is returned when an object cannot be decoded.
type ErrUnableToDecode struct {
Path string
Err error
}
// Error returns a string describing ErrUnableToDecode error
func (e ErrUnableToDecode) Error() string {
return fmt.Sprintf("object in segment file %s cannot be decoded: %s", e.Path, e.Err)
}
// Unwrap returns the wrapped error
func (e ErrUnableToDecode) Unwrap() error {
return e.Err
}
var (
errEmptySegment = errors.New("Segment is empty")
)
// qSegment represents a portion (segment) of a persistent queue
type qSegment struct {
dirPath string
number int
objects []interface{}
objectBuilder func() interface{}
file *os.File
mutex sync.Mutex
removeCount int
turbo bool
maybeDirty bool // filesystem changes may not have been flushed to disk
syncCount int64 // for testing
}
// load reads all objects from the queue file into a slice
// returns ErrCorruptedSegment or ErrUnableToDecode for errors pertaining to file contents.
func (seg *qSegment) load() error {
// This is heavy-handed but its safe
seg.mutex.Lock()
defer seg.mutex.Unlock()
// Open the file in read mode
f, err := os.OpenFile(seg.filePath(), os.O_RDONLY, 0644)
if err != nil {
return errors.Wrap(err, "error opening file: "+seg.filePath())
}
defer f.Close()
seg.file = f
// Loop until we can load no more
for {
// Read the 4 byte length of the gob
lenBytes := make([]byte, 4)
if n, err := io.ReadFull(seg.file, lenBytes); err != nil {
if err == io.EOF {
return nil
}
return ErrCorruptedSegment{
Path: seg.filePath(),
Err: errors.Wrapf(err, "error reading object length (read %d/4 bytes)", n),
}
}
// Convert the bytes into a 32-bit unsigned int
gobLen := binary.LittleEndian.Uint32(lenBytes)
if gobLen == 0 {
// Remove the first item from the in-memory queue
if len(seg.objects) == 0 {
return ErrCorruptedSegment{
Path: seg.filePath(),
Err: fmt.Errorf("excess deletion records (%d)", seg.removeCount+1),
}
}
seg.objects = seg.objects[1:]
// log.Println("TEMP: Detected delete in load()")
seg.removeCount++
continue
}
data := make([]byte, int(gobLen))
if _, err := io.ReadFull(seg.file, data); err != nil {
return ErrCorruptedSegment{
Path: seg.filePath(),
Err: errors.Wrap(err, "error reading gob data from file"),
}
}
// Decode the bytes into an object
object := seg.objectBuilder()
if err := gob.NewDecoder(bytes.NewReader(data)).Decode(object); err != nil {
return ErrUnableToDecode{
Path: seg.filePath(),
Err: errors.Wrapf(err, "failed to decode %T", object),
}
}
// Add item to the objects slice
seg.objects = append(seg.objects, object)
// log.Printf("TEMP: Loaded: %#v\n", object)
}
}
// peek returns the first item in the segment without removing it.
// If the queue is already empty, the emptySegment error will be returned.
func (seg *qSegment) peek() (interface{}, error) {
// This is heavy-handed but its safe
seg.mutex.Lock()
defer seg.mutex.Unlock()
if len(seg.objects) == 0 {
// Queue is empty so return nil object (and emptySegment error)
return nil, errEmptySegment
}
// Save a reference to the first item in the in-memory queue
object := seg.objects[0]
return object, nil
}
// remove removes and returns the first item in the segment and adds
// a zero length marker to the end of the queue file to signify a removal.
// If the queue is already empty, the emptySegment error will be returned.
func (seg *qSegment) remove() (interface{}, error) {
// This is heavy-handed but its safe
seg.mutex.Lock()
defer seg.mutex.Unlock()
if len(seg.objects) == 0 {
// Queue is empty so return nil object (and empty_segment error)
return nil, errEmptySegment
}
// Create a 4-byte length of value zero (this signifies a removal)
deleteLen := 0
deleteLenBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(deleteLenBytes, uint32(deleteLen))
// Write the 4-byte length (of zero) first
if _, err := seg.file.Write(deleteLenBytes); err != nil {
return nil, errors.Wrapf(err, "failed to remove item from segment %d", seg.number)
}
// Save a reference to the first item in the in-memory queue
object := seg.objects[0]
// Remove the first item from the in-memory queue
seg.objects = seg.objects[1:]
// Increment the delete count
seg.removeCount++
// Possibly force writes to disk
if err := seg._sync(); err != nil {
return nil, err
}
return object, nil
}
// Add adds an item to the in-memory queue segment and appends it to the persistent file
func (seg *qSegment) add(object interface{}) error {
// This is heavy-handed but its safe
seg.mutex.Lock()
defer seg.mutex.Unlock()
// Encode the struct to a byte buffer
var buff bytes.Buffer
enc := gob.NewEncoder(&buff)
if err := enc.Encode(object); err != nil {
return errors.Wrap(err, "error gob encoding object")
}
// Count the bytes stored in the byte buffer
// and store the count into a 4-byte byte array
buffLen := len(buff.Bytes())
buffLenBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(buffLenBytes, uint32(buffLen))
// Write the 4-byte buffer length first
if _, err := seg.file.Write(buffLenBytes); err != nil {
return errors.Wrapf(err, "failed to write object length to segment %d", seg.number)
}
// Then write the buffer bytes
if _, err := seg.file.Write(buff.Bytes()); err != nil {
return errors.Wrapf(err, "failed to write object to segment %d", seg.number)
}
seg.objects = append(seg.objects, object)
// Possibly force writes to disk
return seg._sync()
}
// size returns the number of objects in this segment.
// The size does not include items that have been removed.
func (seg *qSegment) size() int {
// This is heavy-handed but its safe
seg.mutex.Lock()
defer seg.mutex.Unlock()
return len(seg.objects)
}
// sizeOnDisk returns the number of objects in memory plus removed objects. This
// number will match the number of objects still on disk.
// This number is used to keep the file from growing forever when items are
// removed about as fast as they are added.
func (seg *qSegment) sizeOnDisk() int {
// This is heavy-handed but its safe
seg.mutex.Lock()
defer seg.mutex.Unlock()
return len(seg.objects) + seg.removeCount
}
// delete wipes out the queue and its persistent state
func (seg *qSegment) delete() error {
// This is heavy-handed but its safe
seg.mutex.Lock()
defer seg.mutex.Unlock()
if err := seg.file.Close(); err != nil {
return errors.Wrap(err, "unable to close the segment file before deleting")
}
// Delete the storage for this queue
err := os.Remove(seg.filePath())
if err != nil {
return errors.Wrap(err, "error deleting file: "+seg.filePath())
}
// Empty the in-memory slice of objects
seg.objects = seg.objects[:0]
seg.file = nil
return nil
}
func (seg *qSegment) fileName() string {
return fmt.Sprintf("%013d.dque", seg.number)
}
func (seg *qSegment) filePath() string {
return path.Join(seg.dirPath, seg.fileName())
}
// turboOn allows the filesystem to decide when to sync file changes to disk
// Speed is be greatly increased by turning turbo on, however there is some
// risk of losing data should a power-loss occur.
func (seg *qSegment) turboOn() {
seg.turbo = true
}
// turboOff re-enables the "safety" mode that syncs every file change to disk as
// they happen.
func (seg *qSegment) turboOff() error {
if !seg.turbo {
// turboOff is know to be called twice when the first and last ssegments
// are the same.
return nil
}
if err := seg.turboSync(); err != nil {
return err
}
seg.turbo = false
return nil
}
// turboSync does an fsync to disk if turbo is on.
func (seg *qSegment) turboSync() error {
if !seg.turbo {
// When the first and last segments are the same, this method
// will be called twice.
return nil
}
if seg.maybeDirty {
if err := seg.file.Sync(); err != nil {
return errors.Wrap(err, "unable to sync file changes.")
}
seg.syncCount++
seg.maybeDirty = false
}
return nil
}
// _sync must only be called by the add and remove methods on qSegment.
// Only syncs if turbo is off
func (seg *qSegment) _sync() error {
if seg.turbo {
// We do *not* force a sync if turbo is on
// We just mark it maybe dirty
seg.maybeDirty = true
return nil
}
if err := seg.file.Sync(); err != nil {
return errors.Wrap(err, "unable to sync file changes in _sync method.")
}
seg.syncCount++
seg.maybeDirty = false
return nil
}
// close is used when this is the last segment, but is now full, so we are
// creating a new last segment.
// This should only be called if this segment is not also the first segment.
func (seg *qSegment) close() error {
if err := seg.file.Close(); err != nil {
return errors.Wrapf(err, "unable to close segment file %s.", seg.fileName())
}
return nil
}
// newQueueSegment creates a new, persistent segment of the queue
func newQueueSegment(dirPath string, number int, turbo bool, builder func() interface{}) (*qSegment, error) {
seg := qSegment{dirPath: dirPath, number: number, turbo: turbo, objectBuilder: builder}
if !dirExists(seg.dirPath) {
return nil, errors.New("dirPath is not a valid directory: " + seg.dirPath)
}
if fileExists(seg.filePath()) {
return nil, errors.New("file already exists: " + seg.filePath())
}
// Create the file in append mode
var err error
seg.file, err = os.OpenFile(seg.filePath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, errors.Wrapf(err, "error creating file: %s.", seg.filePath())
}
// Leave the file open for future writes
return &seg, nil
}
// openQueueSegment reads an existing persistent segment of the queue into memory
func openQueueSegment(dirPath string, number int, turbo bool, builder func() interface{}) (*qSegment, error) {
seg := qSegment{dirPath: dirPath, number: number, turbo: turbo, objectBuilder: builder}
if !dirExists(seg.dirPath) {
return nil, errors.New("dirPath is not a valid directory: " + seg.dirPath)
}
if !fileExists(seg.filePath()) {
return nil, errors.New("file does not exist: " + seg.filePath())
}
// Load the items into memory
if err := seg.load(); err != nil {
return nil, errors.Wrap(err, "unable to load queue segment in "+dirPath)
}
// Re-open the file in append mode
var err error
seg.file, err = os.OpenFile(seg.filePath(), os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, errors.Wrap(err, "error opening file: "+seg.filePath())
}
// Leave the file open for future writes
return &seg, nil
}

@ -0,0 +1,23 @@
package dque
import (
"os"
)
// dirExists returns true or false
func dirExists(path string) bool {
fileInfo, err := os.Stat(path)
if err == nil {
return fileInfo.IsDir()
}
return false
}
// fileExists returns true or false
func fileExists(path string) bool {
fileInfo, err := os.Stat(path)
if err == nil {
return !fileInfo.IsDir()
}
return false
}

@ -322,6 +322,8 @@ github.com/gocql/gocql
github.com/gocql/gocql/internal/lru
github.com/gocql/gocql/internal/murmur
github.com/gocql/gocql/internal/streams
# github.com/gofrs/flock v0.7.1
github.com/gofrs/flock
# github.com/gogo/googleapis v1.1.0
github.com/gogo/googleapis/google/rpc
# github.com/gogo/protobuf v1.3.1
@ -461,6 +463,8 @@ github.com/jessevdk/go-flags
github.com/jmespath/go-jmespath
# github.com/jonboulle/clockwork v0.1.0
github.com/jonboulle/clockwork
# github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible
github.com/joncrlsn/dque
# github.com/jpillora/backoff v1.0.0
github.com/jpillora/backoff
# github.com/json-iterator/go v1.1.9

Loading…
Cancel
Save