diff --git a/Sources/EventViewerX/SearchEvents.QueryLog.cs b/Sources/EventViewerX/SearchEvents.QueryLog.cs index 3ecd893..cfb016a 100644 --- a/Sources/EventViewerX/SearchEvents.QueryLog.cs +++ b/Sources/EventViewerX/SearchEvents.QueryLog.cs @@ -3,6 +3,8 @@ using System.Net; using System.Runtime.CompilerServices; using System.Threading; +using System.Collections.Concurrent; +using System.Threading.Tasks; namespace EventViewerX; @@ -426,22 +428,50 @@ public static IEnumerable QueryLogsParallelForEach(string logName, } var results = new BlockingCollection(); + var exceptions = new ConcurrentQueue(); var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads }; - Task.Factory.StartNew(() => { - Parallel.ForEach(machineNames, options, machineName => { - _logger.WriteVerbose("Starting task for machine: " + machineName); - var queryResults = QueryLog(logName, eventIds, machineName, providerName, keywords, level, startTime, endTime, userId, maxEvents, eventRecordId, cancellationToken: cancellationToken); - foreach (var result in queryResults) { - if (cancellationToken.IsCancellationRequested) break; - results.Add(result, cancellationToken); - } - _logger.WriteVerbose("Finished task for machine: " + machineName); - }); - results.CompleteAdding(); + Task workerTask = Task.Factory.StartNew(() => { + try { + Parallel.ForEach(machineNames, options, machineName => { + try { + _logger.WriteVerbose("Starting task for machine: " + machineName); + var queryResults = QueryLog(logName, eventIds, machineName, providerName, keywords, level, startTime, endTime, userId, maxEvents, eventRecordId, cancellationToken: cancellationToken); + foreach (var result in queryResults) { + if (cancellationToken.IsCancellationRequested) break; + results.Add(result, cancellationToken); + } + _logger.WriteVerbose("Finished task for machine: " + machineName); + } catch (Exception ex) { + exceptions.Enqueue(ex); + } + }); + } catch (Exception ex) { + exceptions.Enqueue(ex); + } finally { + results.CompleteAdding(); + } }, cancellationToken); - return results.GetConsumingEnumerable(cancellationToken); + return EnumerateResults(results, workerTask, exceptions, cancellationToken); + } + + private static IEnumerable EnumerateResults(BlockingCollection results, Task workerTask, ConcurrentQueue exceptions, CancellationToken cancellationToken) { + try { + foreach (var result in results.GetConsumingEnumerable(cancellationToken)) { + yield return result; + } + } finally { + try { + workerTask.Wait(cancellationToken); + } catch (Exception ex) { + exceptions.Enqueue(ex); + } + + if (!exceptions.IsEmpty) { + throw new AggregateException(exceptions); + } + } } public static IEnumerable QueryLogsParallelForEach(KnownLog logName, List eventIds = null, List machineNames = null, string providerName = null, Keywords? keywords = null, Level? level = null, DateTime? startTime = null, DateTime? endTime = null, string userId = null, int maxEvents = 0, int maxThreads = 4, List eventRecordId = null, CancellationToken cancellationToken = default) {