2727package org .opensearch .performanceanalyzer .writer ;
2828
2929
30+ import org .opensearch .performanceanalyzer .PerformanceAnalyzerApp ;
31+ import org .opensearch .performanceanalyzer .config .PerformanceAnalyzerController ;
32+ import org .opensearch .performanceanalyzer .config .PluginSettings ;
33+ import org .opensearch .performanceanalyzer .http_action .config .PerformanceAnalyzerConfigAction ;
34+ import org .opensearch .performanceanalyzer .metrics .MetricsConfiguration ;
35+ import org .opensearch .performanceanalyzer .metrics .PerformanceAnalyzerMetrics ;
36+ import org .opensearch .performanceanalyzer .rca .framework .metrics .WriterMetrics ;
37+ import org .opensearch .performanceanalyzer .reader_writer_shared .Event ;
38+ import org .opensearch .performanceanalyzer .reader_writer_shared .EventLogFileHandler ;
39+ import org .apache .logging .log4j .Logger ;
40+
3041import java .util .ArrayList ;
42+ import java .util .Collections ;
3143import java .util .List ;
3244import java .util .concurrent .CancellationException ;
3345import java .util .concurrent .ExecutionException ;
3446import java .util .concurrent .Executors ;
3547import java .util .concurrent .ScheduledExecutorService ;
3648import java .util .concurrent .ScheduledFuture ;
3749import java .util .concurrent .TimeUnit ;
50+ import java .util .stream .Collectors ;
51+ import java .util .stream .LongStream ;
52+
3853import org .apache .logging .log4j .LogManager ;
39- import org .apache .logging .log4j .Logger ;
40- import org .opensearch .performanceanalyzer .PerformanceAnalyzerApp ;
41- import org .opensearch .performanceanalyzer .config .PerformanceAnalyzerController ;
42- import org .opensearch .performanceanalyzer .http_action .config .PerformanceAnalyzerConfigAction ;
43- import org .opensearch .performanceanalyzer .metrics .MetricsConfiguration ;
44- import org .opensearch .performanceanalyzer .metrics .PerformanceAnalyzerMetrics ;
45- import org .opensearch .performanceanalyzer .rca .framework .metrics .WriterMetrics ;
46- import org .opensearch .performanceanalyzer .reader_writer_shared .Event ;
47- import org .opensearch .performanceanalyzer .reader_writer_shared .EventLogFileHandler ;
4854
4955public class EventLogQueueProcessor {
5056 private static final Logger LOG = LogManager .getLogger (EventLogQueueProcessor .class );
5157
5258 private final ScheduledExecutorService writerExecutor = Executors .newScheduledThreadPool (1 );
59+ private final int filesCleanupPeriodicityMillis = PluginSettings .instance ().getMetricsDeletionInterval (); // defaults to 60seconds
5360 private final EventLogFileHandler eventLogFileHandler ;
5461 private final long initialDelayMillis ;
5562 private final long purgePeriodicityMillis ;
5663 private final PerformanceAnalyzerController controller ;
64+ private long lastCleanupTimeBucket ;
5765 private long lastTimeBucket ;
58-
5966 public EventLogQueueProcessor (
6067 EventLogFileHandler eventLogFileHandler ,
6168 long initialDelayMillis ,
@@ -64,6 +71,7 @@ public EventLogQueueProcessor(
6471 this .eventLogFileHandler = eventLogFileHandler ;
6572 this .initialDelayMillis = initialDelayMillis ;
6673 this .purgePeriodicityMillis = purgePeriodicityMillis ;
74+ this .lastCleanupTimeBucket = 0 ;
6775 this .lastTimeBucket = 0 ;
6876 this .controller = controller ;
6977 }
@@ -97,7 +105,7 @@ public void scheduleExecutor() {
97105
98106 // This executes every purgePeriodicityMillis interval.
99107 public void purgeQueueAndPersist () {
100- // Return if the writer is not enabled.
108+ // Drain the Queue, and if writer is enabled then persist to event log file .
101109 if (PerformanceAnalyzerConfigAction .getInstance () == null ) {
102110 return ;
103111 } else if (!controller .isPerformanceAnalyzerEnabled ()) {
@@ -162,6 +170,33 @@ public void purgeQueueAndPersist() {
162170 eventLogFileHandler .writeTmpFile (nextMetrics , nextTimeBucket );
163171 }
164172 LOG .debug ("Writing to disk complete." );
173+
174+ // Delete the older event log files every filesCleanupPeriod (defaults to 60)
175+ // In case files deletion takes longer/fails, we are okay with eventQueue reaching
176+ // its max size (100000), post that {@link PerformanceAnalyzerMetrics#emitMetric()}
177+ // will emit metric {@link WriterMetrics#METRICS_WRITE_ERROR} and return.
178+ cleanup ();
179+ }
180+
181+ private void cleanup () {
182+ long currentTimeMillis = System .currentTimeMillis ();
183+ if (lastCleanupTimeBucket != 0 ) {
184+ // Delete Event log files belonging to time bucket older than past filesCleanupPeriod(defaults to 60s)
185+ long currCleanupTimeBucket = PerformanceAnalyzerMetrics .getTimeInterval (currentTimeMillis );
186+ if (currCleanupTimeBucket - lastCleanupTimeBucket > filesCleanupPeriodicityMillis ) {
187+ // Get list of files(time buckets) for purging, considered range : [lastCleanupTimeBucket, currCleanupTimeBucket)
188+ List <String > filesForCleanup = LongStream .range (lastCleanupTimeBucket , currCleanupTimeBucket )
189+ .filter (timeMillis -> timeMillis % MetricsConfiguration .SAMPLING_INTERVAL == 0 )
190+ .mapToObj (String ::valueOf )
191+ .collect (Collectors .toList ());
192+ eventLogFileHandler .deleteFiles (Collections .unmodifiableList (filesForCleanup ));
193+ lastCleanupTimeBucket = currCleanupTimeBucket ;
194+ }
195+ } else {
196+ // First purge since the start-up, cleanup any lingering files.
197+ eventLogFileHandler .deleteAllFiles ();
198+ lastCleanupTimeBucket = PerformanceAnalyzerMetrics .getTimeInterval (currentTimeMillis );
199+ }
165200 }
166201
167202 private void writeAndRotate (
0 commit comments