Remove groupcache (#7126)

As discussed during the caching retro, this PR removes the experimental
distributed cache based on Mailgun's fork of groupcache.

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
Co-authored-by: Kaviraj <kavirajkanagaraj@gmail.com>
Author: Travis Patterson (committed via GitHub)
commit b4ed595461 (parent ae2a6f31ae)
40 changed files (lines changed in parentheses):
  1. cmd/loki/loki-local-config.yaml (1)
  2. docs/sources/configuration/_index.md (11)
  3. docs/sources/upgrading/_index.md (8)
  4. go.mod (2)
  5. go.sum (4)
  6. pkg/loki/common/common.go (6)
  7. pkg/loki/config_wrapper.go (30)
  8. pkg/loki/config_wrapper_test.go (19)
  9. pkg/loki/loki.go (8)
  10. pkg/loki/modules.go (48)
  11. pkg/storage/chunk/cache/cache.go (12)
  12. pkg/storage/chunk/cache/embeddedcache.go (42)
  13. pkg/storage/chunk/cache/groupcache.go (301)
  14. pkg/storage/chunk/cache/groupcache_ringmanager.go (206)
  15. pkg/storage/chunk/cache/groupcache_stats.go (87)
  16. pkg/storage/chunk/cache/groupcache_test.go (88)
  17. vendor/github.com/grafana/groupcache_exporter/.gitignore (15)
  18. vendor/github.com/grafana/groupcache_exporter/LICENSE (21)
  19. vendor/github.com/grafana/groupcache_exporter/README.md (1)
  20. vendor/github.com/grafana/groupcache_exporter/exporter.go (206)
  21. vendor/github.com/mailgun/groupcache/v2/.gitignore (2)
  22. vendor/github.com/mailgun/groupcache/v2/.travis.yml (20)
  23. vendor/github.com/mailgun/groupcache/v2/CHANGELOG (62)
  24. vendor/github.com/mailgun/groupcache/v2/LICENSE (191)
  25. vendor/github.com/mailgun/groupcache/v2/README.md (158)
  26. vendor/github.com/mailgun/groupcache/v2/byteview.go (182)
  27. vendor/github.com/mailgun/groupcache/v2/consistenthash/consistenthash.go (84)
  28. vendor/github.com/mailgun/groupcache/v2/groupcache.go (697)
  29. vendor/github.com/mailgun/groupcache/v2/groupcachepb/example.proto (12)
  30. vendor/github.com/mailgun/groupcache/v2/groupcachepb/groupcache.pb.go (155)
  31. vendor/github.com/mailgun/groupcache/v2/groupcachepb/groupcache.proto (42)
  32. vendor/github.com/mailgun/groupcache/v2/http.go (354)
  33. vendor/github.com/mailgun/groupcache/v2/lru/lru.go (144)
  34. vendor/github.com/mailgun/groupcache/v2/peers.go (89)
  35. vendor/github.com/mailgun/groupcache/v2/proto.sh (16)
  36. vendor/github.com/mailgun/groupcache/v2/singleflight/singleflight.go (81)
  37. vendor/github.com/mailgun/groupcache/v2/sinks.go (338)
  38. vendor/github.com/segmentio/fasthash/fnv1/hash.go (108)
  39. vendor/github.com/segmentio/fasthash/fnv1/hash32.go (104)
  40. vendor/modules.txt (11)

@ -21,7 +21,6 @@ query_range:
cache:
embedded_cache:
enabled: true
distributed: false
max_size_mb: 100
schema_config:

@ -2896,17 +2896,6 @@ This way, one doesn't have to replicate configuration in multiple places.
# CLI flag: -common.compactor-address
[compactor_address: <string> | default = ""]
# Groupcache is an in-process, distributed cache that behaves similarly to memcached but is built-in to Loki
groupcache:
# Enable groupcache
# CLI flag: -common.groupcache.enabled
[enabled: <boolean> | default = false]
# Set the maximum available memory to use for each groupcache group
# NOTE: there are 3 caches (result, chunk, and index query), so the maximum used memory will be *triple* the value specified here.
# CLI flag: -common.groupcache.capacity-per-cache-mb
[capacity_per_cache_mb: <int> | default = 100]
```
## analytics
The `analytics` block configures the reporting of Loki analytics to grafana.com.

@ -43,11 +43,9 @@ Previously, we had two configurations to define a query timeout: `engine.timeout
As they were conflicting and `engine.timeout` isn't as expressive as `querier.query-timeout`,
we're deprecating it in favor of relying on `querier.query-timeout` only.
#### Fifocache is deprecated
#### `fifocache` has been renamed
We introduced a new cache called `embedded-cache`, an in-process cache that makes it possible to run Loki without an external cache (such as Memcached or Redis). It can be run in two modes: `distributed: false` (the default, same as the old `fifocache`) and `distributed: true`, which runs the cache in distributed fashion, sharding keys across peers when Loki runs in microservices or SSD mode.
Currently `embedded-cache` with `distributed: true` can be enabled only for the results cache.
The in-memory `fifocache` has been renamed to `embedded-cache`. This allows us to replace the implementation (currently a simple FIFO data structure) with something else in the future without causing confusion.
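After this change, enabling the cache requires only `enabled: true`; the `distributed` option is removed along with groupcache. A minimal results-cache configuration under the post-#7126 schema (a sketch based on the defaults visible in this diff):

```yaml
query_range:
  results_cache:
    cache:
      embedded_cache:
        enabled: true
        max_size_mb: 100  # default of -embedded-cache.max-size-mb
        ttl: 1h           # default of -embedded-cache.ttl
```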
#### Evenly spread Memcached pods for chunks across kubernetes nodes
@ -98,7 +96,7 @@ The global `deletion_mode` option in the compactor configuration moved to runtim
- The `deletion_mode` global override needs to be set to the desired mode: `disabled`, `filter-only`, or `filter-and-delete`. By default, `filter-and-delete` is enabled.
- Any `allow_delete` per-tenant overrides need to be removed or changed to `deletion_mode` overrides with the desired mode.
#### Metric name for `loki_log_messages_total` changed
The name of this metric was changed to `loki_internal_log_messages_total` to reduce ambiguity. The previous name is still present but is deprecated.

@ -113,9 +113,7 @@ require (
)
require (
github.com/grafana/groupcache_exporter v0.0.0-20220629095919-59a8c6428a43
github.com/heroku/x v0.0.50
github.com/mailgun/groupcache/v2 v2.3.2
github.com/prometheus/alertmanager v0.24.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/thanos-io/objstore v0.0.0-20220715165016-ce338803bc1e

@ -713,8 +713,6 @@ github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQa
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/grafana/groupcache_exporter v0.0.0-20220629095919-59a8c6428a43 h1:yOw0zAMMp/GXp6eQ2hc5c9w6RcZ44u+QeyNeeAVoV7w=
github.com/grafana/groupcache_exporter v0.0.0-20220629095919-59a8c6428a43/go.mod h1:hgtvYpo9DY0PnOJHQTlV12Uecz0jbDK/o0L9Vz4WvLs=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 h1:A3dhViTeFDSQcGOXuUi6ukCQSMyDtDISBp2z6OOo2YM=
@ -945,8 +943,6 @@ github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0U
github.com/linode/linodego v1.9.1 h1:29UpEPpYcGFnbwiJW8mbk/bjBZpgd/pv68io2IKTo34=
github.com/lstoll/grpce v1.7.0/go.mod h1:XiCWl3R+avNCT7KsTjv3qCblgsSqd0SC4ymySrH226g=
github.com/lyft/protoc-gen-validate v0.0.0-20180911180927-64fcb82c878e/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/mailgun/groupcache/v2 v2.3.2 h1:5dU4h13edj8lqMvdjmpmudv2l6iym5J7jqxf7KeE6Zg=
github.com/mailgun/groupcache/v2 v2.3.2/go.mod h1:tH8aMaTRIjFMJsmJ9p7Y5HGBj9hV/J9rKQ+/3dIXzNU=
github.com/mailru/easyjson v0.0.0-20180717111219-efc7eb8984d6/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=

@ -6,7 +6,6 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/netutil"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/pkg/storage/chunk/client/azure"
"github.com/grafana/loki/pkg/storage/chunk/client/baidubce"
@ -44,9 +43,6 @@ type Config struct {
// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`
// Global embedded-cache config. Regardless of the cache type, we need some singleton configs, like the ring configuration, when running in distributed fashion.
EmbeddedCacheConfig cache.EmbeddedCacheSingletonConfig `yaml:"embedded_cache"`
}
func (c *Config) RegisterFlags(f *flag.FlagSet) {
@ -61,8 +57,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")
f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
c.EmbeddedCacheConfig.RegisterFlagsWithPrefix("common.embedded-cache", "", f)
}
type Storage struct {

@ -164,9 +164,6 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
}
r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr
r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr
if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr
}
}
if !reflect.DeepEqual(r.Common.InstanceInterfaceNames, defaults.Common.InstanceInterfaceNames) {
@ -175,9 +172,6 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
}
r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames
r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
}
}
}
@ -305,20 +299,6 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit
r.IndexGateway.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.IndexGateway.Ring.KVStore = rc.KVStore
}
// EmbeddedCache distributed ring.
if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() &&
(mergeWithExisting || reflect.DeepEqual(r.Common.EmbeddedCacheConfig.Ring, defaults.Common.EmbeddedCacheConfig.Ring)) {
r.Common.EmbeddedCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Common.EmbeddedCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Common.EmbeddedCacheConfig.Ring.InstancePort = rc.InstancePort
r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = rc.InstanceAddr
r.Common.EmbeddedCacheConfig.Ring.InstanceID = rc.InstanceID
r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Common.EmbeddedCacheConfig.Ring.InstanceZone = rc.InstanceZone
r.Common.EmbeddedCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Common.EmbeddedCacheConfig.Ring.KVStore = rc.KVStore
}
}
func applyTokensFilePath(cfg *ConfigWrapper) error {
@ -349,11 +329,6 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
}
cfg.IndexGateway.Ring.TokensFilePath = f
f, err = tokensFile(cfg, "groupcache.tokens")
if err != nil {
return err
}
cfg.Common.EmbeddedCacheConfig.Ring.TokensFilePath = f
return nil
}
@ -431,10 +406,6 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
if reflect.DeepEqual(cfg.IndexGateway.Ring.InstanceInterfaceNames, defaults.IndexGateway.Ring.InstanceInterfaceNames) {
cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface)
}
if reflect.DeepEqual(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames) {
cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, loopbackIface)
}
}
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist.
@ -448,7 +419,6 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
r.IndexGateway.Ring.KVStore.Store = memberlistStr
r.Common.EmbeddedCacheConfig.Ring.KVStore.Store = memberlistStr
}
var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")

@ -824,20 +824,17 @@ ingester:
// ensure they are all false by default
config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed)
configFileString := `---
query_range:
results_cache:
cache:
embedded_cache:
enabled: true
distributed: true`
enabled: true`
config, _ = testContext(configFileString, nil)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed)
})
}
@ -867,20 +864,6 @@ chunk_store_config:
assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache)
})
t.Run("if distributed cache is set for results cache, FIFO cache should be disabled.", func(t *testing.T) {
configFileString := `---
query_range:
results_cache:
cache:
embedded_cache:
enabled: true
distributed: true`
config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.True(t, config.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed())
assert.False(t, config.QueryRange.CacheConfig.EnableFifoCache)
})
t.Run("FIFO cache is enabled by default if no other cache is set", func(t *testing.T) {
config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
assert.True(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache)

@ -43,7 +43,6 @@ import (
"github.com/grafana/loki/pkg/scheduler"
internalserver "github.com/grafana/loki/pkg/server"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
@ -270,7 +269,6 @@ type Loki struct {
queryScheduler *scheduler.Scheduler
usageReport *usagestats.Reporter
indexGatewayRingManager *indexgateway.RingManager
embeddedcacheRingManager *cache.GroupcacheRingManager
clientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
@ -503,7 +501,6 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)
mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule)
mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule)
mm.RegisterModule(Embededcache, t.initEmbeddedCache, modules.UserInvisibleModule)
mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule)
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule)
@ -531,16 +528,15 @@ func (t *Loki) setupModuleManager() error {
// Add dependencies
deps := map[string][]string{
Ring: {RuntimeConfig, Server, MemberlistKV},
Embededcache: {RuntimeConfig, Server, MemberlistKV},
UsageReport: {},
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, Server},
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport},
Store: {Overrides, Embededcache, IndexGatewayRing},
Store: {Overrides, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport, CacheGenerationLoader},
QueryFrontendTripperware: {Server, Embededcache, Overrides, TenantConfigs},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, UsageReport, CacheGenerationLoader},
QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},

@ -37,7 +37,6 @@ import (
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/lokifrontend/frontend"
"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
"github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb"
@ -74,7 +73,6 @@ const maxChunkAgeForTableManager = 12 * time.Hour
// The various modules that make up Loki.
const (
Ring string = "ring"
Embededcache string = "embedded-cache"
RuntimeConfig string = "runtime-config"
Overrides string = "overrides"
OverridesExporter string = "overrides-exporter"
@ -209,49 +207,6 @@ func (t *Loki) initRing() (_ services.Service, err error) {
return t.ring, nil
}
func (t *Loki) initEmbeddedCache() (_ services.Service, err error) {
if !t.Cfg.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
return nil, nil
}
groupCacheConfig := cache.GroupCacheConfig{
Enabled: true,
Ring: t.Cfg.Common.EmbeddedCacheConfig.Ring,
MaxSizeMB: t.Cfg.Common.EmbeddedCacheConfig.MaxSizeMB,
ListenPort: t.Cfg.Common.EmbeddedCacheConfig.ListenPort,
HeartbeatInterval: t.Cfg.Common.EmbeddedCacheConfig.HeartbeatInterval,
HeartbeatTimeout: t.Cfg.Common.EmbeddedCacheConfig.HeartbeatTimeout,
WriteByteTimeout: t.Cfg.Common.EmbeddedCacheConfig.WriteByteTimeout,
}
groupCacheConfig.Ring.ListenPort = groupCacheConfig.ListenPort
rm, err := cache.NewGroupcacheRingManager(groupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "new embedded-cache ring manager")
}
t.embeddedcacheRingManager = rm
t.Server.HTTP.Path("/embedded-cache/ring").Methods("GET", "POST").Handler(t.embeddedcacheRingManager)
gc, err := cache.NewGroupCache(rm, groupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
groupConfig := cache.GroupConfig{
MaxSizeMB: t.Cfg.QueryRange.CacheConfig.EmbeddedCache.MaxSizeMB,
}
t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Cache = gc.NewGroup(
t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Prefix+"groupcache",
&groupConfig,
stats.ResultCache,
)
return t.embeddedcacheRingManager, nil
}
func (t *Loki) initRuntimeConfig() (services.Service, error) {
if len(t.Cfg.RuntimeConfig.LoadPath) == 0 {
if len(t.Cfg.LimitsConfig.PerTenantOverrideConfig) != 0 {
@ -998,9 +953,6 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
if t.Cfg.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
t.Cfg.Common.EmbeddedCacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
}
t.Server.HTTP.Handle("/memberlist", t.MemberlistKV)

@ -98,7 +98,7 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta
// Have an additional check for embedded cache with distributed mode, because those caches will already be initialized in modules
// but still need the stats collector wrapper.
if cfg.Cache != nil && !cfg.EmbeddedCache.IsEnabledWithDistributed() {
if cfg.Cache != nil && !cfg.EmbeddedCache.IsEnabled() {
return cfg.Cache, nil
}
@ -107,8 +107,7 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta
// Currently fifocache can be enabled in two ways.
// 1. cfg.EnableFifocache (old deprecated way)
// 2. cfg.EmbeddedCache.Enabled=true and cfg.EmbeddedCache.Distributed=false (new way)
if cfg.EnableFifoCache || (IsEmbeddedCacheSet(cfg) && !cfg.EmbeddedCache.Distributed) {
if cfg.EnableFifoCache || cfg.EmbeddedCache.IsEnabled() {
var fifocfg FifoCacheConfig
if cfg.EnableFifoCache {
@ -116,7 +115,7 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta
fifocfg = cfg.Fifocache
}
if cfg.EmbeddedCache.IsEnabledWithoutDistributed() {
if cfg.EmbeddedCache.IsEnabled() {
fifocfg = FifoCacheConfig{
MaxSizeBytes: fmt.Sprint(cfg.EmbeddedCache.MaxSizeMB * 1e6),
TTL: cfg.EmbeddedCache.TTL,
@ -162,11 +161,6 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta
caches = append(caches, CollectStats(NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg)))
}
if IsEmbeddedCacheSet(cfg) && cfg.EmbeddedCache.Distributed {
cacheName := cfg.Prefix + "embedded-cache"
caches = append(caches, CollectStats(Instrument(cacheName, cfg.Cache, reg)))
}
cache := NewTiered(caches)
if len(caches) > 1 {
cache = Instrument(cfg.Prefix+"tiered", cache, reg)
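Taken together, these hunks collapse the embedded-cache checks in `New` to a single `IsEnabled` predicate. A condensed, self-contained sketch of the resulting FIFO-selection logic (the types are trimmed stand-ins, not the full `cache.Config` from pkg/storage/chunk/cache):

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed stand-in for the embedded-cache config touched in this diff.
type EmbeddedCacheConfig struct {
	Enabled   bool
	MaxSizeMB int64
	TTL       time.Duration
}

func (cfg *EmbeddedCacheConfig) IsEnabled() bool { return cfg.Enabled }

// Trimmed stand-in for cache.Config.
type Config struct {
	EnableFifoCache bool
	EmbeddedCache   EmbeddedCacheConfig
}

// fifoMaxSizeBytes mirrors the post-change branch in cache.New: the
// FIFO-backed cache is used either via the deprecated EnableFifoCache
// flag or via the (now distribution-free) embedded cache, whose size
// is given in MB and converted to a byte count.
func fifoMaxSizeBytes(cfg Config) (string, bool) {
	if !cfg.EnableFifoCache && !cfg.EmbeddedCache.IsEnabled() {
		return "", false
	}
	return fmt.Sprint(cfg.EmbeddedCache.MaxSizeMB * 1e6), true
}

func main() {
	cfg := Config{EmbeddedCache: EmbeddedCacheConfig{Enabled: true, MaxSizeMB: 100, TTL: time.Hour}}
	if size, ok := fifoMaxSizeBytes(cfg); ok {
		fmt.Println("fifo-backed cache, max size in bytes:", size) // 100000000
	}
}
```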

@ -10,13 +10,10 @@ const (
)
// EmbeddedCacheConfig represents in-process embedded cache config.
// It can also be distributed, sharding keys across peers when run with microservices
// or SSD mode.
type EmbeddedCacheConfig struct {
Distributed bool `yaml:"distributed,omitempty"`
Enabled bool `yaml:"enabled,omitempty"`
MaxSizeMB int64 `yaml:"max_size_mb"`
TTL time.Duration `yaml:"ttl"`
Enabled bool `yaml:"enabled,omitempty"`
MaxSizeMB int64 `yaml:"max_size_mb"`
TTL time.Duration `yaml:"ttl"`
// PurgeInterval tells how often we should remove expired keys.
// By default it is `DefaultPurgeInterval`.
@ -25,40 +22,11 @@ type EmbeddedCacheConfig struct {
func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.")
f.BoolVar(&cfg.Distributed, prefix+"embedded-cache.distributed", false, description+"Whether embedded cache is enabled with distributed mode.")
f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
}
func (cfg *EmbeddedCacheConfig) IsEnabledWithDistributed() bool {
return cfg.Enabled && cfg.Distributed
}
func (cfg *EmbeddedCacheConfig) IsEnabledWithoutDistributed() bool {
return cfg.Enabled && !cfg.Distributed
}
// EmbeddedCacheSingletonConfig defines the global singletons needed by the embedded cache (particularly when used in distributed fashion)
type EmbeddedCacheSingletonConfig struct {
// distributed cache configs. Have no meaning if `Distributed=false`.
ListenPort int `yaml:"listen_port,omitempty"`
Ring RingCfg `yaml:"ring,omitempty"`
// Default capacity if none provided while creating each "Group".
MaxSizeMB int64 `yaml:"max_size__mb,omitempty"`
// Different timeouts
HeartbeatInterval time.Duration `yaml:"heartbeat_interval,omitempty"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout,omitempty"`
WriteByteTimeout time.Duration `yaml:"write_timeout,omitempty"`
}
func (cfg *EmbeddedCacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for cache communications across the peers when run in distributed fashion")
cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f)
f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, "Maximum memory size of the cache in MB.")
f.DurationVar(&cfg.HeartbeatInterval, prefix+"embedded-cache.heartbeat-interval", time.Second, "If the connection is idle, the interval the cache will send heartbeats")
f.DurationVar(&cfg.HeartbeatTimeout, prefix+"embedded-cache.heartbeat-timeout", time.Second, "Timeout for heartbeat responses")
f.DurationVar(&cfg.WriteByteTimeout, prefix+"embeddec-cache.write-timeout", time.Second, "Maximum time for the cache to try writing")
func (cfg *EmbeddedCacheConfig) IsEnabled() bool {
return cfg.Enabled
}
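With `EmbeddedCacheSingletonConfig` gone, `EmbeddedCacheConfig` registers just the three flags shown above. A runnable sketch of the same wiring under a standalone FlagSet (the `frontend.` prefix is illustrative, not taken from this diff):

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// Sketch of the trimmed-down config from this diff.
type EmbeddedCacheConfig struct {
	Enabled   bool
	MaxSizeMB int64
	TTL       time.Duration
}

// RegisterFlagsWithPrefix mirrors the surviving registrations above;
// the distributed flag is gone along with the singleton config.
func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.")
	f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
	f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
}

func main() {
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	var cfg EmbeddedCacheConfig
	cfg.RegisterFlagsWithPrefix("frontend.", "", fs)
	_ = fs.Parse([]string{"-frontend.embedded-cache.enabled=true"})
	fmt.Printf("enabled=%v max=%dMB ttl=%s\n", cfg.Enabled, cfg.MaxSizeMB, cfg.TTL)
}
```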

@ -1,301 +0,0 @@
package cache
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"
"golang.org/x/net/http2"
"github.com/weaveworks/common/instrument"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/groupcache_exporter"
"github.com/mailgun/groupcache/v2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/loki/pkg/logqlmodel/stats"
lokiutil "github.com/grafana/loki/pkg/util"
)
var (
ErrGroupcacheMiss = errors.New("cache miss")
http2Transport = &http2.Transport{
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
AllowHTTP: true,
}
)
type GroupCache struct {
peerRing ring.ReadRing
cache *groupcache.Group
pool *groupcache.HTTPPool
stopChan chan struct{}
updateInterval time.Duration
logger log.Logger
wg sync.WaitGroup
reg prometheus.Registerer
startWaitingForClose context.CancelFunc
cacheBytes int64
}
// RingCfg is a wrapper for the Groupcache ring configuration plus the replication factor.
type RingCfg struct {
lokiutil.RingConfig `yaml:",inline"`
}
type GroupCacheConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
Ring RingCfg `yaml:"ring,omitempty"`
MaxSizeMB int64 `yaml:"max_size_mb,omitempty"`
ListenPort int `yaml:"listen_port,omitempty"`
HeartbeatInterval time.Duration `yaml:"heartbeat_interval,omitempty"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout,omitempty"`
WriteByteTimeout time.Duration `yaml:"write_timeout,omitempty"`
Cache Cache `yaml:"-"`
}
type ringManager interface {
Addr() string
Ring() ring.ReadRing
}
func NewGroupCache(rm ringManager, config GroupCacheConfig, logger log.Logger, reg prometheus.Registerer) (*GroupCache, error) {
addr := fmt.Sprintf("http://%s", rm.Addr())
level.Info(logger).Log("msg", "groupcache local address set to", "addr", addr)
http2Transport.ReadIdleTimeout = config.HeartbeatInterval
http2Transport.PingTimeout = config.HeartbeatTimeout
http2Transport.WriteByteTimeout = config.WriteByteTimeout
pool := groupcache.NewHTTPPoolOpts(
addr,
&groupcache.HTTPPoolOptions{
Transport: func(_ context.Context) http.RoundTripper {
return http2Transport
},
},
)
startCtx, cancel := context.WithCancel(context.Background())
cache := &GroupCache{
peerRing: rm.Ring(),
pool: pool,
logger: logger,
stopChan: make(chan struct{}),
updateInterval: 5 * time.Minute,
wg: sync.WaitGroup{},
startWaitingForClose: cancel,
reg: reg,
cacheBytes: config.MaxSizeMB * 1e6, // MB => B
}
go cache.serveGroupcache(config.ListenPort)
go func() {
// Avoid starting the cache and peer discovery until
// a cache is being used
<-startCtx.Done()
go cache.updatePeers()
cache.wg.Wait()
close(cache.stopChan)
}()
return cache, nil
}
func (c *GroupCache) serveGroupcache(listenPort int) {
addr := fmt.Sprintf(":%d", listenPort)
l, err := net.Listen("tcp", addr)
if err != nil {
level.Error(c.logger).Log("msg", "unable to serve groupcache", "err", err)
return
}
level.Info(c.logger).Log("msg", "groupcache listening", "addr", addr)
server := http2.Server{}
for {
select {
case <-c.stopChan:
return
default:
conn, err := l.Accept()
if err != nil {
level.Error(c.logger).Log("msg", "groupcache connection failed", "err", err)
continue
}
go server.ServeConn(conn, &http2.ServeConnOpts{Handler: c.pool})
}
}
}
func (c *GroupCache) updatePeers() {
c.update()
t := time.NewTicker(c.updateInterval)
for {
select {
case <-t.C:
c.update()
case <-c.stopChan:
return
}
}
}
func (c *GroupCache) update() {
urls, err := c.peerUrls()
if err != nil {
level.Warn(c.logger).Log("msg", "unable to get groupcache peer urls", "err", err)
return
}
level.Info(c.logger).Log("msg", "got groupcache peers", "peers", strings.Join(urls, ","))
c.pool.Set(urls...)
}
func (c *GroupCache) peerUrls() ([]string, error) {
replicationSet, err := c.peerRing.GetAllHealthy(ring.WriteNoExtend)
if err != nil {
return nil, err
}
var addrs []string
for _, i := range replicationSet.Instances {
addrs = append(addrs, fmt.Sprintf("http://%s", i.Addr))
}
return addrs, nil
}
func (c *GroupCache) Stats() *groupcache.Stats {
if c.cache == nil {
return nil
}
return &c.cache.Stats
}
// GroupConfig represents the config for a single Group.
type GroupConfig struct {
MaxSizeMB int64 `yaml:"max_size_mb,omitempty"`
}
type group struct {
cache *groupcache.Group
logger log.Logger
wg *sync.WaitGroup
cacheType stats.CacheType
// cacheBytes represents maxSize (in bytes) this group can grow.
cacheBytes int64 // TODO(kavi): expose it as _info metrics later?
fetchDuration prometheus.Observer
storeDuration prometheus.Observer
}
func (c *GroupCache) NewGroup(name string, cfg *GroupConfig, ct stats.CacheType) Cache {
// Return a known error on miss to track which keys need to be inserted
missGetter := groupcache.GetterFunc(func(_ context.Context, _ string, _ groupcache.Sink) error {
return ErrGroupcacheMiss
})
c.wg.Add(1)
c.startWaitingForClose()
cap := c.cacheBytes
if cfg.MaxSizeMB != 0 {
cap = cfg.MaxSizeMB * 1e6 // MB into bytes
}
requestDuration := promauto.With(c.reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "groupcache_request_duration_seconds",
Help: "Total time spent in seconds doing groupcache requests.",
Buckets: instrument.DefBuckets,
ConstLabels: prometheus.Labels{"cache_type": string(ct)},
}, []string{"operation"})
g := &group{
cache: groupcache.NewGroup(name, cap, missGetter),
cacheBytes: cap,
logger: c.logger,
wg: &c.wg,
cacheType: ct,
fetchDuration: requestDuration.WithLabelValues("fetch"),
storeDuration: requestDuration.WithLabelValues("store"),
}
exp := groupcache_exporter.NewExporter(map[string]string{"cache_type": string(ct)}, g)
prometheus.WrapRegistererWithPrefix("loki_groupcache_", c.reg).MustRegister(exp)
return g
}
func (c *group) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) {
var (
start = time.Now()
values = make([][]byte, 0, len(keys))
missed = make([]string, 0, len(keys))
found = make([]string, 0, len(keys))
data = groupcache.ByteView{}
sink = groupcache.ByteViewSink(&data)
)
for _, key := range keys {
if err := c.cache.Get(ctx, key, sink); err != nil {
if errors.Is(err, ErrGroupcacheMiss) {
missed = append(missed, key)
continue
}
level.Error(c.logger).Log("msg", "unable to fetch from groupcache", "err", err)
return found, values, missed, err
}
found = append(found, key)
values = append(values, data.ByteSlice())
}
c.fetchDuration.Observe(time.Since(start).Seconds())
return found, values, missed, nil
}
func (c *group) Store(ctx context.Context, keys []string, values [][]byte) error {
start := time.Now()
var lastErr error
for i, key := range keys {
if err := c.cache.Set(ctx, key, values[i], time.Time{}, c.GetCacheType() != stats.ChunkCache); err != nil {
level.Warn(c.logger).Log("msg", "failed to put to groupcache", "err", err)
lastErr = err
}
}
c.storeDuration.Observe(time.Since(start).Seconds())
return lastErr
}
func (c *group) Stop() {
c.wg.Done()
}
func (c *group) GetCacheType() stats.CacheType {
return c.cacheType
}
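For reference, the deleted `group` type satisfied Loki's chunk-cache interface via the four methods above: `Fetch`, `Store`, `Stop`, and `GetCacheType`. A toy, map-backed sketch of that same shape (`CacheType` here is a local stand-in for `stats.CacheType`):

```go
package main

import (
	"context"
	"fmt"
)

// CacheType stands in for stats.CacheType (a string type in Loki).
type CacheType string

// mapCache shows the Fetch/Store/Stop/GetCacheType shape the deleted
// group type implemented; it is not the real cache.Cache interface file.
type mapCache struct {
	data map[string][]byte
	ct   CacheType
}

func (c *mapCache) Store(_ context.Context, keys []string, values [][]byte) error {
	for i, k := range keys {
		c.data[k] = values[i]
	}
	return nil
}

// Fetch splits keys into found (with their values) and missed, like the
// deleted group.Fetch did with its ErrGroupcacheMiss sentinel.
func (c *mapCache) Fetch(_ context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
	for _, k := range keys {
		if v, ok := c.data[k]; ok {
			found = append(found, k)
			bufs = append(bufs, v)
		} else {
			missed = append(missed, k)
		}
	}
	return found, bufs, missed, nil
}

func (c *mapCache) Stop()                   {}
func (c *mapCache) GetCacheType() CacheType { return c.ct }

func main() {
	c := &mapCache{data: map[string][]byte{}, ct: "test"}
	_ = c.Store(context.Background(), []string{"key1"}, [][]byte{[]byte("data1")})
	found, bufs, missed, _ := c.Fetch(context.Background(), []string{"key1", "miss1"})
	fmt.Println(found, string(bufs[0]), missed) // [key1] data1 [miss1]
}
```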

@ -1,206 +0,0 @@
package cache
import (
"context"
"net/http"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/go-kit/log"
)
const (
ringAutoForgetUnhealthyPeriods = 10
ringNameForServer = "groupcache"
ringNumTokens = 1
ringCheckPeriod = 3 * time.Second
GroupcacheRingKey = "groupcache"
)
// GroupcacheRingManager is a component instantiated before all the others and is responsible for the ring setup.
type GroupcacheRingManager struct {
services.Service
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
addr string
ringLifecycler *ring.BasicLifecycler
ring *ring.Ring
cfg GroupCacheConfig
log log.Logger
}
// NewRingManager is the recommended way of instantiating a GroupcacheRingManager.
//
// The other functions will assume the GroupcacheRingManager was instantiated through this function.
func NewGroupcacheRingManager(cfg GroupCacheConfig, log log.Logger, registerer prometheus.Registerer) (*GroupcacheRingManager, error) {
rm := &GroupcacheRingManager{
cfg: cfg, log: log,
}
ringStore, err := kv.NewClient(
rm.cfg.Ring.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "groupcache-ring-manager"),
rm.log,
)
if err != nil {
return nil, errors.Wrap(err, "groupcache ring manager create KV store client")
}
ringCfg := rm.cfg.Ring.ToRingConfig(1)
rm.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, GroupcacheRingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", registerer), rm.log)
if err != nil {
return nil, errors.Wrap(err, "groupcache ring manager create ring client")
}
if err := rm.startRing(ringStore, registerer); err != nil {
return nil, err
}
return rm, nil
}
func (rm *GroupcacheRingManager) startRing(ringStore kv.Client, registerer prometheus.Registerer) error {
lifecyclerCfg, err := rm.cfg.Ring.ToLifecyclerConfig(ringNumTokens, rm.log)
if err != nil {
return errors.Wrap(err, "invalid ring lifecycler config")
}
rm.addr = lifecyclerCfg.Addr
delegate := ring.BasicLifecyclerDelegate(rm)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.log)
delegate = ring.NewTokensPersistencyDelegate(rm.cfg.Ring.TokensFilePath, ring.JOINING, delegate, rm.log)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.Ring.HeartbeatTimeout, delegate, rm.log)
rm.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, GroupcacheRingKey, ringStore, delegate, rm.log, registerer)
if err != nil {
return errors.Wrap(err, "groupcache ring manager create ring lifecycler")
}
svcs := []services.Service{rm.ringLifecycler, rm.ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return errors.Wrap(err, "new groupcache services manager in server mode")
}
rm.subservicesWatcher = services.NewFailureWatcher()
rm.subservicesWatcher.WatchManager(rm.subservices)
rm.Service = services.NewBasicService(rm.starting, rm.running, rm.stopping)
return nil
}
// starting implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *GroupcacheRingManager) starting(ctx context.Context) (err error) {
// In case this function will return error we want to unregister the instance
// from the ring. We do it ensuring dependencies are gracefully stopped if they
// were already started.
defer func() {
if err == nil || rm.subservices == nil {
return
}
if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil {
level.Error(rm.log).Log("msg", "failed to gracefully stop groupcache ring manager dependencies", "err", stopErr)
}
}()
if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil {
return errors.Wrap(err, "unable to start groupcache ring manager subservices")
}
// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
// someone wants to do can be done before becoming ACTIVE. For groupcache we don't currently
// have any additional work so we can become ACTIVE right away.
// Wait until the ring client detected this instance in the JOINING
// state to make sure that when we'll run the initial sync we already
// know the tokens assigned to this instance.
level.Info(rm.log).Log("msg", "waiting until groupcache is JOINING in the ring")
if err := ring.WaitInstanceState(ctx, rm.ring, rm.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return err
}
level.Info(rm.log).Log("msg", "groupcache is JOINING in the ring")
if err = rm.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
}
// Wait until the ring client detected this instance in the ACTIVE state to
// make sure that when we'll run the loop it won't be detected as a ring
// topology change.
level.Info(rm.log).Log("msg", "waiting until groupcache is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, rm.ring, rm.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(rm.log).Log("msg", "groupcache is ACTIVE in the ring")
return nil
}
// running implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *GroupcacheRingManager) running(ctx context.Context) error {
t := time.NewTicker(ringCheckPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-rm.subservicesWatcher.Chan():
return errors.Wrap(err, "running groupcache ring manager subservice failed")
case <-t.C:
continue
}
}
}
// stopping implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *GroupcacheRingManager) stopping(_ error) error {
level.Debug(rm.log).Log("msg", "stopping groupcache ring manager")
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
}
// ServeHTTP serves the HTTP route /groupcache/ring.
func (rm *GroupcacheRingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
rm.ring.ServeHTTP(w, req)
}
func (rm *GroupcacheRingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the groupcache instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
var tokens []uint32
if instanceExists {
tokens = instanceDesc.GetTokens()
}
takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
return ring.JOINING, tokens
}
func (rm *GroupcacheRingManager) Addr() string {
return rm.addr
}
func (rm *GroupcacheRingManager) Ring() ring.ReadRing {
return rm.ring
}
func (rm *GroupcacheRingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
func (rm *GroupcacheRingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {}
func (rm *GroupcacheRingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
}

@ -1,87 +0,0 @@
package cache
import "github.com/mailgun/groupcache/v2"
func (c *group) Name() string {
return c.cache.Name()
}
func (c *group) Gets() int64 {
return c.cache.Stats.Gets.Get()
}
func (c *group) CacheHits() int64 {
return c.cache.Stats.CacheHits.Get()
}
func (c *group) GetFromPeersLatencyLower() int64 {
return c.cache.Stats.GetFromPeersLatencyLower.Get()
}
func (c *group) PeerLoads() int64 {
return c.cache.Stats.PeerLoads.Get()
}
func (c *group) PeerErrors() int64 {
return c.cache.Stats.PeerErrors.Get()
}
func (c *group) Loads() int64 {
return c.cache.Stats.Loads.Get()
}
func (c *group) LoadsDeduped() int64 {
return c.cache.Stats.LoadsDeduped.Get()
}
func (c *group) LocalLoads() int64 {
return c.cache.Stats.LocalLoads.Get()
}
func (c *group) LocalLoadErrs() int64 {
return c.cache.Stats.LocalLoadErrs.Get()
}
func (c *group) ServerRequests() int64 {
return c.cache.Stats.ServerRequests.Get()
}
func (c *group) MainCacheItems() int64 {
return c.cache.CacheStats(groupcache.MainCache).Items
}
func (c *group) MainCacheBytes() int64 {
return c.cache.CacheStats(groupcache.MainCache).Bytes
}
func (c *group) MainCacheGets() int64 {
return c.cache.CacheStats(groupcache.MainCache).Gets
}
func (c *group) MainCacheHits() int64 {
return c.cache.CacheStats(groupcache.MainCache).Hits
}
func (c *group) MainCacheEvictions() int64 {
return c.cache.CacheStats(groupcache.MainCache).Evictions
}
func (c *group) HotCacheItems() int64 {
return c.cache.CacheStats(groupcache.HotCache).Items
}
func (c *group) HotCacheBytes() int64 {
return c.cache.CacheStats(groupcache.HotCache).Bytes
}
func (c *group) HotCacheGets() int64 {
return c.cache.CacheStats(groupcache.HotCache).Gets
}
func (c *group) HotCacheHits() int64 {
return c.cache.CacheStats(groupcache.HotCache).Hits
}
func (c *group) HotCacheEvictions() int64 {
return c.cache.CacheStats(groupcache.HotCache).Evictions
}

@ -1,88 +0,0 @@
package cache
import (
"context"
"testing"
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGroupCache(t *testing.T) {
gc, err := setupGroupCache()
require.Nil(t, err)
c := gc.NewGroup("test-group", &GroupConfig{}, "test")
defer c.Stop()
keys := []string{"key1", "key2", "key3"}
bufs := [][]byte{[]byte("data1"), []byte("data2"), []byte("data3")}
miss := []string{"miss1", "miss2"}
err = c.Store(context.Background(), keys, bufs)
require.NoError(t, err)
// test hits
found, data, missed, _ := c.Fetch(context.Background(), keys)
require.Len(t, found, len(keys))
require.Len(t, missed, 0)
for i := 0; i < len(keys); i++ {
require.Equal(t, keys[i], found[i])
require.Equal(t, bufs[i], data[i])
}
// test misses
found, _, missed, _ = c.Fetch(context.Background(), miss)
require.Len(t, found, 0)
require.Len(t, missed, len(miss))
for i := 0; i < len(miss); i++ {
require.Equal(t, miss[i], missed[i])
}
// Passing an empty GroupConfig should fall back to the global capacity (which is 1MB here).
c1 := gc.NewGroup("test-group1", &GroupConfig{}, "test1")
defer c.Stop()
assert.Equal(t, c1.(*group).cacheBytes, int64(1*1e6))
// An explicitly passed per-group capacity should take precedence.
c2 := gc.NewGroup("test-group2", &GroupConfig{MaxSizeMB: 6}, "test2")
defer c.Stop()
assert.Equal(t, c2.(*group).cacheBytes, int64(6*1e6))
}
func setupGroupCache() (*GroupCache, error) {
return NewGroupCache(&mockRingManager{}, GroupCacheConfig{
Enabled: true,
MaxSizeMB: 1,
}, log.NewNopLogger(), nil)
}
type mockRingManager struct{}
func (rm *mockRingManager) Addr() string {
return "http://localhost:1234"
}
func (rm *mockRingManager) Ring() ring.ReadRing {
return &mockRing{}
}
type mockRing struct {
ring.ReadRing
}
func (r *mockRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{Instances: []ring.InstanceDesc{
{
Addr: "http://localhost:1234",
},
}}, nil
}

@ -1,15 +0,0 @@
# Created by .ignore support plugin (hsz.mobi)
### Go template
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2022 Grafana Labs
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -1 +0,0 @@
# Prometheus Groupcache exporter

@ -1,206 +0,0 @@
package groupcache_exporter
import (
"github.com/prometheus/client_golang/prometheus"
)
type Exporter struct {
groups []GroupStatistics
groupGets *prometheus.Desc
groupCacheHits *prometheus.Desc
groupPeerLoads *prometheus.Desc
groupPeerErrors *prometheus.Desc
groupLoads *prometheus.Desc
groupLoadsDeduped *prometheus.Desc
groupLocalLoads *prometheus.Desc
groupLocalLoadErrs *prometheus.Desc
groupServerRequests *prometheus.Desc
cacheBytes *prometheus.Desc
cacheItems *prometheus.Desc
cacheGets *prometheus.Desc
cacheHits *prometheus.Desc
cacheEvictions *prometheus.Desc
}
type GroupStatistics interface {
// Name returns the group's name
Name() string
// Gets represents any Get request, including from peers
Gets() int64
// CacheHits represents either cache was good
CacheHits() int64
// GetFromPeersLatencyLower represents slowest duration to request value from peers
GetFromPeersLatencyLower() int64
// PeerLoads represents either remote load or remote cache hit (not an error)
PeerLoads() int64
// PeerErrors represents a count of errors from peers
PeerErrors() int64
// Loads represents (gets - cacheHits)
Loads() int64
// LoadsDeduped represents after singleflight
LoadsDeduped() int64
// LocalLoads represents total good local loads
LocalLoads() int64
// LocalLoadErrs represents total bad local loads
LocalLoadErrs() int64
// ServerRequests represents gets that came over the network from peers
ServerRequests() int64
MainCacheItems() int64
MainCacheBytes() int64
MainCacheGets() int64
MainCacheHits() int64
MainCacheEvictions() int64
HotCacheItems() int64
HotCacheBytes() int64
HotCacheGets() int64
HotCacheHits() int64
HotCacheEvictions() int64
}
func NewExporter(labels map[string]string, groups ...GroupStatistics) *Exporter {
return &Exporter{
groups: groups,
groupGets: prometheus.NewDesc(
"gets_total",
"todo",
[]string{"group"},
labels,
),
groupCacheHits: prometheus.NewDesc(
"hits_total",
"todo",
[]string{"group"},
labels,
),
groupPeerLoads: prometheus.NewDesc(
"peer_loads_total",
"todo",
[]string{"group"},
labels,
),
groupPeerErrors: prometheus.NewDesc(
"peer_errors_total",
"todo",
[]string{"group"},
labels,
),
groupLoads: prometheus.NewDesc(
"loads_total",
"todo",
[]string{"group"},
labels,
),
groupLoadsDeduped: prometheus.NewDesc(
"loads_deduped_total",
"todo",
[]string{"group"},
labels,
),
groupLocalLoads: prometheus.NewDesc(
"local_load_total",
"todo",
[]string{"group"},
labels,
),
groupLocalLoadErrs: prometheus.NewDesc(
"local_load_errs_total",
"todo",
[]string{"group"},
labels,
),
groupServerRequests: prometheus.NewDesc(
"server_requests_total",
"todo",
[]string{"group"},
labels,
),
cacheBytes: prometheus.NewDesc(
"cache_bytes",
"todo",
[]string{"group", "type"},
labels,
),
cacheItems: prometheus.NewDesc(
"cache_items",
"todo",
[]string{"group", "type"},
labels,
),
cacheGets: prometheus.NewDesc(
"cache_gets_total",
"todo",
[]string{"group", "type"},
labels,
),
cacheHits: prometheus.NewDesc(
"cache_hits_total",
"todo",
[]string{"group", "type"},
labels,
),
cacheEvictions: prometheus.NewDesc(
"cache_evictions_total",
"todo",
[]string{"group", "type"},
labels,
),
}
}
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- e.groupGets
ch <- e.groupCacheHits
ch <- e.groupPeerLoads
ch <- e.groupPeerErrors
ch <- e.groupLoads
ch <- e.groupLoadsDeduped
ch <- e.groupLocalLoads
ch <- e.groupLocalLoadErrs
ch <- e.groupServerRequests
ch <- e.cacheBytes
ch <- e.cacheItems
ch <- e.cacheGets
ch <- e.cacheHits
ch <- e.cacheEvictions
}
func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
for _, group := range e.groups {
e.collectFromGroup(ch, group)
}
}
func (e *Exporter) collectFromGroup(ch chan<- prometheus.Metric, stats GroupStatistics) {
e.collectStats(ch, stats)
e.collectCacheStats(ch, stats)
}
func (e *Exporter) collectStats(ch chan<- prometheus.Metric, stats GroupStatistics) {
ch <- prometheus.MustNewConstMetric(e.groupGets, prometheus.CounterValue, float64(stats.Gets()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupCacheHits, prometheus.CounterValue, float64(stats.CacheHits()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupPeerLoads, prometheus.CounterValue, float64(stats.PeerLoads()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupPeerErrors, prometheus.CounterValue, float64(stats.PeerErrors()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupLoads, prometheus.CounterValue, float64(stats.Loads()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupLoadsDeduped, prometheus.CounterValue, float64(stats.LoadsDeduped()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupLocalLoads, prometheus.CounterValue, float64(stats.LocalLoads()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupLocalLoadErrs, prometheus.CounterValue, float64(stats.LocalLoadErrs()), stats.Name())
ch <- prometheus.MustNewConstMetric(e.groupServerRequests, prometheus.CounterValue, float64(stats.ServerRequests()), stats.Name())
}
func (e *Exporter) collectCacheStats(ch chan<- prometheus.Metric, stats GroupStatistics) {
ch <- prometheus.MustNewConstMetric(e.cacheItems, prometheus.GaugeValue, float64(stats.MainCacheItems()), stats.Name(), "main")
ch <- prometheus.MustNewConstMetric(e.cacheBytes, prometheus.GaugeValue, float64(stats.MainCacheBytes()), stats.Name(), "main")
ch <- prometheus.MustNewConstMetric(e.cacheGets, prometheus.CounterValue, float64(stats.MainCacheGets()), stats.Name(), "main")
ch <- prometheus.MustNewConstMetric(e.cacheHits, prometheus.CounterValue, float64(stats.MainCacheHits()), stats.Name(), "main")
ch <- prometheus.MustNewConstMetric(e.cacheEvictions, prometheus.CounterValue, float64(stats.MainCacheEvictions()), stats.Name(), "main")
ch <- prometheus.MustNewConstMetric(e.cacheItems, prometheus.GaugeValue, float64(stats.HotCacheItems()), stats.Name(), "hot")
ch <- prometheus.MustNewConstMetric(e.cacheBytes, prometheus.GaugeValue, float64(stats.HotCacheBytes()), stats.Name(), "hot")
ch <- prometheus.MustNewConstMetric(e.cacheGets, prometheus.CounterValue, float64(stats.HotCacheGets()), stats.Name(), "hot")
ch <- prometheus.MustNewConstMetric(e.cacheHits, prometheus.CounterValue, float64(stats.HotCacheHits()), stats.Name(), "hot")
ch <- prometheus.MustNewConstMetric(e.cacheEvictions, prometheus.CounterValue, float64(stats.HotCacheEvictions()), stats.Name(), "hot")
}

@ -1,2 +0,0 @@
*~
.idea/

@ -1,20 +0,0 @@
language: go
go_import_path: github.com/mailgun/groupcache
os: linux
dist: xenial
sudo: false
script:
- go test ./...
go:
- 1.13.x
- 1.14.x
- 1.15.x
- 1.17.x
- master
cache:
directories:
- $GOPATH/pkg

@ -1,62 +0,0 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
#
## [2.3.1] - 2022-05-17
### Changed
* Fix example in README #40
* fix: deal with panics in Getter.Get #39
## [2.3.0] - 2022-01-06
### Added
* Added Group.Set() to allow users to explicity set values in the cache.
## [2.2.1] - 2021-01-13
### Changes
* Now uses the much faster fnv1
* Now md5 hashs the keys to help distribute hosts more evenly in some
cases.
## [2.2.0] - 2019-07-09
### Added
* Added `SetLogger()` to pass in a logrus entry for logging peer errors
* Added `DeregisterGroup()` to remove an existing group cache
### Changes
* Fixed race condition in `Remove()`
## [2.1.0] - 2019-06-10
### Changes
* `Get()` now returns immediately when context is done during a groupcache peer
conversation. Previously `Get()` would call the `Getter` with a done context.
## [2.0.0] - 2019-06-04
### Changes
* Now using golang standard `context.Context` instead of `groupcache.Context`.
* HTTP requests made by `httpGetter` now respect `context.Context` done.
* Moved `HTTPPool` config `Context` and `Transport` to `HTTPPoolOptions` for consist configuration.
* Now always populating the hotcache. A more complex algorithm is unnecessary
when the LRU cache will ensure the most used values remain in the cache. The
evict code ensures the hotcache does not overcrowd the maincache.
* Changed import paths to /v2 in accordance with go modules rules
* Fixed Issue where `DefaultTransport` was always used even if `Transport` was
specified by the user.
### Removed
* Reverted change to associate `Transport` to `httpGetter`, Which caused a data
race. Also discovered `DefaultTransport` has per address connection pooling
only when the request was a success, which is sufficient for most use cases.
## [1.3.0] - 2019-05-23
### Added
* Added `Remove()` method to `Group` to purge a key from the group.
## [1.1.0] - 2019-04-10
### Added
* Sinks can now accept an expire time
* Changed import path to mailgun/groupcache
## [hash 5b532d6fd5efaf7fa130d4e859a2fde0fc3a9e1b] - 2019-01-29
### Changes
* Initial import from https://github.com/golang/groupcache

@ -1,191 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and
distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright
owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities
that control, are controlled by, or are under common control with that entity.
For the purposes of this definition, "control" means (i) the power, direct or
indirect, to cause the direction or management of such entity, whether by
contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising
permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including
but not limited to software source code, documentation source, and configuration
files.
"Object" form shall mean any form resulting from mechanical transformation or
translation of a Source form, including but not limited to compiled object code,
generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made
available under the License, as indicated by a copyright notice that is included
in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that
is based on (or derived from) the Work and for which the editorial revisions,
annotations, elaborations, or other modifications represent, as a whole, an
original work of authorship. For the purposes of this License, Derivative Works
shall not include works that remain separable from, or merely link (or bind by
name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version
of the Work and any modifications or additions to that Work or Derivative Works
thereof, that is intentionally submitted to Licensor for inclusion in the Work
by the copyright owner or by an individual or Legal Entity authorized to submit
on behalf of the copyright owner. For the purposes of this definition,
"submitted" means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems, and
issue tracking systems that are managed by, or on behalf of, the Licensor for
the purpose of discussing and improving the Work, but excluding communication
that is conspicuously marked or otherwise designated in writing by the copyright
owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
of whom a Contribution has been received by Licensor and subsequently
incorporated within the Work.
2. Grant of Copyright License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the Work and such
Derivative Works in Source or Object form.
3. Grant of Patent License.
Subject to the terms and conditions of this License, each Contributor hereby
grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to make, have
made, use, offer to sell, sell, import, and otherwise transfer the Work, where
such license applies only to those patent claims licensable by such Contributor
that are necessarily infringed by their Contribution(s) alone or by combination
of their Contribution(s) with the Work to which such Contribution(s) was
submitted. If You institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work or a
Contribution incorporated within the Work constitutes direct or contributory
patent infringement, then any patent licenses granted to You under this License
for that Work shall terminate as of the date such litigation is filed.
4. Redistribution.
You may reproduce and distribute copies of the Work or Derivative Works thereof
in any medium, with or without modifications, and in Source or Object form,
provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of
this License; and
You must cause any modified files to carry prominent notices stating that You
changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute,
all copyright, patent, trademark, and attribution notices from the Source form
of the Work, excluding those notices that do not pertain to any part of the
Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any
Derivative Works that You distribute must include a readable copy of the
attribution notices contained within such NOTICE file, excluding those notices
that do not pertain to any part of the Derivative Works, in at least one of the
following places: within a NOTICE text file distributed as part of the
Derivative Works; within the Source form or documentation, if provided along
with the Derivative Works; or, within a display generated by the Derivative
Works, if and wherever such third-party notices normally appear. The contents of
the NOTICE file are for informational purposes only and do not modify the
License. You may add Your own attribution notices within Derivative Works that
You distribute, alongside or as an addendum to the NOTICE text from the Work,
provided that such additional attribution notices cannot be construed as
modifying the License.
You may add Your own copyright statement to Your modifications and may provide
additional or different license terms and conditions for use, reproduction, or
distribution of Your modifications, or for any such Derivative Works as a whole,
provided Your use, reproduction, and distribution of the Work otherwise complies
with the conditions stated in this License.
5. Submission of Contributions.
Unless You explicitly state otherwise, any Contribution intentionally submitted
for inclusion in the Work by You to the Licensor shall be under the terms and
conditions of this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify the terms of
any separate license agreement you may have executed with Licensor regarding
such Contributions.
6. Trademarks.
This License does not grant permission to use the trade names, trademarks,
service marks, or product names of the Licensor, except as required for
reasonable and customary use in describing the origin of the Work and
reproducing the content of the NOTICE file.
7. Disclaimer of Warranty.
Unless required by applicable law or agreed to in writing, Licensor provides the
Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
including, without limitation, any warranties or conditions of TITLE,
NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
solely responsible for determining the appropriateness of using or
redistributing the Work and assume any risks associated with Your exercise of
permissions under this License.
8. Limitation of Liability.
In no event and under no legal theory, whether in tort (including negligence),
contract, or otherwise, unless required by applicable law (such as deliberate
and grossly negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special, incidental,
or consequential damages of any character arising as a result of this License or
out of the use or inability to use the Work (including but not limited to
damages for loss of goodwill, work stoppage, computer failure or malfunction, or
any and all other commercial damages or losses), even if such Contributor has
been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability.
While redistributing the Work or Derivative Works thereof, You may choose to
offer, and charge a fee for, acceptance of support, warranty, indemnity, or
other liability obligations and/or rights consistent with this License. However,
in accepting such obligations, You may act only on Your own behalf and on Your
sole responsibility, not on behalf of any other Contributor, and only if You
agree to indemnify, defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason of your
accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work
To apply the Apache License to your work, attach the following boilerplate
notice, with the fields enclosed by brackets "[]" replaced with your own
identifying information. (Don't include the brackets!) The text should be
enclosed in the appropriate comment syntax for the file format. We also
recommend that a file or class name and description of purpose be included on
the same "printed page" as the copyright notice for easier identification within
third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@ -1,158 +0,0 @@
# groupcache
A modified version of [group cache](https://github.com/golang/groupcache) with
support for `context.Context`, [go modules](https://github.com/golang/go/wiki/Modules),
and explicit key removal and expiration. See the `CHANGELOG` for a complete list of
modifications.
## Summary
groupcache is a caching and cache-filling library, intended as a
replacement for memcached in many cases.
For API docs and examples, see http://godoc.org/github.com/mailgun/groupcache
### Modifications from original library
* Support for explicit key removal from a group. `Remove()` requests are
first sent to the peer that owns the key, then the remove request is
forwarded to every peer in the groupcache. NOTE: This is a best-effort design,
since a temporary network disruption could result in remove requests never
reaching some peers. In practice this scenario is very rare and the system
remains consistent. In case of an inconsistency, placing an expiration time on
your values will ensure the cluster eventually becomes consistent again.
* Support for expired values. `SetBytes()`, `SetProto()` and `SetString()` now
accept an optional `time.Time` representing a future time at which the value
will expire. Expiration is handled by the LRU cache when a `Get()` on a key is
requested, so no network coordination of expired values is needed. However,
this does require that clocks on all nodes in the cluster are synchronized for
values to expire consistently. (A short sketch of both APIs follows this list.)
* The hotcache is now always populated. A more complex algorithm is
unnecessary because the LRU cache already ensures the most-used values remain
cached, and the eviction code ensures the hotcache never overcrowds the
maincache.
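For reference, a minimal sketch of the expiration and removal APIs described above; `fetchFromBackend` is a hypothetical loader, and the group name and sizes are placeholder assumptions:
```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/mailgun/groupcache/v2"
)

// fetchFromBackend is a hypothetical loader standing in for a real data source.
func fetchFromBackend(_ context.Context, key string) ([]byte, error) {
	return []byte("value-for-" + key), nil
}

func main() {
	// Values cached by this getter expire 10 minutes after being set;
	// expiration is enforced by the LRU cache on Get, so it needs no
	// network coordination.
	group := groupcache.NewGroup("example", 1<<20, groupcache.GetterFunc(
		func(ctx context.Context, key string, dest groupcache.Sink) error {
			data, err := fetchFromBackend(ctx, key)
			if err != nil {
				return err
			}
			return dest.SetBytes(data, time.Now().Add(10*time.Minute))
		}))

	ctx := context.Background()
	var b []byte
	if err := group.Get(ctx, "some-key", groupcache.AllocatingByteSliceSink(&b)); err != nil {
		log.Fatal(err)
	}

	// Explicit removal: sent to the key's owner first, then forwarded to
	// every peer (best effort, as noted above).
	if err := group.Remove(ctx, "some-key"); err != nil {
		log.Printf("remove failed: %v", err)
	}
}
```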
## Comparing Groupcache to memcached
### **Like memcached**, groupcache:
* shards by key to select which peer is responsible for that key
### **Unlike memcached**, groupcache:
* does not require running a separate set of servers, thus massively
reducing deployment/configuration pain. groupcache is a client
library as well as a server. It connects to its own peers.
* comes with a cache filling mechanism. Whereas memcached just says
"Sorry, cache miss", often resulting in a thundering herd of
database (or whatever) loads from an unbounded number of clients
(which has resulted in several fun outages), groupcache coordinates
cache fills such that only one load in one process of an entire
replicated set of processes populates the cache, then multiplexes
the loaded value to all callers.
* does not support versioned values. If key "foo" is value "bar",
key "foo" must always be "bar".
## Loading process
In a nutshell, a groupcache lookup of **Get("foo")** looks like:
(On machine #5 of a set of N machines running the same code)
1. Is the value of "foo" in local memory because it's super hot? If so, use it.
2. Is the value of "foo" in local memory because peer #5 (the current
peer) is the owner of it? If so, use it.
3. Amongst all the peers in my set of N, am I the owner of the key
"foo"? (i.e., does the key consistent-hash to peer #5?) If so, load it. If
other callers come in, via the same process or via RPC requests
from peers, they block waiting for the load to finish and get the
same answer. If not, RPC to the peer that's the owner and get
the answer. If the RPC fails, just load it locally (still with
local dup suppression).
## Example
```go
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/mailgun/groupcache/v2"
)
func ExampleUsage() {
// NOTE: It is important to pass the same peer URL `http://192.168.1.1:8080` to both
// `NewHTTPPoolOpts` and `pool.Set()` so the pool can identify which of the peers is
// our instance. The pool will not operate correctly if it can't identify which peer
// is our instance.
// Pool keeps track of peers in our cluster and identifies which peer owns a key.
pool := groupcache.NewHTTPPoolOpts("http://192.168.1.1:8080", &groupcache.HTTPPoolOptions{})
// Add more peers to the cluster. You MUST ensure our instance is included in this list,
// else determining who owns a key across the cluster will not be consistent, and the
// pool won't be able to determine if our instance owns the key.
pool.Set("http://192.168.1.1:8080", "http://192.168.1.2:8080", "http://192.168.1.3:8080")
server := http.Server{
Addr: "192.168.1.1:8080",
Handler: pool,
}
// Start an HTTP server to listen for peer requests from the groupcache
go func() {
log.Printf("Serving....\n")
if err := server.ListenAndServe(); err != nil {
log.Fatal(err)
}
}()
defer server.Shutdown(context.Background())
// Create a new group cache with a max cache size of 3MB
group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
func(ctx context.Context, id string, dest groupcache.Sink) error {
// Returns a protobuf struct `User`
user, err := fetchUserFromMongo(ctx, id)
if err != nil {
return err
}
// Set the user in the groupcache to expire after 5 minutes
return dest.SetProto(&user, time.Now().Add(time.Minute*5))
},
))
var user User
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
log.Fatal(err)
}
fmt.Printf("-- User --\n")
fmt.Printf("Id: %s\n", user.Id)
fmt.Printf("Name: %s\n", user.Name)
fmt.Printf("Age: %d\n", user.Age)
fmt.Printf("IsSuper: %t\n", user.IsSuper)
// Remove the key from the groupcache
if err := group.Remove(ctx, "12345"); err != nil {
log.Fatal(err)
}
}
```
### Note
The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()` creates a new pool internally within the `groupcache` package, where it is utilized by any groups created. The `pool` returned is only a pointer to the internally registered pool, so the caller can update the peers in the pool as needed.
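A short sketch of the registration step this note implies, with placeholder addresses; unlike `NewHTTPPool`, the pool from `NewHTTPPoolOpts` must be hooked into an HTTP server by the caller:
```go
package main

import (
	"log"
	"net/http"

	"github.com/mailgun/groupcache/v2"
)

func main() {
	// NewHTTPPoolOpts registers the pool inside the groupcache package so
	// groups can find it, but it does not register an HTTP handler.
	pool := groupcache.NewHTTPPoolOpts("http://192.168.1.1:8080", &groupcache.HTTPPoolOptions{})

	// The returned pointer is used to register the handler and to update
	// the peer list as the cluster changes.
	http.Handle("/_groupcache/", pool) // the default BasePath
	pool.Set("http://192.168.1.1:8080", "http://192.168.1.2:8080")

	log.Fatal(http.ListenAndServe("192.168.1.1:8080", nil))
}
```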

@ -1,182 +0,0 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package groupcache
import (
"bytes"
"errors"
"io"
"strings"
"time"
)
// A ByteView holds an immutable view of bytes.
// Internally it wraps either a []byte or a string,
// but that detail is invisible to callers.
//
// A ByteView is meant to be used as a value type, not
// a pointer (like a time.Time).
type ByteView struct {
// If b is non-nil, b is used, else s is used.
b []byte
s string
e time.Time
}
// Expire returns the expire time associated with this view
func (v ByteView) Expire() time.Time {
return v.e
}
// Len returns the view's length.
func (v ByteView) Len() int {
if v.b != nil {
return len(v.b)
}
return len(v.s)
}
// ByteSlice returns a copy of the data as a byte slice.
func (v ByteView) ByteSlice() []byte {
if v.b != nil {
return cloneBytes(v.b)
}
return []byte(v.s)
}
// String returns the data as a string, making a copy if necessary.
func (v ByteView) String() string {
if v.b != nil {
return string(v.b)
}
return v.s
}
// At returns the byte at index i.
func (v ByteView) At(i int) byte {
if v.b != nil {
return v.b[i]
}
return v.s[i]
}
// Slice slices the view between the provided from and to indices.
func (v ByteView) Slice(from, to int) ByteView {
if v.b != nil {
return ByteView{b: v.b[from:to]}
}
return ByteView{s: v.s[from:to]}
}
// SliceFrom slices the view from the provided index until the end.
func (v ByteView) SliceFrom(from int) ByteView {
if v.b != nil {
return ByteView{b: v.b[from:]}
}
return ByteView{s: v.s[from:]}
}
// Copy copies v into dest and returns the number of bytes copied.
func (v ByteView) Copy(dest []byte) int {
if v.b != nil {
return copy(dest, v.b)
}
return copy(dest, v.s)
}
// Equal returns whether the bytes in v are the same as the bytes in
// b2.
func (v ByteView) Equal(b2 ByteView) bool {
if b2.b == nil {
return v.EqualString(b2.s)
}
return v.EqualBytes(b2.b)
}
// EqualString returns whether the bytes in v are the same as the bytes
// in s.
func (v ByteView) EqualString(s string) bool {
if v.b == nil {
return v.s == s
}
l := v.Len()
if len(s) != l {
return false
}
for i, bi := range v.b {
if bi != s[i] {
return false
}
}
return true
}
// EqualBytes returns whether the bytes in v are the same as the bytes
// in b2.
func (v ByteView) EqualBytes(b2 []byte) bool {
if v.b != nil {
return bytes.Equal(v.b, b2)
}
l := v.Len()
if len(b2) != l {
return false
}
for i, bi := range b2 {
if bi != v.s[i] {
return false
}
}
return true
}
// Reader returns an io.ReadSeeker for the bytes in v.
func (v ByteView) Reader() io.ReadSeeker {
if v.b != nil {
return bytes.NewReader(v.b)
}
return strings.NewReader(v.s)
}
// ReadAt implements io.ReaderAt on the bytes in v.
func (v ByteView) ReadAt(p []byte, off int64) (n int, err error) {
if off < 0 {
return 0, errors.New("view: invalid offset")
}
if off >= int64(v.Len()) {
return 0, io.EOF
}
n = v.SliceFrom(int(off)).Copy(p)
if n < len(p) {
err = io.EOF
}
return
}
// WriteTo implements io.WriterTo on the bytes in v.
func (v ByteView) WriteTo(w io.Writer) (n int64, err error) {
var m int
if v.b != nil {
m, err = w.Write(v.b)
} else {
m, err = io.WriteString(w, v.s)
}
if err == nil && m < v.Len() {
err = io.ErrShortWrite
}
n = int64(m)
return
}

@ -1,84 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package consistenthash provides an implementation of a ring hash.
package consistenthash
import (
"crypto/md5"
"fmt"
"sort"
"strconv"
"github.com/segmentio/fasthash/fnv1"
)
type Hash func(data []byte) uint64
type Map struct {
hash Hash
replicas int
keys []int // Sorted
hashMap map[int]string
}
func New(replicas int, fn Hash) *Map {
m := &Map{
replicas: replicas,
hash: fn,
hashMap: make(map[int]string),
}
if m.hash == nil {
m.hash = fnv1.HashBytes64
}
return m
}
// IsEmpty returns true if there are no items available.
func (m *Map) IsEmpty() bool {
return len(m.keys) == 0
}
// Add adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(fmt.Sprintf("%x", md5.Sum([]byte(strconv.Itoa(i)+key))))))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// Get returns the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
// Means we have cycled back to the first replica.
if idx == len(m.keys) {
idx = 0
}
return m.hashMap[m.keys[idx]]
}
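For reference, a minimal usage sketch of this ring hash; the peer URLs are placeholder assumptions:
```go
package main

import (
	"fmt"

	"github.com/mailgun/groupcache/v2/consistenthash"
)

func main() {
	// 50 virtual replicas per peer; a nil Hash selects the default
	// fnv1.HashBytes64 (see New above).
	ring := consistenthash.New(50, nil)
	ring.Add("http://10.0.0.1:8080", "http://10.0.0.2:8080", "http://10.0.0.3:8080")

	// Every node that builds the ring with the same peer list agrees on
	// the owner of a key; this is how groupcache picks the peer to ask.
	fmt.Println(ring.Get("user/12345"))
}
```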

@ -1,697 +0,0 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package groupcache provides a data loading mechanism with caching
// and de-duplication that works across a set of peer processes.
//
// Each data Get first consults its local cache, otherwise delegates
// to the requested key's canonical owner, which then checks its cache
// or finally gets the data. In the common case, many concurrent
// cache misses across a set of peers for the same key result in just
// one cache fill.
package groupcache
import (
"context"
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
pb "github.com/mailgun/groupcache/v2/groupcachepb"
"github.com/mailgun/groupcache/v2/lru"
"github.com/mailgun/groupcache/v2/singleflight"
"github.com/sirupsen/logrus"
)
var logger *logrus.Entry
func SetLogger(log *logrus.Entry) {
logger = log
}
// A Getter loads data for a key.
type Getter interface {
// Get returns the value identified by key, populating dest.
//
// The returned data must be unversioned. That is, key must
// uniquely describe the loaded data, without an implicit
// current time, and without relying on cache expiration
// mechanisms.
Get(ctx context.Context, key string, dest Sink) error
}
// A GetterFunc implements Getter with a function.
type GetterFunc func(ctx context.Context, key string, dest Sink) error
func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
return f(ctx, key, dest)
}
var (
mu sync.RWMutex
groups = make(map[string]*Group)
initPeerServerOnce sync.Once
initPeerServer func()
)
// GetGroup returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
// NewGroup creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
return newGroup(name, cacheBytes, getter, nil)
}
// DeregisterGroup removes group from group pool
func DeregisterGroup(name string) {
mu.Lock()
delete(groups, name)
mu.Unlock()
}
// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
initPeerServerOnce.Do(callInitPeerServer)
if _, dup := groups[name]; dup {
panic("duplicate registration of group " + name)
}
g := &Group{
name: name,
getter: getter,
peers: peers,
cacheBytes: cacheBytes,
loadGroup: &singleflight.Group{},
setGroup: &singleflight.Group{},
removeGroup: &singleflight.Group{},
}
if fn := newGroupHook; fn != nil {
fn(g)
}
groups[name] = g
return g
}
// newGroupHook, if non-nil, is called right after a new group is created.
var newGroupHook func(*Group)
// RegisterNewGroupHook registers a hook that is run each time
// a group is created.
func RegisterNewGroupHook(fn func(*Group)) {
if newGroupHook != nil {
panic("RegisterNewGroupHook called more than once")
}
newGroupHook = fn
}
// RegisterServerStart registers a hook that is run when the first
// group is created.
func RegisterServerStart(fn func()) {
if initPeerServer != nil {
panic("RegisterServerStart called more than once")
}
initPeerServer = fn
}
func callInitPeerServer() {
if initPeerServer != nil {
initPeerServer()
}
}
// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
type Group struct {
name string
getter Getter
peersOnce sync.Once
peers PeerPicker
cacheBytes int64 // limit for sum of mainCache and hotCache size
// mainCache is a cache of the keys for which this process
// (amongst its peers) is authoritative. That is, this cache
// contains keys which consistent hash on to this process's
// peer number.
mainCache cache
// hotCache contains keys/values for which this peer is not
// authoritative (otherwise they would be in mainCache), but
// are popular enough to warrant mirroring in this process to
// avoid going over the network to fetch from a peer. Having
// a hotCache avoids network hotspotting, where a peer's
// network card could become the bottleneck on a popular key.
// This cache is used sparingly to maximize the total number
// of key/value pairs that can be stored globally.
hotCache cache
// loadGroup ensures that each key is only fetched once
// (either locally or remotely), regardless of the number of
// concurrent callers.
loadGroup flightGroup
// setGroup ensures that each added key is only added
// remotely once regardless of the number of concurrent callers.
setGroup flightGroup
// removeGroup ensures that each removed key is only removed
// remotely once regardless of the number of concurrent callers.
removeGroup flightGroup
_ int32 // force Stats to be 8-byte aligned on 32-bit platforms
// Stats are statistics on the group.
Stats Stats
}
// flightGroup is defined as an interface which flightgroup.Group
// satisfies. We define this so that we may test with an alternate
// implementation.
type flightGroup interface {
Do(key string, fn func() (interface{}, error)) (interface{}, error)
Lock(fn func())
}
// Stats are per-group statistics.
type Stats struct {
Gets AtomicInt // any Get request, including from peers
CacheHits AtomicInt // a hit in either the main or hot cache
GetFromPeersLatencyLower AtomicInt // slowest duration to request value from peers
PeerLoads AtomicInt // either remote load or remote cache hit (not an error)
PeerErrors AtomicInt
Loads AtomicInt // (gets - cacheHits)
LoadsDeduped AtomicInt // after singleflight
LocalLoads AtomicInt // total good local loads
LocalLoadErrs AtomicInt // total bad local loads
ServerRequests AtomicInt // gets that came over the network from peers
}
// Name returns the name of the group.
func (g *Group) Name() string {
return g.name
}
func (g *Group) initPeers() {
if g.peers == nil {
g.peers = getPeers(g.name)
}
}
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
if dest == nil {
return errors.New("groupcache: nil dest Sink")
}
value, cacheHit := g.lookupCache(key)
if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
// Optimization to avoid double unmarshalling or copying: keep
// track of whether the dest was already populated. One caller
// (if local) will set this; the losers will not. The common
// case will likely be one caller.
destPopulated := false
value, destPopulated, err := g.load(ctx, key, dest)
if err != nil {
return err
}
if destPopulated {
return nil
}
return setSinkView(dest, value)
}
func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.Time, hotCache bool) error {
g.peersOnce.Do(g.initPeers)
if key == "" {
return errors.New("empty Set() key not allowed")
}
_, err := g.setGroup.Do(key, func() (interface{}, error) {
// If remote peer owns this key
owner, ok := g.peers.PickPeer(key)
if ok {
if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
return nil, err
}
// TODO(thrawn01): Not sure if this is useful outside of tests...
// maybe we should ALWAYS update the local cache?
if hotCache {
g.localSet(key, value, expire, &g.hotCache)
}
return nil, nil
}
// We own this key
g.localSet(key, value, expire, &g.mainCache)
return nil, nil
})
return err
}
// Remove clears the key from our cache then forwards the remove
// request to all peers.
func (g *Group) Remove(ctx context.Context, key string) error {
g.peersOnce.Do(g.initPeers)
_, err := g.removeGroup.Do(key, func() (interface{}, error) {
// Remove from key owner first
owner, ok := g.peers.PickPeer(key)
if ok {
if err := g.removeFromPeer(ctx, owner, key); err != nil {
return nil, err
}
}
// Remove from our cache next
g.localRemove(key)
wg := sync.WaitGroup{}
errs := make(chan error)
// Asynchronously clear the key from all hot and main caches of peers
for _, peer := range g.peers.GetAll() {
// avoid deleting from owner a second time
if peer == owner {
continue
}
wg.Add(1)
go func(peer ProtoGetter) {
errs <- g.removeFromPeer(ctx, peer, key)
wg.Done()
}(peer)
}
go func() {
wg.Wait()
close(errs)
}()
// TODO(thrawn01): Should we report all errors? Reporting context
// cancelled error for each peer doesn't make much sense.
var err error
for e := range errs {
err = e
}
return nil, err
})
return err
}
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// Check the cache again because singleflight can only dedup calls
// that overlap concurrently. It's possible for 2 concurrent
// requests to miss the cache, resulting in 2 load() calls. An
// unfortunate goroutine scheduling would result in this callback
// being run twice, serially. If we don't check the cache again,
// cache.nbytes would be incremented below even though there will
// be only one entry for this key.
//
// Consider the following serialized event ordering for two
// goroutines in which this callback gets called twice for the
// same key:
// 1: Get("key")
// 2: Get("key")
// 1: lookupCache("key")
// 2: lookupCache("key")
// 1: load("key")
// 2: load("key")
// 1: loadGroup.Do("key", fn)
// 1: fn()
// 2: loadGroup.Do("key", fn)
// 2: fn()
if value, cacheHit := g.lookupCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
g.Stats.LoadsDeduped.Add(1)
var value ByteView
var err error
if peer, ok := g.peers.PickPeer(key); ok {
// metrics duration start
start := time.Now()
// get value from peers
value, err = g.getFromPeer(ctx, peer, key)
// metrics duration compute
duration := int64(time.Since(start)) / int64(time.Millisecond)
// metrics only store the slowest duration
if g.Stats.GetFromPeersLatencyLower.Get() < duration {
g.Stats.GetFromPeersLatencyLower.Store(duration)
}
if err == nil {
g.Stats.PeerLoads.Add(1)
return value, nil
} else if errors.Is(err, context.Canceled) {
// do not count context cancellation as a peer error
return nil, err
}
if logger != nil {
logger.WithFields(logrus.Fields{
"err": err,
"key": key,
"category": "groupcache",
}).Errorf("error retrieving key from peer '%s'", peer.GetURL())
}
g.Stats.PeerErrors.Add(1)
if ctx != nil && ctx.Err() != nil {
// Return here without attempting to get locally
// since the context is no longer valid
return nil, err
}
// TODO(bradfitz): log the peer's error? keep
// log of the past few for /groupcachez? It's
// probably boring (normal task movement), so not
// worth logging I imagine.
}
value, err = g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.LocalLoadErrs.Add(1)
return nil, err
}
g.Stats.LocalLoads.Add(1)
destPopulated = true // only one caller of load gets this return value
g.populateCache(key, value, &g.mainCache)
return value, nil
})
if err == nil {
value = viewi.(ByteView)
}
return
}
func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
err := g.getter.Get(ctx, key, dest)
if err != nil {
return ByteView{}, err
}
return dest.view()
}
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
}
res := &pb.GetResponse{}
err := peer.Get(ctx, req, res)
if err != nil {
return ByteView{}, err
}
var expire time.Time
if res.Expire != nil && *res.Expire != 0 {
expire = time.Unix(*res.Expire/int64(time.Second), *res.Expire%int64(time.Second))
if time.Now().After(expire) {
return ByteView{}, errors.New("peer returned expired value")
}
}
value := ByteView{b: res.Value, e: expire}
// Always populate the hot cache
g.populateCache(key, value, &g.hotCache)
return value, nil
}
func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time) error {
var expire int64
if !e.IsZero() {
expire = e.UnixNano()
}
req := &pb.SetRequest{
Expire: &expire,
Group: &g.name,
Key: &k,
Value: v,
}
return peer.Set(ctx, req)
}
func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
}
return peer.Remove(ctx, req)
}
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
if g.cacheBytes <= 0 {
return
}
value, ok = g.mainCache.get(key)
if ok {
return
}
value, ok = g.hotCache.get(key)
return
}
func (g *Group) localSet(key string, value []byte, expire time.Time, cache *cache) {
if g.cacheBytes <= 0 {
return
}
bv := ByteView{
b: value,
e: expire,
}
// Ensure no requests are in flight
g.loadGroup.Lock(func() {
g.populateCache(key, bv, cache)
})
}
func (g *Group) localRemove(key string) {
// Clear key from our local cache
if g.cacheBytes <= 0 {
return
}
// Ensure no requests are in flight
g.loadGroup.Lock(func() {
g.hotCache.remove(key)
g.mainCache.remove(key)
})
}
func (g *Group) populateCache(key string, value ByteView, cache *cache) {
if g.cacheBytes <= 0 {
return
}
cache.add(key, value)
// Evict items from cache(s) if necessary.
for {
mainBytes := g.mainCache.bytes()
hotBytes := g.hotCache.bytes()
if mainBytes+hotBytes <= g.cacheBytes {
return
}
// TODO(bradfitz): this is good-enough-for-now logic.
// It should be something based on measurements and/or
// respecting the costs of different resources.
victim := &g.mainCache
if hotBytes > mainBytes/8 {
victim = &g.hotCache
}
victim.removeOldest()
}
}
// CacheType represents a type of cache.
type CacheType int
const (
// The MainCache is the cache for items that this peer is the
// owner for.
MainCache CacheType = iota + 1
// The HotCache is the cache for items that seem popular
// enough to replicate to this node, even though it's not the
// owner.
HotCache
)
// CacheStats returns stats about the provided cache within the group.
func (g *Group) CacheStats(which CacheType) CacheStats {
switch which {
case MainCache:
return g.mainCache.stats()
case HotCache:
return g.hotCache.stats()
default:
return CacheStats{}
}
}
// cache is a wrapper around an *lru.Cache that adds synchronization,
// makes values always be ByteView, and counts the size of all keys and
// values.
type cache struct {
mu sync.RWMutex
nbytes int64 // of all keys and values
lru *lru.Cache
nhit, nget int64
nevict int64 // number of evictions
}
func (c *cache) stats() CacheStats {
c.mu.RLock()
defer c.mu.RUnlock()
return CacheStats{
Bytes: c.nbytes,
Items: c.itemsLocked(),
Gets: c.nget,
Hits: c.nhit,
Evictions: c.nevict,
}
}
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
c.lru = &lru.Cache{
OnEvicted: func(key lru.Key, value interface{}) {
val := value.(ByteView)
c.nbytes -= int64(len(key.(string))) + int64(val.Len())
c.nevict++
},
}
}
c.lru.Add(key, value, value.Expire())
c.nbytes += int64(len(key)) + int64(value.Len())
}
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.nget++
if c.lru == nil {
return
}
vi, ok := c.lru.Get(key)
if !ok {
return
}
c.nhit++
return vi.(ByteView), true
}
func (c *cache) remove(key string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
c.lru.Remove(key)
}
func (c *cache) removeOldest() {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru != nil {
c.lru.RemoveOldest()
}
}
func (c *cache) bytes() int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.nbytes
}
func (c *cache) items() int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.itemsLocked()
}
func (c *cache) itemsLocked() int64 {
if c.lru == nil {
return 0
}
return int64(c.lru.Len())
}
// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64
// Add atomically adds n to i.
func (i *AtomicInt) Add(n int64) {
atomic.AddInt64((*int64)(i), n)
}
// Store atomically stores n to i.
func (i *AtomicInt) Store(n int64) {
atomic.StoreInt64((*int64)(i), n)
}
// Get atomically gets the value of i.
func (i *AtomicInt) Get() int64 {
return atomic.LoadInt64((*int64)(i))
}
func (i *AtomicInt) String() string {
return strconv.FormatInt(i.Get(), 10)
}
// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
Bytes int64
Items int64
Gets int64
Hits int64
Evictions int64
}
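A small sketch of reading the counters defined above; `group` is assumed to have been created elsewhere with `NewGroup`:
```go
package stats

import (
	"fmt"

	"github.com/mailgun/groupcache/v2"
)

// logGroupStats prints a few of the counters exposed on Group.Stats and
// via CacheStats.
func logGroupStats(group *groupcache.Group) {
	fmt.Printf("gets=%d hits=%d peerLoads=%d localLoads=%d\n",
		group.Stats.Gets.Get(), group.Stats.CacheHits.Get(),
		group.Stats.PeerLoads.Get(), group.Stats.LocalLoads.Get())

	main := group.CacheStats(groupcache.MainCache)
	hot := group.CacheStats(groupcache.HotCache)
	fmt.Printf("main: %d bytes / %d items; hot: %d bytes / %d items\n",
		main.Bytes, main.Items, hot.Bytes, hot.Items)
}
```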

@ -1,12 +0,0 @@
syntax = "proto3";
option go_package = "groupcache_test";
package groupcachepb;
message User {
string id = 1;
string name = 2;
int64 age = 3;
bool is_super = 4;
}

@ -1,155 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: groupcache.proto
/*
Package groupcachepb is a generated protocol buffer package.
It is generated from these files:
groupcache.proto
It has these top-level messages:
GetRequest
GetResponse
SetRequest
*/
package groupcachepb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type GetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *GetRequest) Reset() { *m = GetRequest{} }
func (m *GetRequest) String() string { return proto.CompactTextString(m) }
func (*GetRequest) ProtoMessage() {}
func (*GetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *GetRequest) GetGroup() string {
if m != nil && m.Group != nil {
return *m.Group
}
return ""
}
func (m *GetRequest) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
type GetResponse struct {
Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps" json:"minute_qps,omitempty"`
Expire *int64 `protobuf:"varint,3,opt,name=expire" json:"expire,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *GetResponse) Reset() { *m = GetResponse{} }
func (m *GetResponse) String() string { return proto.CompactTextString(m) }
func (*GetResponse) ProtoMessage() {}
func (*GetResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *GetResponse) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *GetResponse) GetMinuteQps() float64 {
if m != nil && m.MinuteQps != nil {
return *m.MinuteQps
}
return 0
}
func (m *GetResponse) GetExpire() int64 {
if m != nil && m.Expire != nil {
return *m.Expire
}
return 0
}
type SetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Expire *int64 `protobuf:"varint,4,opt,name=expire" json:"expire,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *SetRequest) Reset() { *m = SetRequest{} }
func (m *SetRequest) String() string { return proto.CompactTextString(m) }
func (*SetRequest) ProtoMessage() {}
func (*SetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *SetRequest) GetGroup() string {
if m != nil && m.Group != nil {
return *m.Group
}
return ""
}
func (m *SetRequest) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
}
return ""
}
func (m *SetRequest) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *SetRequest) GetExpire() int64 {
if m != nil && m.Expire != nil {
return *m.Expire
}
return 0
}
func init() {
proto.RegisterType((*GetRequest)(nil), "groupcachepb.GetRequest")
proto.RegisterType((*GetResponse)(nil), "groupcachepb.GetResponse")
proto.RegisterType((*SetRequest)(nil), "groupcachepb.SetRequest")
}
func init() { proto.RegisterFile("groupcache.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 215 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0x31, 0x4b, 0xc5, 0x30,
0x18, 0x34, 0x8d, 0x0a, 0xfd, 0xec, 0x50, 0x82, 0x48, 0x14, 0x84, 0x90, 0x29, 0x53, 0x07, 0x71,
0x74, 0x73, 0x28, 0xb8, 0x19, 0x37, 0x17, 0x69, 0xcb, 0x87, 0x16, 0xb5, 0x49, 0x9b, 0x44, 0x7c,
0xff, 0xfe, 0x91, 0xe6, 0x41, 0x3a, 0xbc, 0xe5, 0x6d, 0xb9, 0x3b, 0x2e, 0x77, 0xdf, 0x41, 0xfd,
0xb9, 0x98, 0x60, 0x87, 0x6e, 0xf8, 0xc2, 0xc6, 0x2e, 0xc6, 0x1b, 0x56, 0x65, 0xc6, 0xf6, 0xf2,
0x11, 0xa0, 0x45, 0xaf, 0x71, 0x0e, 0xe8, 0x3c, 0xbb, 0x86, 0x8b, 0x55, 0xe5, 0x44, 0x14, 0xaa,
0xd4, 0x09, 0xb0, 0x1a, 0xe8, 0x37, 0xee, 0x78, 0xb1, 0x72, 0xf1, 0x29, 0xdf, 0xe1, 0x6a, 0x75,
0x39, 0x6b, 0x26, 0x87, 0xd1, 0xf6, 0xd7, 0xfd, 0x04, 0xe4, 0x44, 0x10, 0x55, 0xe9, 0x04, 0xd8,
0x3d, 0xc0, 0xef, 0x38, 0x05, 0x8f, 0x1f, 0xb3, 0x75, 0xbc, 0x10, 0x44, 0x11, 0x5d, 0x26, 0xe6,
0xd5, 0x3a, 0x76, 0x03, 0x97, 0xf8, 0x6f, 0xc7, 0x05, 0x39, 0x15, 0x44, 0x51, 0x7d, 0x40, 0xb2,
0x07, 0x78, 0x3b, 0xb9, 0x51, 0xae, 0x40, 0xb7, 0x15, 0x72, 0xc6, 0xf9, 0x36, 0xe3, 0xe1, 0x05,
0xa0, 0x8d, 0x1f, 0x3d, 0xc7, 0x15, 0xd8, 0x13, 0xd0, 0x16, 0x3d, 0xe3, 0xcd, 0x76, 0x99, 0x26,
0xcf, 0x72, 0x77, 0x7b, 0x44, 0x49, 0xa7, 0xcb, 0xb3, 0x7d, 0x00, 0x00, 0x00, 0xff, 0xff, 0x02,
0x10, 0x64, 0xec, 0x62, 0x01, 0x00, 0x00,
}

@ -1,42 +0,0 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
syntax = "proto2";
package groupcachepb;
message GetRequest {
required string group = 1;
required string key = 2; // not actually required/guaranteed to be UTF-8
}
message GetResponse {
optional bytes value = 1;
optional double minute_qps = 2;
optional int64 expire = 3;
}
message SetRequest {
required string group = 1;
required string key = 2;
optional bytes value = 3;
optional int64 expire = 4;
}
service GroupCache {
rpc Get(GetRequest) returns (GetResponse) {
};
}

@ -1,354 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package groupcache
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/mailgun/groupcache/v2/consistenthash"
pb "github.com/mailgun/groupcache/v2/groupcachepb"
)
const defaultBasePath = "/_groupcache/"
const defaultReplicas = 50
// HTTPPool implements PeerPicker for a pool of HTTP peers.
type HTTPPool struct {
// this peer's base URL, e.g. "https://example.net:8000"
self string
// opts specifies the options.
opts HTTPPoolOptions
mu sync.Mutex // guards peers and httpGetters
peers *consistenthash.Map
httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008"
}
// HTTPPoolOptions are the configurations of an HTTPPool.
type HTTPPoolOptions struct {
// BasePath specifies the HTTP path that will serve groupcache requests.
// If blank, it defaults to "/_groupcache/".
BasePath string
// Replicas specifies the number of key replicas on the consistent hash.
// If blank, it defaults to 50.
Replicas int
// HashFn specifies the hash function of the consistent hash.
// If nil, it defaults to fnv1.HashBytes64.
HashFn consistenthash.Hash
// Transport optionally specifies an http.RoundTripper for the client
// to use when it makes a request.
// If nil, the client uses http.DefaultTransport.
Transport func(context.Context) http.RoundTripper
// Context optionally specifies a context for the server to use when it
// receives a request.
// If nil, uses the http.Request.Context()
Context func(*http.Request) context.Context
}
// NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker.
// For convenience, it also registers itself as an http.Handler with http.DefaultServeMux.
// The self argument should be a valid base URL that points to the current server,
// for example "http://example.net:8000".
func NewHTTPPool(self string) *HTTPPool {
p := NewHTTPPoolOpts(self, nil)
http.Handle(p.opts.BasePath, p)
return p
}
var httpPoolMade bool
// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
if httpPoolMade {
panic("groupcache: NewHTTPPool must be called only once")
}
httpPoolMade = true
p := &HTTPPool{
self: self,
httpGetters: make(map[string]*httpGetter),
}
if o != nil {
p.opts = *o
}
if p.opts.BasePath == "" {
p.opts.BasePath = defaultBasePath
}
if p.opts.Replicas == 0 {
p.opts.Replicas = defaultReplicas
}
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
RegisterPeerPicker(func() PeerPicker { return p })
return p
}
// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{
getTransport: p.opts.Transport,
baseURL: peer + p.opts.BasePath,
}
}
}
// GetAll returns all the peers in the pool
func (p *HTTPPool) GetAll() []ProtoGetter {
p.mu.Lock()
defer p.mu.Unlock()
var i int
res := make([]ProtoGetter, len(p.httpGetters))
for _, v := range p.httpGetters {
res[i] = v
i++
}
return res
}
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.peers.IsEmpty() {
return nil, false
}
if peer := p.peers.Get(key); peer != p.self {
return p.httpGetters[peer], true
}
return nil, false
}
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Parse request.
if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]
// Fetch the value for this group/key.
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
var ctx context.Context
if p.opts.Context != nil {
ctx = p.opts.Context(r)
} else {
ctx = r.Context()
}
group.Stats.ServerRequests.Add(1)
// Delete the key and return 200
if r.Method == http.MethodDelete {
group.localRemove(key)
return
}
// Then read the body and set the key value
if r.Method == http.MethodPut {
defer r.Body.Close()
b := bufferPool.Get().(*bytes.Buffer)
b.Reset()
defer bufferPool.Put(b)
_, err := io.Copy(b, r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var out pb.SetRequest
err = proto.Unmarshal(b.Bytes(), &out)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var expire time.Time
if out.Expire != nil && *out.Expire != 0 {
expire = time.Unix(*out.Expire/int64(time.Second), *out.Expire%int64(time.Second))
}
group.localSet(*out.Key, out.Value, expire, &group.mainCache)
return
}
var b []byte
value := AllocatingByteSliceSink(&b)
err := group.Get(ctx, key, value)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
view, err := value.view()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var expireNano int64
if !view.e.IsZero() {
expireNano = view.Expire().UnixNano()
}
// Write the value to the response body as a proto message.
body, err := proto.Marshal(&pb.GetResponse{Value: b, Expire: &expireNano})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
w.Write(body)
}
type httpGetter struct {
getTransport func(context.Context) http.RoundTripper
baseURL string
}
func (p *httpGetter) GetURL() string {
return p.baseURL
}
var bufferPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
type request interface {
GetGroup() string
GetKey() string
}
func (h *httpGetter) makeRequest(ctx context.Context, m string, in request, b io.Reader, out *http.Response) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(in.GetKey()),
)
req, err := http.NewRequestWithContext(ctx, m, u, b)
if err != nil {
return err
}
tr := http.DefaultTransport
if h.getTransport != nil {
tr = h.getTransport(ctx)
}
res, err := tr.RoundTrip(req)
if err != nil {
return err
}
*out = *res
return nil
}
func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
var res http.Response
if err := h.makeRequest(ctx, http.MethodGet, in, nil, &res); err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("server returned: %v", res.Status)
}
b := bufferPool.Get().(*bytes.Buffer)
b.Reset()
defer bufferPool.Put(b)
_, err := io.Copy(b, res.Body)
if err != nil {
return fmt.Errorf("reading response body: %v", err)
}
err = proto.Unmarshal(b.Bytes(), out)
if err != nil {
return fmt.Errorf("decoding response body: %v", err)
}
return nil
}
func (h *httpGetter) Set(ctx context.Context, in *pb.SetRequest) error {
body, err := proto.Marshal(in)
if err != nil {
return fmt.Errorf("while marshaling SetRequest body: %w", err)
}
var res http.Response
if err := h.makeRequest(ctx, http.MethodPut, in, bytes.NewReader(body), &res); err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("while reading body response: %v", res.Status)
}
return fmt.Errorf("server returned status %d: %s", res.StatusCode, body)
}
return nil
}
func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error {
var res http.Response
if err := h.makeRequest(ctx, http.MethodDelete, in, nil, &res); err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("while reading body response: %v", res.Status)
}
return fmt.Errorf("server returned status %d: %s", res.StatusCode, body)
}
return nil
}

@ -1,144 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package lru implements an LRU cache.
package lru
import (
"container/list"
"time"
)
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specifies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct {
key Key
value interface{}
expire time.Time
}
// New creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller.
func New(maxEntries int) *Cache {
return &Cache{
MaxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}, expire time.Time) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
// Refresh the expiration too, so re-adding a key extends its lifetime.
ee.Value.(*entry).expire = expire
return
}
ele := c.ll.PushFront(&entry{key, value, expire})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
entry := ele.Value.(*entry)
// If the entry has expired, remove it from the cache
if !entry.expire.IsZero() && entry.expire.Before(time.Now()) {
c.removeElement(ele)
return nil, false
}
c.ll.MoveToFront(ele)
return entry.value, true
}
return
}
// Remove removes the provided key from the cache.
func (c *Cache) Remove(key Key) {
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
if c.cache == nil {
return 0
}
return c.ll.Len()
}
// Clear purges all stored items from the cache.
func (c *Cache) Clear() {
if c.OnEvicted != nil {
for _, e := range c.cache {
kv := e.Value.(*entry)
c.OnEvicted(kv.key, kv.value)
}
}
c.ll = nil
c.cache = nil
}
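For reference, a minimal sketch of this LRU cache with per-entry expiration; the keys and values are arbitrary placeholders:
```go
package main

import (
	"fmt"
	"time"

	"github.com/mailgun/groupcache/v2/lru"
)

func main() {
	// Keep at most 128 entries; the oldest is evicted on overflow.
	c := lru.New(128)
	c.OnEvicted = func(key lru.Key, value interface{}) {
		fmt.Printf("evicted %v\n", key)
	}

	// A zero time.Time means the entry never expires.
	c.Add("a", 1, time.Time{})
	// This entry is already expired and is dropped lazily on the next Get.
	c.Add("b", 2, time.Now().Add(-time.Second))

	if v, ok := c.Get("a"); ok {
		fmt.Println("a =", v)
	}
	if _, ok := c.Get("b"); !ok {
		fmt.Println("b expired")
	}
}
```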

@ -1,89 +0,0 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// peers.go defines how processes find and communicate with their peers.
package groupcache
import (
"context"
pb "github.com/mailgun/groupcache/v2/groupcachepb"
)
// ProtoGetter is the interface that must be implemented by a peer.
type ProtoGetter interface {
Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error
Remove(context context.Context, in *pb.GetRequest) error
Set(context context.Context, in *pb.SetRequest) error
// GetURL returns the peer URL
GetURL() string
}
// PeerPicker is the interface that must be implemented to locate
// the peer that owns a specific key.
type PeerPicker interface {
// PickPeer returns the peer that owns the specific key
// and true to indicate that a remote peer was nominated.
// It returns nil, false if the key owner is the current peer.
PickPeer(key string) (peer ProtoGetter, ok bool)
// GetAll returns all the peers in the group
GetAll() []ProtoGetter
}
// NoPeers is an implementation of PeerPicker that never finds a peer.
type NoPeers struct{}
func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }
func (NoPeers) GetAll() []ProtoGetter { return []ProtoGetter{} }
var (
portPicker func(groupName string) PeerPicker
)
// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPicker(fn func() PeerPicker) {
if portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
portPicker = func(_ string) PeerPicker { return fn() }
}
// RegisterPerGroupPeerPicker registers the peer initialization function,
// which takes the groupName, to be used in choosing a PeerPicker.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker) {
if portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
portPicker = fn
}
func getPeers(groupName string) PeerPicker {
if portPicker == nil {
return NoPeers{}
}
pk := portPicker(groupName)
if pk == nil {
pk = NoPeers{}
}
return pk
}

@ -1,16 +0,0 @@
#! /bin/sh
# Make sure the script fails fast.
set -e
set -u
set -x
PROTO_DIR=groupcachepb
protoc -I=$PROTO_DIR \
--go_out=$PROTO_DIR \
$PROTO_DIR/groupcache.proto
protoc -I=$PROTO_DIR \
--go_out=. \
$PROTO_DIR/example.proto

@ -1,81 +0,0 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight
import (
"fmt"
"sync"
)
// call is an in-flight or completed Do call
type call struct {
wg sync.WaitGroup
val interface{}
err error
}
// Group represents a class of work and forms a namespace in which
// units of work can be executed with duplicate suppression.
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := &call{
err: fmt.Errorf("singleflight leader panicked"),
}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
defer func() {
c.wg.Done()
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}()
c.val, c.err = fn()
return c.val, c.err
}
// Lock prevents single flights from occurring for the duration
// of the provided function. This allows users to clear caches
// or perform some operation in between running flights.
func (g *Group) Lock(fn func()) {
g.mu.Lock()
defer g.mu.Unlock()
fn()
}
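
A short usage sketch (not part of the original file; loadFromDatabase is a hypothetical expensive loader) showing how concurrent callers for the same key share one execution:

var fetchGroup singleflight.Group

func fetch(key string) (string, error) {
	v, err := fetchGroup.Do(key, func() (interface{}, error) {
		// Runs at most once per key at a time; concurrent duplicate
		// callers block and receive this same result.
		return loadFromDatabase(key)
	})
	if err != nil {
		return "", err
	}
	return v.(string), nil
}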

@@ -1,338 +0,0 @@
/*
Copyright 2012 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package groupcache
import (
"errors"
"time"
"github.com/golang/protobuf/proto"
)
var _ Sink = &stringSink{}
var _ Sink = &allocBytesSink{}
var _ Sink = &protoSink{}
var _ Sink = &truncBytesSink{}
var _ Sink = &byteViewSink{}
// A Sink receives data from a Get call.
//
// Implementations of Getter must call exactly one of the Set methods
// on success.
type Sink interface {
// SetString sets the value to s.
SetString(s string, e time.Time) error
// SetBytes sets the value to the contents of v.
// The caller retains ownership of v.
SetBytes(v []byte, e time.Time) error
// SetProto sets the value to the encoded version of m.
// The caller retains ownership of m.
SetProto(m proto.Message, e time.Time) error
// view returns a frozen view of the bytes for caching.
view() (ByteView, error)
}
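// Example of the contract above (an illustrative sketch, not part of this
// file; dbLookup is a hypothetical backing-store read): a Getter populates
// its Sink exactly once on success.
//
//	getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error {
//		val, err := dbLookup(key)
//		if err != nil {
//			return err
//		}
//		// exactly one Set call on success, with a five-minute expiry
//		return dest.SetBytes(val, time.Now().Add(5*time.Minute))
//	})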
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}
func setSinkView(s Sink, v ByteView) error {
// A viewSetter is a Sink that can also receive its value from
// a ByteView. This is a fast path to minimize copies when the
// item was already cached locally in memory (where it's
// cached as a ByteView)
type viewSetter interface {
setView(v ByteView) error
}
if vs, ok := s.(viewSetter); ok {
return vs.setView(v)
}
if v.b != nil {
return s.SetBytes(v.b, v.Expire())
}
return s.SetString(v.s, v.Expire())
}
// StringSink returns a Sink that populates the provided string pointer.
func StringSink(sp *string) Sink {
return &stringSink{sp: sp}
}
type stringSink struct {
sp *string
v ByteView
// TODO(bradfitz): track whether any Sets were called.
}
func (s *stringSink) view() (ByteView, error) {
// TODO(bradfitz): return an error if no Set was called
return s.v, nil
}
func (s *stringSink) SetString(v string, e time.Time) error {
s.v.b = nil
s.v.s = v
*s.sp = v
s.v.e = e
return nil
}
func (s *stringSink) SetBytes(v []byte, e time.Time) error {
return s.SetString(string(v), e)
}
func (s *stringSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
s.v.b = b
*s.sp = string(b)
s.v.e = e
return nil
}
// ByteViewSink returns a Sink that populates a ByteView.
func ByteViewSink(dst *ByteView) Sink {
if dst == nil {
panic("nil dst")
}
return &byteViewSink{dst: dst}
}
type byteViewSink struct {
dst *ByteView
// if this code ever ends up tracking that at least one set*
// method was called, don't make it an error to call set
// methods multiple times. Lorry's payload.go does that, and
// it makes sense. The comment at the top of this file about
// "exactly one of the Set methods" is overly strict. We
// really care about at least once (in a handler), but if
// multiple handlers fail (or multiple functions in a program
// using a Sink), it's okay to re-use the same one.
}
func (s *byteViewSink) setView(v ByteView) error {
*s.dst = v
return nil
}
func (s *byteViewSink) view() (ByteView, error) {
return *s.dst, nil
}
func (s *byteViewSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
*s.dst = ByteView{b: b, e: e}
return nil
}
func (s *byteViewSink) SetBytes(b []byte, e time.Time) error {
*s.dst = ByteView{b: cloneBytes(b), e: e}
return nil
}
func (s *byteViewSink) SetString(v string, e time.Time) error {
*s.dst = ByteView{s: v, e: e}
return nil
}
// ProtoSink returns a sink that unmarshals binary proto values into m.
func ProtoSink(m proto.Message) Sink {
return &protoSink{
dst: m,
}
}
type protoSink struct {
dst proto.Message // authoritative value
typ string
v ByteView // encoded
}
func (s *protoSink) view() (ByteView, error) {
return s.v, nil
}
func (s *protoSink) SetBytes(b []byte, e time.Time) error {
err := proto.Unmarshal(b, s.dst)
if err != nil {
return err
}
s.v.b = cloneBytes(b)
s.v.s = ""
s.v.e = e
return nil
}
func (s *protoSink) SetString(v string, e time.Time) error {
b := []byte(v)
err := proto.Unmarshal(b, s.dst)
if err != nil {
return err
}
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
func (s *protoSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
// TODO(bradfitz): optimize for same-task case more and write
// right through? would need to document ownership rules at
// the same time. but then we could just assign *dst = *m
// here. This works for now:
err = proto.Unmarshal(b, s.dst)
if err != nil {
return err
}
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
// AllocatingByteSliceSink returns a Sink that allocates
// a byte slice to hold the received value and assigns
// it to *dst. The memory is not retained by groupcache.
func AllocatingByteSliceSink(dst *[]byte) Sink {
return &allocBytesSink{dst: dst}
}
type allocBytesSink struct {
dst *[]byte
v ByteView
}
func (s *allocBytesSink) view() (ByteView, error) {
return s.v, nil
}
func (s *allocBytesSink) setView(v ByteView) error {
if v.b != nil {
*s.dst = cloneBytes(v.b)
} else {
*s.dst = []byte(v.s)
}
s.v = v
return nil
}
func (s *allocBytesSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
return s.setBytesOwned(b, e)
}
func (s *allocBytesSink) SetBytes(b []byte, e time.Time) error {
return s.setBytesOwned(cloneBytes(b), e)
}
func (s *allocBytesSink) setBytesOwned(b []byte, e time.Time) error {
if s.dst == nil {
return errors.New("nil AllocatingByteSliceSink *[]byte dst")
}
*s.dst = cloneBytes(b) // another copy, protecting the read-only s.v.b view
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
func (s *allocBytesSink) SetString(v string, e time.Time) error {
if s.dst == nil {
return errors.New("nil AllocatingByteSliceSink *[]byte dst")
}
*s.dst = []byte(v)
s.v.b = nil
s.v.s = v
s.v.e = e
return nil
}
// TruncatingByteSliceSink returns a Sink that writes up to len(*dst)
// bytes to *dst. If more bytes are available, they're silently
// truncated. If fewer bytes are available than len(*dst), *dst
// is shrunk to fit the number of bytes available.
func TruncatingByteSliceSink(dst *[]byte) Sink {
return &truncBytesSink{dst: dst}
}
type truncBytesSink struct {
dst *[]byte
v ByteView
}
func (s *truncBytesSink) view() (ByteView, error) {
return s.v, nil
}
func (s *truncBytesSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
return s.setBytesOwned(b, e)
}
func (s *truncBytesSink) SetBytes(b []byte, e time.Time) error {
return s.setBytesOwned(cloneBytes(b), e)
}
func (s *truncBytesSink) setBytesOwned(b []byte, e time.Time) error {
if s.dst == nil {
return errors.New("nil TruncatingByteSliceSink *[]byte dst")
}
n := copy(*s.dst, b)
if n < len(*s.dst) {
*s.dst = (*s.dst)[:n]
}
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
func (s *truncBytesSink) SetString(v string, e time.Time) error {
if s.dst == nil {
return errors.New("nil TruncatingByteSliceSink *[]byte dst")
}
n := copy(*s.dst, v)
if n < len(*s.dst) {
*s.dst = (*s.dst)[:n]
}
s.v.b = nil
s.v.s = v
s.v.e = e
return nil
}
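
A brief usage sketch of the truncating sink (illustrative; `group` and `ctx` are assumed to come from the caller's setup):

buf := make([]byte, 8)
err := group.Get(ctx, "some-key", TruncatingByteSliceSink(&buf))
// A 5-byte value leaves len(buf) == 5; a 100-byte value fills only
// the first 8 bytes and the remainder is silently dropped.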

@@ -1,108 +0,0 @@
package fnv1
const (
// FNV-1
offset64 = uint64(14695981039346656037)
prime64 = uint64(1099511628211)
// Init64 is what 64-bit hash values should be initialized with.
Init64 = offset64
)
// HashString64 returns the hash of s.
func HashString64(s string) uint64 {
return AddString64(Init64, s)
}
// HashBytes64 returns the hash of b.
func HashBytes64(b []byte) uint64 {
return AddBytes64(Init64, b)
}
// HashUint64 returns the hash of u.
func HashUint64(u uint64) uint64 {
return AddUint64(Init64, u)
}
// AddString64 adds the hash of s to the precomputed hash value h.
func AddString64(h uint64, s string) uint64 {
for len(s) >= 8 {
h = (h * prime64) ^ uint64(s[0])
h = (h * prime64) ^ uint64(s[1])
h = (h * prime64) ^ uint64(s[2])
h = (h * prime64) ^ uint64(s[3])
h = (h * prime64) ^ uint64(s[4])
h = (h * prime64) ^ uint64(s[5])
h = (h * prime64) ^ uint64(s[6])
h = (h * prime64) ^ uint64(s[7])
s = s[8:]
}
if len(s) >= 4 {
h = (h * prime64) ^ uint64(s[0])
h = (h * prime64) ^ uint64(s[1])
h = (h * prime64) ^ uint64(s[2])
h = (h * prime64) ^ uint64(s[3])
s = s[4:]
}
if len(s) >= 2 {
h = (h * prime64) ^ uint64(s[0])
h = (h * prime64) ^ uint64(s[1])
s = s[2:]
}
if len(s) > 0 {
h = (h * prime64) ^ uint64(s[0])
}
return h
}
// AddBytes64 adds the hash of b to the precomputed hash value h.
func AddBytes64(h uint64, b []byte) uint64 {
for len(b) >= 8 {
h = (h * prime64) ^ uint64(b[0])
h = (h * prime64) ^ uint64(b[1])
h = (h * prime64) ^ uint64(b[2])
h = (h * prime64) ^ uint64(b[3])
h = (h * prime64) ^ uint64(b[4])
h = (h * prime64) ^ uint64(b[5])
h = (h * prime64) ^ uint64(b[6])
h = (h * prime64) ^ uint64(b[7])
b = b[8:]
}
if len(b) >= 4 {
h = (h * prime64) ^ uint64(b[0])
h = (h * prime64) ^ uint64(b[1])
h = (h * prime64) ^ uint64(b[2])
h = (h * prime64) ^ uint64(b[3])
b = b[4:]
}
if len(b) >= 2 {
h = (h * prime64) ^ uint64(b[0])
h = (h * prime64) ^ uint64(b[1])
b = b[2:]
}
if len(b) > 0 {
h = (h * prime64) ^ uint64(b[0])
}
return h
}
// AddUint64 adds the hash value of the 8 bytes of u to h.
func AddUint64(h uint64, u uint64) uint64 {
h = (h * prime64) ^ ((u >> 56) & 0xFF)
h = (h * prime64) ^ ((u >> 48) & 0xFF)
h = (h * prime64) ^ ((u >> 40) & 0xFF)
h = (h * prime64) ^ ((u >> 32) & 0xFF)
h = (h * prime64) ^ ((u >> 24) & 0xFF)
h = (h * prime64) ^ ((u >> 16) & 0xFF)
h = (h * prime64) ^ ((u >> 8) & 0xFF)
h = (h * prime64) ^ ((u >> 0) & 0xFF)
return h
}
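
The Add* variants make incremental hashing explicit: a caller can fold several key fields into one hash without first concatenating them. A small sketch using only the functions above:

h := fnv1.Init64
h = fnv1.AddString64(h, "chunk/") // identical to HashString64("chunk/")
h = fnv1.AddUint64(h, 42)         // folds the 8 bytes of the numeric suffix
// h is now a stable 64-bit identity for the composite key ("chunk/", 42).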

@@ -1,104 +0,0 @@
package fnv1
const (
// FNV-1
offset32 = uint32(2166136261)
prime32 = uint32(16777619)
// Init32 is what 32-bit hash values should be initialized with.
Init32 = offset32
)
// HashString32 returns the hash of s.
func HashString32(s string) uint32 {
return AddString32(Init32, s)
}
// HashBytes32 returns the hash of b.
func HashBytes32(b []byte) uint32 {
return AddBytes32(Init32, b)
}
// HashUint32 returns the hash of u.
func HashUint32(u uint32) uint32 {
return AddUint32(Init32, u)
}
// AddString32 adds the hash of s to the precomputed hash value h.
func AddString32(h uint32, s string) uint32 {
for len(s) >= 8 {
h = (h * prime32) ^ uint32(s[0])
h = (h * prime32) ^ uint32(s[1])
h = (h * prime32) ^ uint32(s[2])
h = (h * prime32) ^ uint32(s[3])
h = (h * prime32) ^ uint32(s[4])
h = (h * prime32) ^ uint32(s[5])
h = (h * prime32) ^ uint32(s[6])
h = (h * prime32) ^ uint32(s[7])
s = s[8:]
}
if len(s) >= 4 {
h = (h * prime32) ^ uint32(s[0])
h = (h * prime32) ^ uint32(s[1])
h = (h * prime32) ^ uint32(s[2])
h = (h * prime32) ^ uint32(s[3])
s = s[4:]
}
if len(s) >= 2 {
h = (h * prime32) ^ uint32(s[0])
h = (h * prime32) ^ uint32(s[1])
s = s[2:]
}
if len(s) > 0 {
h = (h * prime32) ^ uint32(s[0])
}
return h
}
// AddBytes32 adds the hash of b to the precomputed hash value h.
func AddBytes32(h uint32, b []byte) uint32 {
for len(b) >= 8 {
h = (h * prime32) ^ uint32(b[0])
h = (h * prime32) ^ uint32(b[1])
h = (h * prime32) ^ uint32(b[2])
h = (h * prime32) ^ uint32(b[3])
h = (h * prime32) ^ uint32(b[4])
h = (h * prime32) ^ uint32(b[5])
h = (h * prime32) ^ uint32(b[6])
h = (h * prime32) ^ uint32(b[7])
b = b[8:]
}
if len(b) >= 4 {
h = (h * prime32) ^ uint32(b[0])
h = (h * prime32) ^ uint32(b[1])
h = (h * prime32) ^ uint32(b[2])
h = (h * prime32) ^ uint32(b[3])
b = b[4:]
}
if len(b) >= 2 {
h = (h * prime32) ^ uint32(b[0])
h = (h * prime32) ^ uint32(b[1])
b = b[2:]
}
if len(b) > 0 {
h = (h * prime32) ^ uint32(b[0])
}
return h
}
// AddUint32 adds the hash value of the 4 bytes of u to h.
func AddUint32(h, u uint32) uint32 {
h = (h * prime32) ^ ((u >> 24) & 0xFF)
h = (h * prime32) ^ ((u >> 16) & 0xFF)
h = (h * prime32) ^ ((u >> 8) & 0xFF)
h = (h * prime32) ^ ((u >> 0) & 0xFF)
return h
}
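
As with the 64-bit form, a typical use of the 32-bit hash is cheap shard selection; a sketch with a hypothetical `shards` slice:

idx := fnv1.HashString32("tenant-1/labels") % uint32(len(shards))
shard := shards[idx] // shards is assumed, e.g. []*cacheShard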

vendor/modules.txt

@@ -631,9 +631,6 @@ github.com/grafana/dskit/tenant
# github.com/grafana/go-gelf/v2 v2.0.1
## explicit; go 1.17
github.com/grafana/go-gelf/v2/gelf
# github.com/grafana/groupcache_exporter v0.0.0-20220629095919-59a8c6428a43
## explicit; go 1.17
github.com/grafana/groupcache_exporter
# github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6
## explicit; go 1.17
github.com/grafana/regexp
@@ -801,13 +798,6 @@ github.com/klauspost/pgzip
## explicit
github.com/leodido/ragel-machinery
github.com/leodido/ragel-machinery/parser
# github.com/mailgun/groupcache/v2 v2.3.2
## explicit; go 1.15
github.com/mailgun/groupcache/v2
github.com/mailgun/groupcache/v2/consistenthash
github.com/mailgun/groupcache/v2/groupcachepb
github.com/mailgun/groupcache/v2/lru
github.com/mailgun/groupcache/v2/singleflight
# github.com/mailru/easyjson v0.7.7
## explicit; go 1.12
github.com/mailru/easyjson/buffer
@@ -1029,7 +1019,6 @@ github.com/rs/xid
github.com/sean-/seed
# github.com/segmentio/fasthash v1.0.3
## explicit; go 1.11
github.com/segmentio/fasthash/fnv1
github.com/segmentio/fasthash/fnv1a
# github.com/sercand/kuberesolver v2.4.0+incompatible => github.com/sercand/kuberesolver v2.4.0+incompatible
## explicit
