Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions Sources/EventViewerX/SearchEvents.QueryLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace EventViewerX;

Expand Down Expand Up @@ -426,22 +428,50 @@ public static IEnumerable<EventObject> QueryLogsParallelForEach(string logName,
}

var results = new BlockingCollection<EventObject>();
var exceptions = new ConcurrentQueue<Exception>();
var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };

Task.Factory.StartNew(() => {
Parallel.ForEach(machineNames, options, machineName => {
_logger.WriteVerbose("Starting task for machine: " + machineName);
var queryResults = QueryLog(logName, eventIds, machineName, providerName, keywords, level, startTime, endTime, userId, maxEvents, eventRecordId, cancellationToken: cancellationToken);
foreach (var result in queryResults) {
if (cancellationToken.IsCancellationRequested) break;
results.Add(result, cancellationToken);
}
_logger.WriteVerbose("Finished task for machine: " + machineName);
});
results.CompleteAdding();
Task workerTask = Task.Factory.StartNew(() => {
try {
Parallel.ForEach(machineNames, options, machineName => {
try {
_logger.WriteVerbose("Starting task for machine: " + machineName);
var queryResults = QueryLog(logName, eventIds, machineName, providerName, keywords, level, startTime, endTime, userId, maxEvents, eventRecordId, cancellationToken: cancellationToken);
foreach (var result in queryResults) {
if (cancellationToken.IsCancellationRequested) break;
results.Add(result, cancellationToken);
}
_logger.WriteVerbose("Finished task for machine: " + machineName);
} catch (Exception ex) {
exceptions.Enqueue(ex);
}
});
} catch (Exception ex) {
exceptions.Enqueue(ex);
} finally {
results.CompleteAdding();
}
}, cancellationToken);

return results.GetConsumingEnumerable(cancellationToken);
return EnumerateResults(results, workerTask, exceptions, cancellationToken);
}

private static IEnumerable<EventObject> EnumerateResults(BlockingCollection<EventObject> results, Task workerTask, ConcurrentQueue<Exception> exceptions, CancellationToken cancellationToken) {
try {
foreach (var result in results.GetConsumingEnumerable(cancellationToken)) {
yield return result;
}
} finally {
try {
workerTask.Wait(cancellationToken);
} catch (Exception ex) {
exceptions.Enqueue(ex);
}

if (!exceptions.IsEmpty) {
throw new AggregateException(exceptions);
}
}
}

public static IEnumerable<EventObject> QueryLogsParallelForEach(KnownLog logName, List<int> eventIds = null, List<string> machineNames = null, string providerName = null, Keywords? keywords = null, Level? level = null, DateTime? startTime = null, DateTime? endTime = null, string userId = null, int maxEvents = 0, int maxThreads = 4, List<long> eventRecordId = null, CancellationToken cancellationToken = default) {
Expand Down
Loading