Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions hystrix/circuit_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package hystrix

import (
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

"math/rand"
"testing/quick"
"time"

. "github.com/smartystreets/goconvey/convey"
)
Expand Down
3 changes: 1 addition & 2 deletions hystrix/hystrix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package hystrix

import (
"fmt"
"sync/atomic"
"testing"
"time"

"sync/atomic"

. "github.com/smartystreets/goconvey/convey"
)

Expand Down
7 changes: 5 additions & 2 deletions hystrix/metric_collector/mocks/metric_collector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions hystrix/pool_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package hystrix

import (
"sync/atomic"
"testing"
"time"

"sync/atomic"

. "github.com/smartystreets/goconvey/convey"
)

Expand Down
123 changes: 82 additions & 41 deletions hystrix/rolling/rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,101 +5,126 @@ import (
"time"
)

const (
// This is the number of items we search for or sum (e.g. last 10 seconds)
numberWindow = int64(10)

// This is the number of items used to store data.
// The extra item is stop collision when the window "wraps around" to the next second.
numberItems = numberWindow + 1
)

// Number tracks a numberBucket over a bounded number of
// time buckets. Currently the buckets are one second long and only the last 10 seconds are kept.
type Number struct {
Buckets map[int64]*numberBucket
Mutex *sync.RWMutex

// allow of mocking of time in tests
timeGenerator func() int64
}

type numberBucket struct {
Value float64
timestamp int64
Value float64
}

// reset/empty the bucket
func (n *numberBucket) empty() {
n.timestamp = 0
n.Value = 0
}

// NewNumber initializes a RollingNumber struct.
func NewNumber() *Number {
r := &Number{
Buckets: make(map[int64]*numberBucket),
// keep only 60 seconds worth of buckets and never recreate them
Buckets: make(map[int64]*numberBucket, numberItems),
Mutex: &sync.RWMutex{},
}
return r
}

func (r *Number) getCurrentBucket() *numberBucket {
now := time.Now().Unix()
var bucket *numberBucket
var ok bool

if bucket, ok = r.Buckets[now]; !ok {
bucket = &numberBucket{}
r.Buckets[now] = bucket
// create all the buckets
for x := int64(0); x < numberItems; x++ {
r.Buckets[x] = &numberBucket{}
}

return bucket
}

func (r *Number) removeOldBuckets() {
now := time.Now().Unix() - 10

for timestamp := range r.Buckets {
// TODO: configurable rolling window
if timestamp <= now {
delete(r.Buckets, timestamp)
}
}
return r
}

// Increment increments the number in current timeBucket.
func (r *Number) Increment(i float64) {
r.Mutex.Lock()
defer r.Mutex.Unlock()

b := r.getCurrentBucket()
timeInSec := r.getTimeInSec(time.Now())
index := r.getIndex(timeInSec)

b := r.Buckets[index]
if b.timestamp != timeInSec {
// auto-empty buckets that are not clean (caused by sporadic data)
b.empty()
b.timestamp = timeInSec
}

b.Value += i
r.removeOldBuckets()
}

// UpdateMax updates the maximum value in the current bucket.
func (r *Number) UpdateMax(n float64) {
r.Mutex.Lock()
defer r.Mutex.Unlock()

b := r.getCurrentBucket()
timeInSec := r.getTimeInSec(time.Now())
index := r.getIndex(timeInSec)

b := r.Buckets[index]
if b.timestamp != timeInSec {
b.empty()
b.timestamp = timeInSec
}

// only use those buckets that are within the time box; we cannot empty them without a write lock
if n > b.Value {
b.Value = n
}
r.removeOldBuckets()
}

// Sum sums the values over the buckets in the last 10 seconds.
func (r *Number) Sum(now time.Time) float64 {
func (r *Number) Sum(in time.Time) float64 {
sum := float64(0)

r.Mutex.RLock()
defer r.Mutex.RUnlock()

for timestamp, bucket := range r.Buckets {
// TODO: configurable rolling window
if timestamp >= now.Unix()-10 {
sum += bucket.Value
minTimeInSec := r.getMinTimeInSec(r.getTimeInSec(in))

// to sum the "window" we sum all except the next (extra) one
for _, b := range r.Buckets {
if b.timestamp >= minTimeInSec {
// only use those buckets that are within the time box; we cannot empty them without a write lock
sum += b.Value
}
}

return sum
}

// Max returns the maximum value seen in the last 10 seconds.
func (r *Number) Max(now time.Time) float64 {
func (r *Number) Max(in time.Time) float64 {
var max float64

r.Mutex.RLock()
defer r.Mutex.RUnlock()

for timestamp, bucket := range r.Buckets {
// TODO: configurable rolling window
if timestamp >= now.Unix()-10 {
if bucket.Value > max {
max = bucket.Value
timeInSec := r.getTimeInSec(in)
minTimeInSec := r.getMinTimeInSec(timeInSec)

var b *numberBucket
for _, b = range r.Buckets {
if b.timestamp >= minTimeInSec {
// only use those buckets that are within the time box; we cannot empty them without a write lock
if b.Value > max {
max = b.Value
}
}
}
Expand All @@ -108,6 +133,22 @@ func (r *Number) Max(now time.Time) float64 {
}

// Avg return the average value seen in the last 10 seconds.
func (r *Number) Avg(now time.Time) float64 {
return r.Sum(now) / 10
func (r *Number) Avg(in time.Time) float64 {
return r.Sum(in) / float64(numberWindow)
}

func (r *Number) getTimeInSec(now time.Time) int64 {
if r.timeGenerator != nil {
return r.timeGenerator()
}

return now.Unix()
}

func (r *Number) getMinTimeInSec(timeInSec int64) int64 {
return timeInSec - numberWindow + 1
}

func (r *Number) getIndex(timeInSec int64) int64 {
return timeInSec % numberItems
}
Loading