Skip to content

Commit 63432a2

Browse files
committed
add failback_cluster_invoker && failsafe_cluster_invoker. resolve #135
1 parent 8b66663 commit 63432a2

File tree

9 files changed

+2501
-72
lines changed

9 files changed

+2501
-72
lines changed

cluster/cluster_impl/failback_cluster_invoker.go

Lines changed: 83 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,40 @@ package cluster_impl
1919

2020
import (
2121
"container/list"
22-
perrors "github.com/pkg/errors"
2322
"sync"
2423
"time"
2524
)
2625

26+
import (
27+
perrors "github.com/pkg/errors"
28+
)
29+
2730
import (
2831
"github.com/apache/dubbo-go/cluster"
2932
"github.com/apache/dubbo-go/common/constant"
3033
"github.com/apache/dubbo-go/common/extension"
3134
"github.com/apache/dubbo-go/protocol"
3235
)
3336

37+
/**
38+
* When fails, record failure requests and schedule for retry on a regular interval.
39+
* Especially useful for services of notification.
40+
*
41+
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
42+
*/
3443
type failbackClusterInvoker struct {
3544
baseClusterInvoker
36-
}
3745

38-
var (
39-
retries int64
40-
failbackTasks int64
4146
ticker *time.Ticker
42-
once sync.Once
43-
lock sync.Mutex
47+
maxRetries int64
48+
failbackTasks int64
4449
taskList *Queue
45-
)
50+
}
4651

4752
func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
4853
invoker := &failbackClusterInvoker{
4954
baseClusterInvoker: newBaseClusterInvoker(directory),
55+
taskList: newQueue(),
5056
}
5157
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
5258
if retriesConfig <= 0 {
@@ -56,11 +62,50 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
5662
if failbackTasksConfig <= 0 {
5763
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
5864
}
59-
retries = retriesConfig
60-
failbackTasks = failbackTasksConfig
65+
invoker.maxRetries = retriesConfig
66+
invoker.failbackTasks = failbackTasksConfig
67+
go invoker.process()
6168
return invoker
6269
}
6370

71+
func (invoker *failbackClusterInvoker) process() {
72+
invoker.ticker = time.NewTicker(time.Second * 1)
73+
for range invoker.ticker.C {
74+
check := true
75+
// check each timeout task and re-run
76+
for check {
77+
value := invoker.taskList.peek()
78+
if value == nil {
79+
break
80+
}
81+
82+
retryTask := value.(*retryTimerTask)
83+
if time.Since(retryTask.lastT).Seconds() < 5 {
84+
break
85+
}
86+
87+
invoker.taskList.pop()
88+
89+
invoked := []protocol.Invoker{}
90+
invoked = append(invoked, retryTask.lastInvoker)
91+
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
92+
var result protocol.Result
93+
result = retryInvoker.Invoke(retryTask.invocation)
94+
if result.Error() != nil {
95+
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
96+
retryTask.invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
97+
retryTask.retries++
98+
if retryTask.retries > invoker.maxRetries {
99+
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
100+
retryTask.retries, retryTask.invocation)
101+
} else {
102+
invoker.taskList.push(retryTask)
103+
}
104+
}
105+
}
106+
}
107+
}
108+
64109
func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
65110

66111
invokers := invoker.directory.List(invocation)
@@ -94,9 +139,11 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
94139

95140
if result.Error() != nil {
96141
// add retry ticker task
97-
addFailed(loadbalance, invocation, invokers, invoker)
142+
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, invoker, invoker.maxRetries, 5)
143+
invoker.taskList.push(timerTask)
144+
98145
perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
99-
methodName, invoker.GetUrl().Service(), result.Error().Error())
146+
methodName, url.Service(), result.Error().Error())
100147
// ignore
101148
return &protocol.RPCResult{}
102149
}
@@ -105,58 +152,10 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
105152
}
106153

107154
func (invoker *failbackClusterInvoker) Destroy() {
108-
//this is must atom operation
109-
if invoker.destroyed.CAS(false, true) {
110-
invoker.directory.Destroy()
111-
}
112-
// stop ticker
113-
ticker.Stop()
114-
}
155+
invoker.baseClusterInvoker.Destroy()
115156

116-
func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
117-
invoker *failbackClusterInvoker) {
118-
initSingleTickerTaskInstance()
119-
// init one retryTimerTask
120-
timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5)
121-
taskList.push(timerTask)
122-
// process ticker task
123-
go func() {
124-
<-ticker.C
125-
value := taskList.pop()
126-
if value == nil {
127-
return
128-
}
129-
130-
retryTask := value.(retryTimerTask)
131-
invoked := []protocol.Invoker{}
132-
invoked = append(invoked, retryTask.lastInvoker)
133-
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
134-
invoked)
135-
var result protocol.Result
136-
result = retryInvoker.Invoke(retryTask.invocation)
137-
if result.Error() != nil {
138-
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
139-
invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
140-
retryTask.retries++
141-
if retryTask.retries > retries {
142-
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
143-
retries, invocation)
144-
} else {
145-
taskList.push(retryTask)
146-
}
147-
}
148-
}()
149-
}
150-
151-
func initSingleTickerTaskInstance() {
152-
once.Do(func() {
153-
newTickerTask()
154-
})
155-
}
156-
157-
func newTickerTask() {
158-
ticker = time.NewTicker(time.Second * 1)
159-
taskList = newQueue()
157+
// stop ticker
158+
invoker.ticker.Stop()
160159
}
161160

162161
type retryTimerTask struct {
@@ -165,7 +164,7 @@ type retryTimerTask struct {
165164
invokers []protocol.Invoker
166165
lastInvoker *failbackClusterInvoker
167166
retries int64
168-
tick int64
167+
lastT time.Time
169168
}
170169

171170
func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
@@ -176,31 +175,44 @@ func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invo
176175
invokers: invokers,
177176
lastInvoker: lastInvoker,
178177
retries: retries,
179-
tick: tick,
178+
lastT: time.Now(),
180179
}
181180
}
182181

183182
type Queue struct {
184183
data *list.List
184+
sync.Locker
185185
}
186186

187187
func newQueue() *Queue {
188-
q := new(Queue)
189-
q.data = list.New()
188+
q := &Queue{
189+
data: list.New(),
190+
Locker: new(sync.Mutex),
191+
}
190192
return q
191193
}
192194

193195
func (q *Queue) push(v interface{}) {
194-
defer lock.Unlock()
195-
lock.Lock()
196+
defer q.Unlock()
197+
q.Lock()
196198
q.data.PushFront(v)
197199
}
198200

199201
func (q *Queue) pop() interface{} {
200-
defer lock.Unlock()
201-
lock.Lock()
202+
defer q.Unlock()
203+
q.Lock()
202204
iter := q.data.Back()
203205
v := iter.Value
204206
q.data.Remove(iter)
205207
return v
206208
}
209+
210+
func (q *Queue) peek() interface{} {
211+
defer q.Unlock()
212+
q.Lock()
213+
iter := q.data.Back()
214+
if iter == nil {
215+
return nil
216+
}
217+
return iter.Value
218+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
*/
17+
18+
package cluster_impl
19+
20+
import (
21+
"context"
22+
"testing"
23+
"time"
24+
)
25+
26+
import (
27+
"github.com/golang/mock/gomock"
28+
perrors "github.com/pkg/errors"
29+
"github.com/stretchr/testify/assert"
30+
)
31+
32+
import (
33+
"github.com/apache/dubbo-go/cluster/directory"
34+
"github.com/apache/dubbo-go/cluster/loadbalance"
35+
"github.com/apache/dubbo-go/common"
36+
"github.com/apache/dubbo-go/common/extension"
37+
"github.com/apache/dubbo-go/protocol"
38+
"github.com/apache/dubbo-go/protocol/invocation"
39+
)
40+
41+
func TestQueue(t *testing.T) {
42+
q := newQueue()
43+
d0 := retryTimerTask{retries: 0}
44+
d1 := retryTimerTask{retries: 1}
45+
d2 := retryTimerTask{retries: 2}
46+
q.push(d0)
47+
q.push(d1)
48+
q.push(d2)
49+
50+
d := q.peek()
51+
assert.Equal(t, d, d0)
52+
d = q.pop()
53+
assert.Equal(t, d, d0)
54+
d = q.pop()
55+
assert.Equal(t, d, d1)
56+
d = q.pop()
57+
assert.Equal(t, d, d2)
58+
}
59+
60+
var (
61+
failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
62+
)
63+
64+
// register_failback register failbackCluster to cluster extension.
65+
func register_failback(t *testing.T, invoker *protocol.MockInvoker) protocol.Invoker {
66+
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
67+
failbackCluster := NewFailbackCluster()
68+
69+
invokers := []protocol.Invoker{}
70+
invokers = append(invokers, invoker)
71+
72+
invoker.EXPECT().GetUrl().Return(failbackUrl)
73+
74+
staticDir := directory.NewStaticDirectory(invokers)
75+
clusterInvoker := failbackCluster.Join(staticDir)
76+
return clusterInvoker
77+
}
78+
79+
func Test_FailbackSuceess(t *testing.T) {
80+
ctrl := gomock.NewController(t)
81+
defer ctrl.Finish()
82+
83+
invoker := protocol.NewMockInvoker(ctrl)
84+
clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker)
85+
86+
invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1)
87+
88+
mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
89+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)
90+
91+
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
92+
assert.Equal(t, mockResult, result)
93+
}
94+
95+
func Test_FailbackRetryOneSuccess(t *testing.T) {
96+
ctrl := gomock.NewController(t)
97+
defer ctrl.Finish()
98+
99+
invoker := protocol.NewMockInvoker(ctrl)
100+
clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker)
101+
102+
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
103+
104+
// failed at first
105+
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
106+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)
107+
108+
// success second
109+
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
110+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockSuccResult)
111+
112+
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
113+
assert.Nil(t, result.Error())
114+
assert.Nil(t, result.Result())
115+
assert.Equal(t, 0, len(result.Attachments()))
116+
117+
// ensure the retry task has been executed
118+
time.Sleep(6 * time.Second)
119+
invoker.EXPECT().Destroy().Return()
120+
121+
clusterInvoker.Destroy()
122+
123+
assert.Equal(t, 0, clusterInvoker.taskList.data.Len())
124+
}
125+
126+
func Test_FailbackRetryFailed(t *testing.T) {
127+
ctrl := gomock.NewController(t)
128+
defer ctrl.Finish()
129+
130+
invoker := protocol.NewMockInvoker(ctrl)
131+
clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker)
132+
133+
invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()
134+
135+
// failed always
136+
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error")}
137+
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).AnyTimes()
138+
139+
result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
140+
assert.Nil(t, result.Error())
141+
assert.Nil(t, result.Result())
142+
assert.Equal(t, 0, len(result.Attachments()))
143+
144+
time.Sleep(2 * time.Second) // for retry work.
145+
invoker.EXPECT().Destroy().Return()
146+
147+
clusterInvoker.Destroy()
148+
149+
time.Sleep(1 * time.Second) // for retryTimerTask thrown back to queue
150+
151+
assert.Equal(t, 1, clusterInvoker.taskList.data.Len())
152+
}

0 commit comments

Comments
 (0)