Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/Grpc.Net.Client/Internal/GrpcCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Grpc.Net.Client.Internal.Http;
using Grpc.Shared;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
#if SUPPORT_LOAD_BALANCING
using Grpc.Net.Client.Balancer.Internal;
#endif
Expand Down Expand Up @@ -182,6 +183,10 @@ public void Dispose()
Disposed = true;

Cleanup(GrpcProtocolConstants.DisposeCanceledStatus);

// If the call was disposed then observe any potential response exception.
// Observe the task's exception to prevent TaskScheduler.UnobservedTaskException from firing.
_responseTcs?.Task.ObserveException();
}
}

Expand Down Expand Up @@ -316,9 +321,21 @@ private async Task<Metadata> GetResponseHeadersCoreAsync()

return metadata;
}
catch (Exception ex) when (ResolveException(ErrorStartingCallMessage, ex, out _, out var resolvedException))
catch (Exception ex)
{
throw resolvedException;
// If there was an error fetching response headers then it's likely the same error is reported
// by response TCS. The user is unlikely to observe both errors.
// Observe the task's exception to prevent TaskScheduler.UnobservedTaskException from firing.
_responseTcs?.Task.ObserveException();

if (ResolveException(ErrorStartingCallMessage, ex, out _, out var resolvedException))
{
throw resolvedException;
}
else
{
throw;
}
}
}

Expand Down Expand Up @@ -584,13 +601,23 @@ private async Task RunCall(HttpRequestMessage request, TimeSpan? timeout)
// Update HTTP response TCS before clean up. Needs to happen first because cleanup will
// cancel the TCS for anyone still listening.
_httpResponseTcs.TrySetException(resolvedException);
_httpResponseTcs.Task.ObserveException();

Cleanup(status.Value);

// Update response TCS after overall call status is resolved. This is required so that
// the call is completed before an error is thrown from ResponseAsync. If it happens
// afterwards then there is a chance GetStatus() will error because the call isn't complete.
_responseTcs?.TrySetException(resolvedException);
if (_responseTcs != null)
{
_responseTcs.TrySetException(resolvedException);

// Always observe cancellation-like exceptions.
if (IsCancellationOrDeadlineException(ex))
{
_responseTcs.Task.ObserveException();
}
}
}

// Verify that FinishCall is called in every code path of this method.
Expand Down
51 changes: 51 additions & 0 deletions src/Grpc.Net.Client/Internal/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

namespace Grpc.Net.Client.Internal
{
internal static class TaskExtensions
{
private static readonly Action<Task> IgnoreTaskContinuation = t => { _ = t.Exception; };

/// <summary>
/// Observes and ignores a potential exception on a given Task.
/// If a Task fails and throws an exception which is never observed, it will be caught by the .NET finalizer thread.
/// This function awaits the given task and if the exception is thrown, it observes this exception and simply ignores it.
/// This will prevent the escalation of this exception to the .NET finalizer thread.
/// </summary>
/// <param name="task">The task to be ignored.</param>
public static void ObserveException(this Task task)
{
if (task.IsCompleted)
{
if (task.IsFaulted)
{
_ = task.Exception;
}
}
else
{
task.ContinueWith(
IgnoreTaskContinuation,
CancellationToken.None,
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
}
}
}
86 changes: 86 additions & 0 deletions test/Grpc.Net.Client.Tests/AsyncUnaryCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
using Grpc.Net.Client.Tests.Infrastructure;
using Grpc.Shared;
using Grpc.Tests.Shared;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NUnit.Framework;

namespace Grpc.Net.Client.Tests
Expand Down Expand Up @@ -227,5 +229,89 @@ public async Task AsyncUnaryCall_SuccessTrailersOnly_ThrowNoMessageError()
Assert.AreEqual(0, headers.Count);
Assert.AreEqual(0, call.GetTrailers().Count);
}

public enum ResponseHandleAction
{
ResponseAsync,
ResponseHeadersAsync,
Dispose,
Nothing
}

[Test]
[TestCase(0, ResponseHandleAction.ResponseAsync)]
[TestCase(0, ResponseHandleAction.ResponseHeadersAsync)]
[TestCase(0, ResponseHandleAction.Dispose)]
[TestCase(1, ResponseHandleAction.Nothing)]
public async Task AsyncUnaryCall_CallFailed_NoUnobservedExceptions(int expectedUnobservedExceptions, ResponseHandleAction action)
{
// Arrange
var services = new ServiceCollection();
services.AddNUnitLogger();
var loggerFactory = services.BuildServiceProvider().GetRequiredService<ILoggerFactory>();
var logger = loggerFactory.CreateLogger<CancellationTests>();

var unobservedExceptions = new List<Exception>();
EventHandler<UnobservedTaskExceptionEventArgs> onUnobservedTaskException = (sender, e) =>
{
unobservedExceptions.Add(e.Exception!);

logger.LogCritical(e.Exception!, "Unobserved task exception. Observed: " + e.Observed);
};

TaskScheduler.UnobservedTaskException += onUnobservedTaskException;

try
{
var httpClient = ClientTestHelpers.CreateTestClient(request =>
{
throw new Exception("Test error");
});
var invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: loggerFactory);

// Act
logger.LogDebug("Starting call");
await MakeGrpcCallAsync(logger, invoker, action);

logger.LogDebug("Waiting for finalizers");
// Provoke the garbage collector to find the unobserved exception.
GC.Collect();
// Wait for any failed tasks to be garbage collected
GC.WaitForPendingFinalizers();

// Assert
Assert.AreEqual(expectedUnobservedExceptions, unobservedExceptions.Count);

static async Task MakeGrpcCallAsync(ILogger logger, HttpClientCallInvoker invoker, ResponseHandleAction action)
{
var runTask = Task.Run(async () =>
{
var call = invoker.AsyncUnaryCall(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(), new HelloRequest());

switch (action)
{
case ResponseHandleAction.ResponseAsync:
await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseAsync);
break;
case ResponseHandleAction.ResponseHeadersAsync:
await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseHeadersAsync);
break;
case ResponseHandleAction.Dispose:
call.Dispose();
break;
default:
// Do nothing.
break;
}
});

await runTask;
}
}
finally
{
TaskScheduler.UnobservedTaskException -= onUnobservedTaskException;
}
}
}
}
2 changes: 0 additions & 2 deletions test/Grpc.Net.Client.Tests/Retry/HedgingCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ public async Task ActiveCalls_FatalStatusCode_CleansUpActiveCalls()
// Fatal status code will cancel other calls
Assert.AreEqual(0, hedgingCall._activeCalls.Count);
await hedgingCall.CreateHedgingCallsTask!.DefaultTimeout();

waitUntilFinishedTcs.SetResult(null);
}

[Test]
Expand Down