< Summary

Information
Class: dotnet_etcd.Watcher
Assembly: dotnet-etcd
File(s): /home/runner/work/dotnet-etcd/dotnet-etcd/dotnet-etcd/watchclient/WatchStream.cs
Line coverage
0%
Covered lines: 0
Uncovered lines: 56
Coverable lines: 56
Total lines: 128
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 12
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

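| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 0% | 6 | 2 | 0% |
| CreateWatchAsync() | 100% | 2 | 1 | 0% |
| CancelWatchAsync() | 100% | 2 | 1 | 0% |
| ProcessWatchResponses() | 0% | 110 | 10 | 0% |
| Dispose() | 100% | 2 | 1 | 0% |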

File(s)

/home/runner/work/dotnet-etcd/dotnet-etcd/dotnet-etcd/watchclient/WatchStream.cs

# | Line | Line coverage
 1#nullable enable
 2using System;
 3using System.Collections.Concurrent;
 4using System.Threading;
 5using System.Threading.Tasks;
 6using dotnet_etcd.interfaces;
 7using Etcdserverpb;
 8using Grpc.Core;
 9
 10namespace dotnet_etcd;
 11
/// <summary>
///     Manages a bidirectional streaming connection to the etcd watch API.
///     Callbacks are keyed by the watch ID carried in the create request and are
///     invoked from a background loop that reads the response stream.
/// </summary>
public class Watcher : IWatcher
{
    private readonly ConcurrentDictionary<long, Action<WatchResponse>> _callbacks = new();
    private readonly CancellationTokenSource _cts = new();

    private readonly IAsyncDuplexStreamingCall<WatchRequest, WatchResponse> _streamingCall;
    private readonly Action? _onConnectionFailure;

    // 0 while the watcher is live, 1 once Dispose has run; makes Dispose idempotent.
    private int _disposed;

    /// <summary>
    ///     Creates a new Watcher and immediately starts reading the response stream
    /// </summary>
    /// <param name="streamingCall">The streaming call to use</param>
    /// <param name="onConnectionFailure">Action to invoke when connection fails</param>
    /// <exception cref="ArgumentNullException"><paramref name="streamingCall" /> is null</exception>
    public Watcher(IAsyncDuplexStreamingCall<WatchRequest, WatchResponse> streamingCall, Action? onConnectionFailure = null)
    {
        _streamingCall = streamingCall ?? throw new ArgumentNullException(nameof(streamingCall));
        _onConnectionFailure = onConnectionFailure;

        // Fire-and-forget: the loop handles its own exceptions and reports
        // failures through _onConnectionFailure.
        _ = ProcessWatchResponses();
    }

    /// <summary>
    ///     Creates a watch for the specified request
    /// </summary>
    /// <param name="request">The watch request; it must carry a create request</param>
    /// <param name="callback">The callback to invoke when a watch event is received</param>
    /// <returns>A task that completes when the watch request has been written to the stream</returns>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="request" /> or <paramref name="callback" /> is null
    /// </exception>
    /// <exception cref="ArgumentException"><paramref name="request" /> does not contain a create request</exception>
    public async Task CreateWatchAsync(WatchRequest request, Action<WatchResponse> callback)
    {
        ArgumentNullException.ThrowIfNull(request);
        ArgumentNullException.ThrowIfNull(callback);

        // CreateRequest is null when the request carries a different kind of
        // payload (e.g. a cancel request); fail with a descriptive error
        // instead of a NullReferenceException.
        if (request.CreateRequest is null)
        {
            throw new ArgumentException("The watch request must contain a create request.", nameof(request));
        }

        long watchId = request.CreateRequest.WatchId;

        // Register before sending so a response that arrives right after the
        // write cannot be missed by the response loop.
        _callbacks[watchId] = callback;

        try
        {
            // Send the watch request
            await _streamingCall.RequestStream.WriteAsync(request).ConfigureAwait(false);
        }
        catch
        {
            // The request never reached the server: drop the registration, but only
            // if it is still ours (another caller may have replaced it meanwhile).
            _callbacks.TryRemove(
                new System.Collections.Generic.KeyValuePair<long, Action<WatchResponse>>(watchId, callback));
            throw;
        }
    }

    /// <summary>
    ///     Cancels a watch with the specified ID
    /// </summary>
    /// <param name="watchId">The ID of the watch to cancel</param>
    /// <returns>A task that completes when the cancel request has been written to the stream</returns>
    public async Task CancelWatchAsync(long watchId)
    {
        // Send a cancel request
        WatchRequest request = new() { CancelRequest = new WatchCancelRequest { WatchId = watchId } };

        await _streamingCall.RequestStream.WriteAsync(request).ConfigureAwait(false);

        // Remove the callback only after the cancel request was written successfully
        _callbacks.TryRemove(watchId, out _);
    }

    /// <summary>
    ///     Reads responses from the stream until it ends or is canceled and dispatches
    ///     each one to the callback registered for its watch ID
    /// </summary>
    private async Task ProcessWatchResponses()
    {
        // Capture the token once, while the source is guaranteed to be alive, so the
        // loop never touches _cts after Dispose has disposed it.
        CancellationToken token = _cts.Token;

        try
        {
            // No ConfigureAwait(false) here on purpose: callbacks keep running in the
            // same context they were invoked in before this change.
            while (await _streamingCall.ResponseStream.MoveNext(token))
            {
                WatchResponse response = _streamingCall.ResponseStream.Current;
                if (!_callbacks.TryGetValue(response.WatchId, out Action<WatchResponse>? cb))
                {
                    // No one is listening for this watch ID
                    continue;
                }

                // NOTE(review): an exception thrown by a callback ends the whole loop
                // via the generic catch below and is reported as a connection failure.
                cb(response);

                // If the watch was canceled, remove the callback after invoking it
                if (response.Canceled)
                {
                    _callbacks.TryRemove(response.WatchId, out _);
                }
            }
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            // This is expected when the stream is canceled
        }
        catch (OperationCanceledException)
        {
            // This is expected when the token is canceled
        }
        catch (RpcException ex)
        {
            // Log a simplified message for expected connection failures
            Console.WriteLine($"Watch stream connection lost: {ex.StatusCode} - {ex.Message}");
            _onConnectionFailure?.Invoke();
        }
        catch (Exception ex)
        {
            // Log the exception
            await Console.Error.WriteAsync($"Error processing watch responses: {ex}");
            _onConnectionFailure?.Invoke();
#if DEBUG
            // Only re-throw in debug mode to help with debugging
            throw;
#endif
        }
    }

    /// <summary>
    ///     Disposes the watch stream. Safe to call more than once; only the first
    ///     call has an effect.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        try
        {
            // Stop the response loop before tearing down the call
            _cts.Cancel();
        }
        finally
        {
            // Release the call and the token source even if a cancellation
            // callback threw.
            _streamingCall.Dispose();
            _cts.Dispose();
        }

        GC.SuppressFinalize(this);
    }
}