Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/advanced-guide/circuit-breaker/page.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
&service.CircuitBreakerConfig{
// Number of consecutive failed requests after which circuit breaker will be enabled
Threshold: 4,
// Time interval at which circuit breaker will hit the aliveness endpoint.
// Time interval at which circuit breaker will hit the health endpoint.
Interval: 1 * time.Second,
},
)
Expand All @@ -85,7 +85,7 @@ func main() {
```

The circuit breaker's state changes to open when the number of consecutive failed requests exceeds the threshold.
When it is in open state, GoFr makes request to the aliveness endpoint (default being - /.well-known/alive) at an equal interval of time provided in config.
When it is in the open state, GoFr makes a request to the health endpoint (default being - /.well-known/alive, or the custom endpoint if configured) at the interval of time provided in the config.

GoFr publishes the following metric to track circuit breaker state:

Expand Down
14 changes: 8 additions & 6 deletions pkg/gofr/service/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,14 @@ func (cb *circuitBreaker) isOpen() bool {
return cb.state == OpenState
}

// healthCheck reports whether the downstream service is healthy again so
// the circuit breaker can transition out of the open state.
//
// When a custom health endpoint has been configured on the underlying
// httpService (via HealthConfig), that endpoint is probed with its
// configured timeout; otherwise the service's default HealthCheck
// (/.well-known/alive) is used. Returns true when the reported status
// equals serviceUp.
func (cb *circuitBreaker) healthCheck(ctx context.Context) bool {
	if httpSvc := extractHTTPService(cb.HTTP); httpSvc != nil && httpSvc.healthEndpoint != "" {
		resp := cb.HTTP.getHealthResponseForEndpoint(ctx, httpSvc.healthEndpoint, httpSvc.healthTimeout)

		return resp.Status == serviceUp
	}

	resp := cb.HTTP.HealthCheck(ctx)

	return resp.Status == serviceUp
}
Expand Down Expand Up @@ -164,10 +169,7 @@ func (cb *CircuitBreakerConfig) AddOption(h HTTP) HTTP {
circuitBreaker.serviceName = httpSvc.name

if circuitBreaker.metrics != nil {
registerGauge(circuitBreaker.metrics, "app_http_circuit_breaker_state",
"Current state of the circuit breaker (0 for Closed, 1 for Open)")

// Initialize the gauge to 0 (Closed)
// Initialize the gauge to 0 (Closed) - gauge is already registered in container.go
circuitBreaker.metrics.SetGauge("app_http_circuit_breaker_state", 0, "service", circuitBreaker.serviceName)
}
}
Expand Down
189 changes: 189 additions & 0 deletions pkg/gofr/service/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,3 +738,192 @@ func (*customTransport) RoundTrip(r *http.Request) (*http.Response, error) {

return nil, testutil.CustomError{ErrorMessage: "cb error"}
}

// TestCircuitBreaker_CustomHealthEndpoint_Recovery verifies that an open
// circuit recovers by probing a user-configured health endpoint.
func TestCircuitBreaker_CustomHealthEndpoint_Recovery(t *testing.T) {
	// Backend: /fail answers 502 (drives the breaker open); /health and
	// every other path answer 200.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		status := http.StatusOK

		switch r.URL.Path {
		case "/fail":
			status = http.StatusBadGateway
		case "/health":
			status = http.StatusOK
		}

		w.WriteHeader(status)
	}))
	defer srv.Close()

	ctrl := gomock.NewController(t)
	metrics := NewMockMetrics(ctrl)

	metrics.EXPECT().RecordHistogram(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().NewCounter(gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().NewGauge(gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().SetGauge(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

	svc := NewHTTPService(srv.URL,
		logging.NewMockLogger(logging.DEBUG),
		metrics,
		&CircuitBreakerConfig{Threshold: 1, Interval: 200 * time.Millisecond},
		&HealthConfig{HealthEndpoint: "health", Timeout: 5},
	)

	// Failure #1: the 502 is returned to the caller; the failure count (1)
	// does not yet exceed the threshold (1), so the breaker stays closed.
	resp, err := svc.Get(t.Context(), "fail", nil)
	require.NoError(t, err)
	assert.Equal(t, http.StatusBadGateway, resp.StatusCode)
	resp.Body.Close()

	// Failure #2: the count (2) exceeds the threshold, the breaker opens
	// and the call is rejected with ErrCircuitOpen.
	resp, err = svc.Get(t.Context(), "fail", nil)
	if resp != nil && resp.Body != nil {
		resp.Body.Close()
	}

	require.ErrorIs(t, err, ErrCircuitOpen)

	// While open, subsequent calls keep being short-circuited.
	resp, err = svc.Get(t.Context(), "fail", nil)
	if resp != nil && resp.Body != nil {
		resp.Body.Close()
	}

	require.ErrorIs(t, err, ErrCircuitOpen)

	// Allow the breaker time to probe /health and close again.
	time.Sleep(1 * time.Second)

	// With the breaker closed once more, a normal request succeeds.
	resp, err = svc.Get(t.Context(), "success", nil)
	require.NoError(t, err)
	assert.Equal(t, http.StatusOK, resp.StatusCode)
	resp.Body.Close()
}

// TestCircuitBreaker_DefaultHealthEndpoint_NoRecoveryWhenMissing verifies
// that the circuit stays open when no HealthConfig is given and the default
// /.well-known/alive endpoint is not served by the backend.
func TestCircuitBreaker_DefaultHealthEndpoint_NoRecoveryWhenMissing(t *testing.T) {
	// Backend: /fail answers 502, the default health endpoint answers 404
	// (unavailable), everything else answers 200.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		status := http.StatusOK

		switch r.URL.Path {
		case "/fail":
			status = http.StatusBadGateway
		case "/.well-known/alive":
			// Default health endpoint not available.
			status = http.StatusNotFound
		}

		w.WriteHeader(status)
	}))
	defer srv.Close()

	ctrl := gomock.NewController(t)
	metrics := NewMockMetrics(ctrl)

	metrics.EXPECT().RecordHistogram(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().NewCounter(gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().NewGauge(gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().SetGauge(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

	// No HealthConfig: the breaker falls back to /.well-known/alive,
	// which this backend answers with 404.
	svc := NewHTTPService(srv.URL,
		logging.NewMockLogger(logging.DEBUG),
		metrics,
		&CircuitBreakerConfig{Threshold: 1, Interval: 200 * time.Millisecond},
	)

	// Failure #1: 502 reaches the caller; count (1) does not exceed the
	// threshold (1), so the breaker stays closed.
	resp, err := svc.Get(t.Context(), "fail", nil)
	require.NoError(t, err)
	assert.Equal(t, http.StatusBadGateway, resp.StatusCode)
	resp.Body.Close()

	// Failure #2: threshold exceeded, the breaker opens.
	resp, err = svc.Get(t.Context(), "fail", nil)
	if resp != nil && resp.Body != nil {
		resp.Body.Close()
	}

	require.ErrorIs(t, err, ErrCircuitOpen)

	// Still open on the next call.
	resp, err = svc.Get(t.Context(), "fail", nil)
	if resp != nil && resp.Body != nil {
		resp.Body.Close()
	}

	require.ErrorIs(t, err, ErrCircuitOpen)

	// Give the recovery probe a chance to run.
	time.Sleep(500 * time.Millisecond)

	// The probe hits /.well-known/alive which 404s, so the breaker never
	// closes and the call is still rejected.
	resp, err = svc.Get(t.Context(), "success", nil)
	if resp != nil && resp.Body != nil {
		resp.Body.Close()
	}

	require.ErrorIs(t, err, ErrCircuitOpen)
}

// TestCircuitBreaker_HealthEndpointWithTimeout verifies that recovery via a
// custom health endpoint also works when an explicit Timeout is configured.
func TestCircuitBreaker_HealthEndpointWithTimeout(t *testing.T) {
	// Backend: /fail answers 502; /health and all other paths answer 200.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		status := http.StatusOK

		switch r.URL.Path {
		case "/fail":
			status = http.StatusBadGateway
		case "/health":
			status = http.StatusOK
		}

		w.WriteHeader(status)
	}))
	defer srv.Close()

	ctrl := gomock.NewController(t)
	metrics := NewMockMetrics(ctrl)

	metrics.EXPECT().RecordHistogram(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().NewCounter(gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().NewGauge(gomock.Any(), gomock.Any()).AnyTimes()
	metrics.EXPECT().SetGauge(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

	svc := NewHTTPService(srv.URL,
		logging.NewMockLogger(logging.DEBUG),
		metrics,
		&CircuitBreakerConfig{Threshold: 1, Interval: 200 * time.Millisecond},
		&HealthConfig{HealthEndpoint: "health", Timeout: 10},
	)

	// Failure #1: 502 reaches the caller; count (1) does not exceed the
	// threshold (1), so the breaker stays closed.
	resp, err := svc.Get(t.Context(), "fail", nil)
	require.NoError(t, err)
	assert.Equal(t, http.StatusBadGateway, resp.StatusCode)
	resp.Body.Close()

	// Failure #2: threshold exceeded, the breaker opens and rejects the
	// call with ErrCircuitOpen.
	resp, err = svc.Get(t.Context(), "fail", nil)
	if resp != nil && resp.Body != nil {
		resp.Body.Close()
	}

	require.ErrorIs(t, err, ErrCircuitOpen)

	// Still open on the next call.
	resp, err = svc.Get(t.Context(), "fail", nil)
	if resp != nil && resp.Body != nil {
		resp.Body.Close()
	}

	require.ErrorIs(t, err, ErrCircuitOpen)

	// Let the breaker probe the custom /health endpoint and close.
	time.Sleep(500 * time.Millisecond)

	// Recovery succeeded: the request goes through again.
	resp, err = svc.Get(t.Context(), "success", nil)
	require.NoError(t, err)
	assert.Equal(t, http.StatusOK, resp.StatusCode)
	resp.Body.Close()
}
6 changes: 6 additions & 0 deletions pkg/gofr/service/health_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ func (h *HealthConfig) AddOption(svc HTTP) HTTP {
h.Timeout = defaultTimeout
}

// Set health config on the parent httpService so other options can access it
if httpSvc := extractHTTPService(svc); httpSvc != nil {
httpSvc.healthEndpoint = h.HealthEndpoint
httpSvc.healthTimeout = h.Timeout
}

return &customHealthService{
healthEndpoint: h.HealthEndpoint,
timeout: h.Timeout,
Expand Down
Loading
Loading