fix: Replace tokenbucket with standard limiter on CLI#7585
Conversation
1429544 to
b8d089e
Compare
| batchSize := c.Int(FlagBatchSize) | ||
| rps := c.Int(FlagRPS) | ||
| ratelimiter := tokenbucket.New(rps, clock.NewRealTimeSource()) | ||
| ratelimiter := clock.NewRatelimiter(rate.Limit(rps), rps) |
There was a problem hiding this comment.
What will happen is rps is negative? Is the rate limiter going to block forever? If so, should we check if rps is always >= 0?
There was a problem hiding this comment.
Hi, Thanks for reviewing my pull request.
I investigated the behavior of rate.NewLimiter with a negative RPS.
And i found that while the initial burst tokens are available, any subsequent requests will calculate a delay of rate.InfDuration.
For the ratelimiter.Wait(ctx) call used in this PR, this would cause the process to hang until the context deadline is reached (or infinitely waits).
And also if burst (the second parameter) has negative value, it means the maximum available token is none (or unexpected behavior). It makes consumer couldn't receive any token through this token bucket style rate-limiter.
I have researched this with below test codes.
Furthermore, I think there are no validation logic that ensures the limit > 0 and burst > 0 in the official golang ratelimiter package
Reference
So, I think it would be better to add validation logic to ensure rps > 0 to prevent this kind of 'deadlock' behavior.
If there is any inaccurate explanation, please kindly correct my answer! 👍
if rps<= 0 {
return commoncli.Problem("Required positive value of FlagRPS for ratelimiter but got: ", rps)
}Test 1
package main
import (
"log"
"golang.org/x/time/rate"
)
func main() {
log.Println("########### POC: If limit is negative ###########")
limiter := rate.NewLimiter(-1, 1)
log.Println("########### 1st trial: Reserve token ###########")
reserve := limiter.Reserve()
if !reserve.OK() {
log.Println("########### Reservation Failed ###########")
} else {
delay := reserve.Delay()
log.Printf("Reserved.... required delay: %v", delay)
log.Printf("Is Delay InfDuration?: %v", delay == rate.InfDuration)
}
log.Println("########### 2nd trial: Reserve token ###########")
reserve2 := limiter.Reserve()
if !reserve2.OK() {
log.Println("########### Reservation Failed ###########")
} else {
delay := reserve2.Delay()
log.Printf("Reserved.... required delay: %v", delay)
log.Printf("Is Delay InfDuration?: %v", delay == rate.InfDuration)
}
}Result
PS asdfasdfasdfasdfasdfd\go-ratelimiter-poc> ./test.exe
2026/01/14 13:22:28 ########### POC: If limit is negative ###########
2026/01/14 13:22:28 ########### 1st trial: Reserve token ###########
2026/01/14 13:22:28 Reserved.... required delay: 0s
2026/01/14 13:22:28 Is Delay InfDuration?: false
2026/01/14 13:22:28 ########### 2nd trial: Reserve token ###########
2026/01/14 13:22:28 Reserved.... required delay: 2562047h47m16.854775807s
2026/01/14 13:22:28 Is Delay InfDuration?: trueTest 2
package main
import (
"log"
"golang.org/x/time/rate"
)
func main() {
log.Println("########### POC: If burst is negative ###########")
limiter := rate.NewLimiter(1, -1)
log.Println("########### 1st trial: Reserve token ###########")
reserve := limiter.Reserve()
if !reserve.OK() {
log.Println("########### Reservation Failed ###########")
} else {
delay := reserve.Delay()
log.Printf("Reserved.... required delay: %v", delay)
log.Printf("Is Delay InfDuration?: %v", delay == rate.InfDuration)
}
}Result
PS asdfasdfasdfasdfasdfd\go-ratelimiter-poc> ./test.exe
2026/01/14 13:58:20 ########### POC: If burst is negative ###########
2026/01/14 13:58:20 ########### 1st trial: Reserve token ###########
2026/01/14 13:58:20 ########### Reservation Failed ###########There was a problem hiding this comment.
Or what do you think about adding validation logic in ratelimiter.go/NewRatelimiter function? I think it is more useful way to ensure all ratelimiter has appropriate limit and burst value
There was a problem hiding this comment.
Hi,
I think this is good enough:
if rps<= 0 {
return commoncli.Problem("Required positive value of FlagRPS for ratelimiter but got: ", rps)
}
|
This is probably good to merge, but it's failing due to the DCO signoff, which is some details on how to fix this (basically just ammending the commit while running |
9591323 to
d74ae03
Compare
Signed-off-by: Scanf-s <sullung2yo@gmail.com>
Signed-off-by: Scanf-s <sullung2yo@gmail.com>
071ac71 to
e476fd2
Compare
bowenxia
left a comment
There was a problem hiding this comment.
Thanks for your contribution!
What changed?
Replaced
common/tokenbucketwithcommon/clock/ratelimiterin the CLI'sAdminDeletefunction (tools/cli/admin_elastic_search_commands.go).Why?
This PR is the first step toward resolving the issue #7562.
The current custom
tokenbucketimplementation is only used in two placesAdminDeletefunction: Uses only basic rate limiting, seems straightforward to replace.common/persistence/wrappers/sampledmodule: Required priority support, whichcommon/clock/ratelimiterdoes not currently support.To replace the custom ratelimiter in the
AdminDeletefunction, I followed the comments on the ratelimiter.go 84 line and used theWaitfunction to ensure it waits until a token is available. This allows the implementation to leverage context-based cancellation.How did you test it?

Ran unit tests in
admin_elastic_search_commands_test.goPotential risks
Release notes
Documentation Changes