-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathAdaptiveTimeout.cs
More file actions
391 lines (360 loc) · 20.2 KB
/
AdaptiveTimeout.cs
File metadata and controls
391 lines (360 loc) · 20.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using SharpHoundCommonLib.Exceptions;
using SharpHoundCommonLib.Interfaces;
using SharpHoundCommonLib.Models;
using SharpHoundCommonLib.Static;
using SharpHoundRPC.NetAPINative;
namespace SharpHoundCommonLib;
public sealed class AdaptiveTimeout : IDisposable {
// Records execution times of the wrapped function; supplies the sample count, average and standard deviation
// that GetAdaptiveTimeout and UseAdaptiveTimeout read.
private readonly ExecutionTimeSampler _sampler;
// Start times of the most recent successful calls, trimmed by AddLatestSuccessTimestamp
// and consulted by EnoughSuccessesSince.
private readonly ConcurrentQueue<DateTime> _latestSuccessTimestamps;
private readonly ILogger _log;
// Ceiling applied to the computed adaptive timeout; also the value returned by GetAdaptiveTimeout
// when adaptive timeouts are not in use or the calculation throws.
private readonly TimeSpan _maxTimeout;
// Floor applied to the computed adaptive timeout. Set to _defaultMinTimeout unless the constructor
// overload that takes minTimeout is used.
private readonly TimeSpan _minTimeout;
// Adaptive timeouts are only used when this flag is set AND the sampler holds at least
// _minSamplesForAdaptiveTimeout samples (see UseAdaptiveTimeout).
private readonly bool _useAdaptiveTimeout;
private readonly int _minSamplesForAdaptiveTimeout;
// When true, TriggerTimeSpikeEvent throws ExcessiveTimeoutsException after logging its excessive-timeouts warning.
private readonly bool _throwIfExcessiveTimeouts;
// Running score of recent failures: raised by TimeSpikePenalty on every unsuccessful call and lowered by
// TimeSpikeForgiveness (never below 0) on every successful one. Only touched through Interlocked/Volatile operations.
private int _timeSpikeDecay;
private readonly TimeSpan _defaultMinTimeout = TimeSpan.FromSeconds(1);
// Amount added to _timeSpikeDecay for each unsuccessful call.
private const int TimeSpikePenalty = 2;
// Amount subtracted from _timeSpikeDecay for each successful call.
private const int TimeSpikeForgiveness = 1;
// _timeSpikeDecay value at or above which a failure makes TimeSpikeSafetyValve react.
private const int TimeSpikeThreshold = 3;
// _timeSpikeDecay value at or above which TriggerTimeSpikeEvent reports excessive timeouts
// (only checked while adaptive timeouts are not in use).
private const int ExcessiveTimeoutsThreshold = 7;
private const int StdDevMultiplier = 7; // 7 standard deviations should be a very conservative upper bound
// Number of entries at which AddLatestSuccessTimestamp starts dequeuing older timestamps before adding a new one.
private const int CountOfLatestSuccessToKeep = 3;
// Receives the TimeoutsTotal observation emitted whenever a wrapped call does not succeed.
private readonly IMetricRouter _metrics;
/// <summary>
/// Creates an adaptive timeout whose lower bound is the built-in default minimum timeout.
/// </summary>
/// <param name="maxTimeout">Largest timeout that may be handed out; must be greater than zero.</param>
/// <param name="log">Logger used by this instance and passed on to the execution time sampler; must not be null.</param>
/// <param name="sampleCount">Passed to the execution time sampler; must be greater than zero.</param>
/// <param name="logFrequency">Passed to the execution time sampler; must be greater than zero.</param>
/// <param name="minSamplesForAdaptiveTimeout">Number of samples required before adaptive timeouts are used; must be greater than zero.</param>
/// <param name="useAdaptiveTimeout">When false, <see cref="GetAdaptiveTimeout"/> always returns <paramref name="maxTimeout"/>.</param>
/// <param name="throwIfExcessiveTimeouts">When true, an <see cref="ExcessiveTimeoutsException"/> is thrown once excessive timeouts are detected.</param>
/// <exception cref="ArgumentException">A numeric or time argument is not positive.</exception>
/// <exception cref="ArgumentNullException"><paramref name="log"/> is null.</exception>
public AdaptiveTimeout(TimeSpan maxTimeout, ILogger log, int sampleCount = 100, int logFrequency = 1000, int minSamplesForAdaptiveTimeout = 30, bool useAdaptiveTimeout = true, bool throwIfExcessiveTimeouts = false) {
    // Argument checks, in the same order callers have always observed them.
    if (maxTimeout <= TimeSpan.Zero) {
        throw new ArgumentException("maxTimeout must be positive", nameof(maxTimeout));
    }

    if (sampleCount <= 0) {
        throw new ArgumentException("sampleCount must be positive", nameof(sampleCount));
    }

    if (logFrequency <= 0) {
        throw new ArgumentException("logFrequency must be positive", nameof(logFrequency));
    }

    if (minSamplesForAdaptiveTimeout <= 0) {
        throw new ArgumentException("minSamplesForAdaptiveTimeout must be positive", nameof(minSamplesForAdaptiveTimeout));
    }

    if (log is null) {
        throw new ArgumentNullException(nameof(log));
    }

    // Plain configuration values.
    _log = log;
    _maxTimeout = maxTimeout;
    _minTimeout = _defaultMinTimeout;
    _minSamplesForAdaptiveTimeout = minSamplesForAdaptiveTimeout;
    _useAdaptiveTimeout = useAdaptiveTimeout;
    _throwIfExcessiveTimeouts = throwIfExcessiveTimeouts;

    // Collaborators.
    _sampler = new ExecutionTimeSampler(log, sampleCount, logFrequency);
    _latestSuccessTimestamps = new ConcurrentQueue<DateTime>();
    _metrics = Metrics.Factory.CreateMetricRouter();
}
/// <summary>
/// Creates an adaptive timeout with a caller-supplied lower bound.
/// All other arguments are validated and applied by the constructor overload without <paramref name="minTimeout"/>.
/// </summary>
/// <param name="maxTimeout">Largest timeout that may be handed out.</param>
/// <param name="minTimeout">Smallest adaptive timeout; must be non-negative and strictly less than <paramref name="maxTimeout"/>.</param>
/// <exception cref="ArgumentException"><paramref name="minTimeout"/> is negative, or is not less than <paramref name="maxTimeout"/>.</exception>
public AdaptiveTimeout(TimeSpan maxTimeout, TimeSpan minTimeout, ILogger log, int sampleCount = 100, int logFrequency = 1000, int minSamplesForAdaptiveTimeout = 30, bool useAdaptiveTimeout = true, bool throwIfExcessiveTimeouts = false)
    : this(maxTimeout, log, sampleCount, logFrequency, minSamplesForAdaptiveTimeout, useAdaptiveTimeout, throwIfExcessiveTimeouts) {
    var isNegative = minTimeout < TimeSpan.Zero;
    if (isNegative) {
        throw new ArgumentException("minTimeout must be non-negative", nameof(minTimeout));
    }

    var reachesMax = maxTimeout <= minTimeout;
    if (reachesMax) {
        throw new ArgumentException("minTimeout must be less than maxTimeout", nameof(minTimeout));
    }

    // Replace the default floor assigned by the chained constructor.
    _minTimeout = minTimeout;
}
/// <summary>
/// Resets the time spike decay counter to zero and then asks the execution time sampler to clear its samples.
/// </summary>
public void ClearSamples() {
    _ = Interlocked.Exchange(ref _timeSpikeDecay, 0);
    _sampler.ClearSamples();
}
/// <summary>
/// Runs a synchronous, value-returning function under the current adaptive timeout.
/// If the function runs longer than its budgeted time, the caller is unblocked and the function's result is ignored.
/// Logs aggregate execution time data and manages its own timeout.
/// The function is handed a cancellation token so it may exit cleanly once the timeout is reached.
/// Please don't wrap a cached function in this timeout if adaptive timeouts are enabled; normal distributions are better.
/// DO NOT use a single AdaptiveTimeout for multiple functions.
/// </summary>
/// <typeparam name="T">Type of the value produced by <paramref name="func"/>.</typeparam>
/// <param name="func">The function to execute.</param>
/// <param name="parentToken">Caller-supplied cancellation token, forwarded to the underlying timeout helper.</param>
/// <param name="latencyObservation">A method that is used to observe the latency of the request.</param>
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
public async Task<Result<T>> ExecuteWithTimeout<T>(Func<CancellationToken, T> func, CancellationToken parentToken = default, Action<double> latencyObservation = null) {
    // Written by the wrapped call just before func runs; used only to order this call
    // against recent successes inside TimeSpikeSafetyValve.
    var startedAt = default(DateTime);

    var outcome = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), cancellation =>
        _sampler.SampleExecutionTime(() => {
            startedAt = DateTime.Now;
            return func(cancellation);
        }, latencyObservation), parentToken);

    var succeeded = outcome.IsSuccess;
    TimeSpikeSafetyValve(succeeded, startedAt);
    if (succeeded) {
        return outcome;
    }

    // Every non-successful outcome is counted in the timeouts metric.
    _metrics.Observe(AdaptiveTimeoutDefinitions.TimeoutsTotal, 1, new LabelValues());
    return outcome;
}
/// <summary>
/// Runs a synchronous action (no return value) under the current adaptive timeout.
/// If the action runs longer than its budgeted time, the caller is unblocked.
/// Logs aggregate execution time data and manages its own timeout.
/// The action is handed a cancellation token so it may exit cleanly once the timeout is reached.
/// Please don't wrap a cached function in this timeout if adaptive timeouts are enabled; normal distributions are better.
/// DO NOT use a single AdaptiveTimeout for multiple functions.
/// </summary>
/// <param name="func">The action to execute.</param>
/// <param name="parentToken">Caller-supplied cancellation token, forwarded to the underlying timeout helper.</param>
/// <param name="latencyObservation">A method that is used to observe the latency of the request.</param>
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
public async Task<Result> ExecuteWithTimeout(Action<CancellationToken> func, CancellationToken parentToken = default, Action<double> latencyObservation = null) {
    // Written by the wrapped call just before func runs; used only to order this call
    // against recent successes inside TimeSpikeSafetyValve.
    var startedAt = default(DateTime);

    var outcome = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), cancellation =>
        _sampler.SampleExecutionTime(() => {
            startedAt = DateTime.Now;
            func(cancellation);
        }, latencyObservation), parentToken);

    var succeeded = outcome.IsSuccess;
    TimeSpikeSafetyValve(succeeded, startedAt);
    if (succeeded) {
        return outcome;
    }

    // Every non-successful outcome is counted in the timeouts metric.
    _metrics.Observe(AdaptiveTimeoutDefinitions.TimeoutsTotal, 1, new LabelValues());
    return outcome;
}
/// <summary>
/// Runs an asynchronous, value-returning function under the current adaptive timeout.
/// If the function runs longer than its budgeted time, the caller is unblocked and the function's result is ignored.
/// Logs aggregate execution time data and manages its own timeout.
/// The function is handed a cancellation token so it may exit cleanly once the timeout is reached.
/// Please don't wrap a cached function in this timeout if adaptive timeouts are enabled; normal distributions are better.
/// DO NOT use a single AdaptiveTimeout for multiple functions.
/// </summary>
/// <typeparam name="T">Type of the value produced by the task returned from <paramref name="func"/>.</typeparam>
/// <param name="func">The asynchronous function to execute.</param>
/// <param name="parentToken">Caller-supplied cancellation token, forwarded to the underlying timeout helper.</param>
/// <param name="latencyObservation">A method that is used to observe the latency of the request.</param>
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
public async Task<Result<T>> ExecuteWithTimeout<T>(Func<CancellationToken, Task<T>> func, CancellationToken parentToken = default, Action<double> latencyObservation = null) {
    // Written by the wrapped call just before func runs; used only to order this call
    // against recent successes inside TimeSpikeSafetyValve.
    var startedAt = default(DateTime);

    var outcome = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), cancellation =>
        _sampler.SampleExecutionTime(() => {
            startedAt = DateTime.Now;
            return func(cancellation);
        }, latencyObservation), parentToken);

    var succeeded = outcome.IsSuccess;
    TimeSpikeSafetyValve(succeeded, startedAt);
    if (succeeded) {
        return outcome;
    }

    // Every non-successful outcome is counted in the timeouts metric.
    _metrics.Observe(AdaptiveTimeoutDefinitions.TimeoutsTotal, 1, new LabelValues());
    return outcome;
}
/// <summary>
/// Runs an asynchronous function (no result value) under the current adaptive timeout.
/// If the function runs longer than its budgeted time, the caller is unblocked.
/// Logs aggregate execution time data and manages its own timeout.
/// The function is handed a cancellation token so it may exit cleanly once the timeout is reached.
/// Please don't wrap a cached function in this timeout if adaptive timeouts are enabled; normal distributions are better.
/// DO NOT use a single AdaptiveTimeout for multiple functions.
/// </summary>
/// <param name="func">The asynchronous function to execute.</param>
/// <param name="parentToken">Caller-supplied cancellation token, forwarded to the underlying timeout helper.</param>
/// <param name="latencyObservation">A method that is used to observe the latency of the request.</param>
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
public async Task<Result> ExecuteWithTimeout(Func<CancellationToken, Task> func, CancellationToken parentToken = default, Action<double> latencyObservation = null) {
    // Written by the wrapped call just before func runs; used only to order this call
    // against recent successes inside TimeSpikeSafetyValve.
    var startedAt = default(DateTime);

    var outcome = await Timeout.ExecuteWithTimeout(GetAdaptiveTimeout(), cancellation =>
        _sampler.SampleExecutionTime(() => {
            startedAt = DateTime.Now;
            return func(cancellation);
        }, latencyObservation), parentToken);

    var succeeded = outcome.IsSuccess;
    TimeSpikeSafetyValve(succeeded, startedAt);
    if (succeeded) {
        return outcome;
    }

    // Every non-successful outcome is counted in the timeouts metric.
    _metrics.Observe(AdaptiveTimeoutDefinitions.TimeoutsTotal, 1, new LabelValues());
    return outcome;
}
/// <summary>
/// Runs a synchronous NetAPI call under the current adaptive timeout.
/// If the call runs longer than its budgeted time, the caller is unblocked and the call's result is ignored.
/// Logs aggregate execution time data and manages its own timeout.
/// The function is handed a cancellation token so it may exit cleanly once the timeout is reached.
/// Please don't wrap a cached function in this timeout if adaptive timeouts are enabled; normal distributions are better.
/// DO NOT use a single AdaptiveTimeout for multiple functions.
/// </summary>
/// <typeparam name="T">Payload type of the <see cref="NetAPIResult{T}"/> produced by <paramref name="func"/>.</typeparam>
/// <param name="func">The NetAPI call to execute.</param>
/// <param name="parentToken">Caller-supplied cancellation token, forwarded to the underlying timeout helper.</param>
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
public async Task<NetAPIResult<T>> ExecuteNetAPIWithTimeout<T>(Func<CancellationToken, NetAPIResult<T>> func, CancellationToken parentToken = default) {
    // Written by the wrapped call just before func runs; used only to order this call
    // against recent successes inside TimeSpikeSafetyValve.
    var startedAt = default(DateTime);

    var outcome = await Timeout.ExecuteNetAPIWithTimeout(GetAdaptiveTimeout(), cancellation =>
        _sampler.SampleExecutionTime(() => {
            startedAt = DateTime.Now;
            return func(cancellation);
        }), parentToken);

    var succeeded = outcome.IsSuccess;
    TimeSpikeSafetyValve(succeeded, startedAt);
    if (succeeded) {
        return outcome;
    }

    // Every non-successful outcome is counted in the timeouts metric.
    _metrics.Observe(AdaptiveTimeoutDefinitions.TimeoutsTotal, 1, new LabelValues());
    return outcome;
}
/// <summary>
/// Runs a synchronous RPC call under the current adaptive timeout.
/// If the call runs longer than its budgeted time, the caller is unblocked and the call's result is ignored.
/// Logs aggregate execution time data and manages its own timeout.
/// The function is handed a cancellation token so it may exit cleanly once the timeout is reached.
/// Please don't wrap a cached function in this timeout if adaptive timeouts are enabled; normal distributions are better.
/// DO NOT use a single AdaptiveTimeout for multiple functions.
/// </summary>
/// <typeparam name="T">Payload type of the RPC result produced by <paramref name="func"/>.</typeparam>
/// <param name="func">The RPC call to execute.</param>
/// <param name="parentToken">Caller-supplied cancellation token, forwarded to the underlying timeout helper.</param>
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
public async Task<SharpHoundRPC.Result<T>> ExecuteRPCWithTimeout<T>(Func<CancellationToken, SharpHoundRPC.Result<T>> func, CancellationToken parentToken = default) {
    // Written by the wrapped call just before func runs; used only to order this call
    // against recent successes inside TimeSpikeSafetyValve.
    var startedAt = default(DateTime);

    var outcome = await Timeout.ExecuteRPCWithTimeout(GetAdaptiveTimeout(), cancellation =>
        _sampler.SampleExecutionTime(() => {
            startedAt = DateTime.Now;
            return func(cancellation);
        }), parentToken);

    var succeeded = outcome.IsSuccess;
    TimeSpikeSafetyValve(succeeded, startedAt);
    if (succeeded) {
        return outcome;
    }

    // Every non-successful outcome is counted in the timeouts metric.
    _metrics.Observe(AdaptiveTimeoutDefinitions.TimeoutsTotal, 1, new LabelValues());
    return outcome;
}
/// <summary>
/// Runs an asynchronous RPC call under the current adaptive timeout.
/// If the call runs longer than its budgeted time, the caller is unblocked and the call's result is ignored.
/// Logs aggregate execution time data and manages its own timeout.
/// The function is handed a cancellation token so it may exit cleanly once the timeout is reached.
/// Please don't wrap a cached function in this timeout if adaptive timeouts are enabled; normal distributions are better.
/// DO NOT use a single AdaptiveTimeout for multiple functions.
/// </summary>
/// <typeparam name="T">Payload type of the RPC result produced by the task returned from <paramref name="func"/>.</typeparam>
/// <param name="func">The asynchronous RPC call to execute.</param>
/// <param name="parentToken">Caller-supplied cancellation token, forwarded to the underlying timeout helper.</param>
/// <returns>Returns a Fail result if a task runs longer than its budgeted time.</returns>
public async Task<SharpHoundRPC.Result<T>> ExecuteRPCWithTimeout<T>(Func<CancellationToken, Task<SharpHoundRPC.Result<T>>> func, CancellationToken parentToken = default) {
    // Written by the wrapped call just before func runs; used only to order this call
    // against recent successes inside TimeSpikeSafetyValve.
    var startedAt = default(DateTime);

    var outcome = await Timeout.ExecuteRPCWithTimeout(GetAdaptiveTimeout(), cancellation =>
        _sampler.SampleExecutionTime(() => {
            startedAt = DateTime.Now;
            return func(cancellation);
        }), parentToken);

    var succeeded = outcome.IsSuccess;
    TimeSpikeSafetyValve(succeeded, startedAt);
    if (succeeded) {
        return outcome;
    }

    // Every non-successful outcome is counted in the timeouts metric.
    _metrics.Observe(AdaptiveTimeoutDefinitions.TimeoutsTotal, 1, new LabelValues());
    return outcome;
}
/// <summary>
/// Disposes the execution time sampler owned by this instance.
/// </summary>
public void Dispose() => _sampler.Dispose();
// Within 7 standard deviations will have a conservative lower bound of catching 98% of executions (1 - 1/7^2),
// regardless of sample shape
// so long as those samples are independent and identically distributed
// (and if they're not, our TimeSpikeSafetyValve should provide us with some adaptability)
// But the effective collection rate is probably closer to 99+%
// (in part because we don't need to filter out "too fast" outliers)
// But we'll cap at configured maximum timeout
// https://modelassist.epixanalytics.com/space/EA/26574957/Tchebysheffs+Rule
// https://en.wikipedia.org/wiki/Independent_and_identically_distributed_random_variables
/// <summary>
/// Computes the timeout to use for the next call.
/// While adaptive timeouts are not in use (disabled, or too few samples) this is simply the configured maximum.
/// Otherwise it is average + StdDevMultiplier * standard deviation of the sampled execution times (in milliseconds),
/// raised to the minimum timeout if it is lower and never allowed to exceed the maximum timeout.
/// </summary>
/// <returns>
/// The timeout to apply. Falls back to the maximum timeout (after logging an error) if the calculation throws.
/// </returns>
public TimeSpan GetAdaptiveTimeout() {
    if (!UseAdaptiveTimeout())
        return _maxTimeout;

    try {
        var stdDev = _sampler.StandardDeviation();
        var adaptiveTimeoutMs = _sampler.Average() + (stdDev * StdDevMultiplier);

        // Apply the floor first and the ceiling last so the configured maximum always wins.
        // The constructor without a minTimeout argument uses a fixed 1 second floor and does not compare it
        // with maxTimeout, so a sub-second maxTimeout would otherwise yield a timeout above the maximum.
        var flooredTimeoutMs = Math.Max(adaptiveTimeoutMs, _minTimeout.TotalMilliseconds);
        var cappedTimeoutMs = Math.Min(flooredTimeoutMs, _maxTimeout.TotalMilliseconds);

        return TimeSpan.FromMilliseconds(cappedTimeoutMs);
    }
    catch (Exception ex) {
        _log.LogError(ex, "Error calculating adaptive timeout, defaulting to max timeout.");
        return _maxTimeout;
    }
}
// AdaptiveTimeout will not respond well to rapid spikes in execution time
// imagine the wrapped function very regularly executes in 10ms
// then suddenly starts taking a regular 100ms
// this is fine (if it fits in our max timeout budget), and we shouldn't timeout
// so we should create a safety valve in case this happens to reset our data samples
//
// isSuccess: whether the wrapped call produced a successful result.
// startTime: when the wrapped call began executing (default(DateTime) if it never recorded a start).
private void TimeSpikeSafetyValve(bool isSuccess, DateTime startTime) {
    if (isSuccess) {
        // A success forgives a little of the accumulated penalty and is remembered for later ordering checks.
        AtomicDecrementWithFloor(ref _timeSpikeDecay, TimeSpikeForgiveness);
        AddLatestSuccessTimestamp(startTime);
        return;
    }

    // Interlocked.Add returns the value it just stored. Testing that value (instead of re-reading the field)
    // means the threshold decision is based on the counter as this failure left it, not on whatever
    // other threads may have written in the meantime.
    var decayAfterPenalty = Interlocked.Add(ref _timeSpikeDecay, TimeSpikePenalty);
    if (decayAfterPenalty < TimeSpikeThreshold)
        return;

    if (EnoughSuccessesSince(startTime)) {
        // Time spike is in the past now, no action needed
        // This happens when earlier calls report back timeouts
        // but we've since seen sufficient successful calls completed in the time between
        _log.LogTrace("Time spike hiccup spotted but since recovered.");
        Interlocked.Exchange(ref _timeSpikeDecay, 0);
    }
    else {
        TriggerTimeSpikeEvent();
    }
}
// Reacts to a run of recent calls that have mostly been failing.
// May throw ExcessiveTimeoutsException when configured to do so (see the final branch).
private void TriggerTimeSpikeEvent() {
    // Adaptive timeout in play: flush our samples so that we back off to the max timeout
    // until enough new samples have been gathered to rebuild our data confidence.
    if (UseAdaptiveTimeout()) {
        _log.LogTrace("Time spike safety valve event at timeout {CurrentTimeout}.", GetAdaptiveTimeout());
        ClearSamples();
        return;
    }

    // Already on the max configured timeout. Only act once the spike in timeout events
    // has persisted long enough to cross the excessive-timeouts threshold.
    var currentDecay = Volatile.Read(ref _timeSpikeDecay);
    if (currentDecay < ExcessiveTimeoutsThreshold) {
        return;
    }

    // Log it, start counting afresh, and throw if so configured.
    _log.LogWarning("This call is frequently running over the maximum allowed timeout of {MaxTimeout}.", _maxTimeout);
    Interlocked.Exchange(ref _timeSpikeDecay, 0);
    if (_throwIfExcessiveTimeouts) {
        throw new ExcessiveTimeoutsException($"This call is frequently running over the maximum allowed timeout of {_maxTimeout}.");
    }
}
// True only when adaptive timeouts are enabled and the sampler already holds
// at least the configured minimum number of samples.
private bool UseAdaptiveTimeout() =>
    _useAdaptiveTimeout && _minSamplesForAdaptiveTimeout <= _sampler.Count;
// Remembers the start time of a successful call.
// Older entries are dequeued first for as long as the queue already holds
// CountOfLatestSuccessToKeep (or more) timestamps.
private void AddLatestSuccessTimestamp(DateTime startTime) {
    while (CountOfLatestSuccessToKeep <= _latestSuccessTimestamps.Count) {
        _ = _latestSuccessTimestamps.TryDequeue(out _);
    }

    _latestSuccessTimestamps.Enqueue(startTime);
}
// Returns true when at least one success has been recorded and every remembered success
// started at or after startTime, i.e. the failure being evaluated predates all recent successes.
//
// An empty history must NOT count as "recovered": Enumerable.All is vacuously true for an empty
// sequence, which would otherwise let a function that has never succeeded keep resetting the
// time spike counter and never reach the time spike / excessive timeout handling.
private bool EnoughSuccessesSince(DateTime startTime) {
    // Snapshot once so the emptiness check and the comparison look at the same entries
    // even if other threads are enqueuing/dequeuing concurrently.
    var recentSuccesses = _latestSuccessTimestamps.ToArray();
    return recentSuccesses.Length > 0 && recentSuccesses.All(t => t >= startTime);
}
// AI-generated code
// Conceptually this performs:
// // Interlocked.Add(ref target, -decrement);
// // Interlocked.Exchange(ref target, Math.Max(floor, target));
// but those two calls together are not atomic, so a compare-and-swap retry loop is used instead:
// read the latest value, compute what it should become, and publish that result only if the
// value has not changed in the meantime. If another thread got in first, start over.
// This is however supposedly still much faster than using lock objects.
//
// Walkthrough (target starts at 0, decrement 1, floor 0; "1" is this thread, "2" an interceding thread):
//   1: observes 0
//   2: writes 2
//   1: computes Math.Max(0, 0 - 1) == 0, but the compare-and-swap sees 2 rather than 0 -> nothing written, retry
//   1: observes 2
//   1: computes Math.Max(0, 2 - 1) == 1, the compare-and-swap still sees 2 -> writes 1 and returns
/// <summary>
/// Atomically subtracts <paramref name="decrement"/> from <paramref name="target"/>,
/// storing <paramref name="floor"/> instead whenever the subtraction would go below it.
/// </summary>
/// <param name="target">The shared value to update in place.</param>
/// <param name="decrement">Amount to subtract.</param>
/// <param name="floor">Smallest value that will be stored; defaults to 0.</param>
public static void AtomicDecrementWithFloor(ref int target, int decrement, int floor = 0) {
    while (true) {
        var observed = Volatile.Read(ref target);
        var desired = Math.Max(floor, observed - decrement);

        // CompareExchange hands back the value it found in target; if that is still what we
        // observed, our write went through and we are done. Otherwise another thread changed it: retry.
        if (Interlocked.CompareExchange(ref target, desired, observed) == observed) {
            return;
        }
    }
}
}