From 52c269d8ae70f3a3de9a0279850ea58425c0a76c Mon Sep 17 00:00:00 2001 From: Patrick Mylund Nielsen Date: Fri, 22 Jun 2012 09:24:09 +0100 Subject: [PATCH] Testing a sharded cache. Could be useful for massively parallel applications --- cache.go | 161 ++++++++++++++++++++++++++++++++++++++++++++------ cache_test.go | 73 +++++++++++++++++++++++ 2 files changed, 216 insertions(+), 18 deletions(-) diff --git a/cache.go b/cache.go index 43d40fe..519ce02 100644 --- a/cache.go +++ b/cache.go @@ -1,8 +1,10 @@ package cache import ( + "encoding/binary" "encoding/gob" "fmt" + "hash/fnv" "io" "os" "reflect" @@ -277,20 +279,19 @@ func (j *janitor) Run(c *cache) { } } -func (j *janitor) Stop() { - j.stop <- true +func stopJanitor(c *Cache) { + c.janitor.stop <- true } -func stopJanitor(c *Cache) { - c.janitor.Stop() +func runJanitor(c *cache, ci time.Duration) { + j := &janitor{ + Interval: ci, + } + c.janitor = j + go j.Run(c) } -// Return a new cache with a given default expiration duration and cleanup -// interval. If the expiration duration is less than 1, the items in the cache -// never expire (by default), and must be deleted manually. If the cleanup -// interval is less than one, expired items are not deleted from the cache -// before their next lookup or before calling DeleteExpired. -func New(de, ci time.Duration) *Cache { +func newCache(de time.Duration) *cache { if de == 0 { de = -1 } @@ -299,21 +300,145 @@ func New(de, ci time.Duration) *Cache { Items: map[string]*Item{}, mu: sync.Mutex{}, } - if ci > 0 { - j := &janitor{ - Interval: ci, - } - c.janitor = j - go j.Run(c) - } + return c +} + +// Return a new cache with a given default expiration duration and cleanup +// interval. If the expiration duration is less than 1, the items in the cache +// never expire (by default), and must be deleted manually. If the cleanup +// interval is less than one, expired items are not deleted from the cache +// before their next lookup or before calling DeleteExpired. +func New(defaultExpiration, cleanupInterval time.Duration) *Cache { + c := newCache(defaultExpiration) // This trick ensures that the janitor goroutine (which--granted it // was enabled--is running DeleteExpired on c forever) does not keep // the returned C object from being garbage collected. When it is // garbage collected, the finalizer stops the janitor goroutine, after // which c can be collected. C := &Cache{c} - if ci > 0 { + if cleanupInterval > 0 { + runJanitor(c, cleanupInterval) runtime.SetFinalizer(C, stopJanitor) } return C } + +type ShardedCache struct { + *shardedCache +} + +type shardedCache struct { + m uint32 + cs []*cache + janitor *shardedJanitor +} + +func (sc *shardedCache) index(k string) uint32 { + h := fnv.New32() + h.Write([]byte(k)) + n := binary.BigEndian.Uint32(h.Sum(nil)) + return n % sc.m +} + +func (sc *shardedCache) Set(k string, x interface{}, d time.Duration) { + sc.cs[sc.index(k)].Set(k, x, d) +} + +func (sc *shardedCache) Add(k string, x interface{}, d time.Duration) error { + return sc.cs[sc.index(k)].Add(k, x, d) +} + +func (sc *shardedCache) Replace(k string, x interface{}, d time.Duration) error { + return sc.cs[sc.index(k)].Replace(k, x, d) +} + +func (sc *shardedCache) Get(k string) (interface{}, bool) { + return sc.cs[sc.index(k)].Get(k) +} + +func (sc *shardedCache) Increment(k string, n int64) error { + return sc.cs[sc.index(k)].Increment(k, n) +} + +func (sc *shardedCache) IncrementFloat(k string, n float64) error { + return sc.cs[sc.index(k)].IncrementFloat(k, n) +} + +func (sc *shardedCache) Decrement(k string, n int64) error { + return sc.cs[sc.index(k)].Decrement(k, n) +} + +func (sc *shardedCache) Delete(k string) { + sc.cs[sc.index(k)].Delete(k) +} + +func (sc *shardedCache) DeleteExpired() { + for _, v := range sc.cs { + v.DeleteExpired() + } +} + +func (sc *shardedCache) Flush() { + for _, v := range sc.cs { + v.Flush() + } +} + +type shardedJanitor struct { + Interval time.Duration + stop chan bool +} + +func (j *shardedJanitor) Run(sc *shardedCache) { + j.stop = make(chan bool) + tick := time.Tick(j.Interval) + for { + select { + case <-tick: + sc.DeleteExpired() + case <-j.stop: + return + } + } +} + +func stopShardedJanitor(sc *ShardedCache) { + sc.janitor.stop <- true +} + +func runShardedJanitor(sc *shardedCache, ci time.Duration) { + j := &shardedJanitor{ + Interval: ci, + } + sc.janitor = j + go j.Run(sc) +} + +func newShardedCache(n int, de time.Duration) *shardedCache { + sc := &shardedCache{ + m: uint32(n - 1), + cs: make([]*cache, n), + } + for i := 0; i < n; i++ { + c := &cache{ + DefaultExpiration: de, + Items: map[string]*Item{}, + mu: sync.Mutex{}, + } + sc.cs[i] = c + } + return sc +} + +func NewSharded(shards int, defaultExpiration, cleanupInterval time.Duration) *ShardedCache { + if defaultExpiration == 0 { + defaultExpiration = -1 + } + sc := newShardedCache(shards, defaultExpiration) + SC := &ShardedCache{sc} + if cleanupInterval > 0 { + runShardedJanitor(sc, cleanupInterval) + runtime.SetFinalizer(SC, stopShardedJanitor) + } + return SC +} diff --git a/cache_test.go b/cache_test.go index 2fdcb82..93712c7 100644 --- a/cache_test.go +++ b/cache_test.go @@ -4,6 +4,7 @@ import ( "bytes" "io/ioutil" "runtime" + "strconv" "sync" "testing" "time" @@ -636,18 +637,22 @@ func TestSerializeUnserializable(t *testing.T) { } func BenchmarkCacheGet(b *testing.B) { + b.StopTimer() tc := New(0, 0) tc.Set("foo", "bar", 0) + b.StartTimer() for i := 0; i < b.N; i++ { tc.Get("foo") } } func BenchmarkMutexMapGet(b *testing.B) { + b.StopTimer() m := map[string]string{ "foo": "bar", } mu := sync.Mutex{} + b.StartTimer() for i := 0; i < b.N; i++ { mu.Lock() _, _ = m["foo"] @@ -656,12 +661,14 @@ func BenchmarkMutexMapGet(b *testing.B) { } func BenchmarkCacheGetConcurrent(b *testing.B) { + b.StopTimer() tc := New(0, 0) tc.Set("foo", "bar", 0) wg := new(sync.WaitGroup) workers := runtime.NumCPU() each := b.N / workers wg.Add(workers) + b.StartTimer() for i := 0; i < workers; i++ { go func() { for j := 0; j < each; j++ { @@ -674,6 +681,7 @@ func BenchmarkCacheGetConcurrent(b *testing.B) { } func BenchmarkMutexMapGetConcurrent(b *testing.B) { + b.StopTimer() m := map[string]string{ "foo": "bar", } @@ -682,6 +690,7 @@ func BenchmarkMutexMapGetConcurrent(b *testing.B) { workers := runtime.NumCPU() each := b.N / workers wg.Add(workers) + b.StartTimer() for i := 0; i < workers; i++ { go func() { for j := 0; j < each; j++ { @@ -695,16 +704,72 @@ func BenchmarkMutexMapGetConcurrent(b *testing.B) { wg.Wait() } +func BenchmarkCacheGetManyConcurrent(b *testing.B) { + // This is the same as BenchmarkCacheGetConcurrent, but its result + // can be compared against BenchmarkShardedCacheGetManyConcurrent. + b.StopTimer() + n := 10000 + tc := New(0, 0) + keys := make([]string, n) + for i := 0; i < n; i++ { + k := "foo" + strconv.Itoa(n) + keys[i] = k + tc.Set(k, "bar", 0) + } + each := b.N / n + wg := new(sync.WaitGroup) + wg.Add(n) + for _, v := range keys { + go func() { + for j := 0; j < each; j++ { + tc.Get(v) + } + wg.Done() + }() + } + b.StartTimer() + wg.Wait() +} + +func BenchmarkShardedCacheGetManyConcurrent(b *testing.B) { + b.StopTimer() + n := 10000 + tsc := NewSharded(20, 0, 0) + keys := make([]string, n) + for i := 0; i < n; i++ { + k := "foo" + strconv.Itoa(n) + keys[i] = k + tsc.Set(k, "bar", 0) + } + each := b.N / n + wg := new(sync.WaitGroup) + wg.Add(n) + for _, v := range keys { + go func() { + for j := 0; j < each; j++ { + tsc.Get(v) + } + wg.Done() + }() + } + b.StartTimer() + wg.Wait() +} + func BenchmarkCacheSet(b *testing.B) { + b.StopTimer() tc := New(0, 0) + b.StartTimer() for i := 0; i < b.N; i++ { tc.Set("foo", "bar", 0) } } func BenchmarkMutexMapSet(b *testing.B) { + b.StopTimer() m := map[string]string{} mu := sync.Mutex{} + b.StartTimer() for i := 0; i < b.N; i++ { mu.Lock() m["foo"] = "bar" @@ -713,7 +778,9 @@ func BenchmarkMutexMapSet(b *testing.B) { } func BenchmarkCacheSetDelete(b *testing.B) { + b.StopTimer() tc := New(0, 0) + b.StartTimer() for i := 0; i < b.N; i++ { tc.Set("foo", "bar", 0) tc.Delete("foo") @@ -721,8 +788,10 @@ func BenchmarkCacheSetDelete(b *testing.B) { } func BenchmarkMutexMapSetDelete(b *testing.B) { + b.StopTimer() m := map[string]string{} mu := sync.Mutex{} + b.StartTimer() for i := 0; i < b.N; i++ { mu.Lock() m["foo"] = "bar" @@ -734,7 +803,9 @@ func BenchmarkMutexMapSetDelete(b *testing.B) { } func BenchmarkCacheSetDeleteSingleLock(b *testing.B) { + b.StopTimer() tc := New(0, 0) + b.StartTimer() for i := 0; i < b.N; i++ { tc.mu.Lock() tc.set("foo", "bar", 0) @@ -744,8 +815,10 @@ func BenchmarkCacheSetDeleteSingleLock(b *testing.B) { } func BenchmarkMutexMapSetDeleteSingleLock(b *testing.B) { + b.StopTimer() m := map[string]string{} mu := sync.Mutex{} + b.StartTimer() for i := 0; i < b.N; i++ { mu.Lock() m["foo"] = "bar"