| | | 1 | | #nullable enable |
| | | 2 | | using System; |
| | | 3 | | using System.Collections.Concurrent; |
| | | 4 | | using System.Threading; |
| | | 5 | | using System.Threading.Tasks; |
| | | 6 | | using dotnet_etcd.interfaces; |
| | | 7 | | using Etcdserverpb; |
| | | 8 | | using Grpc.Core; |
| | | 9 | | |
| | | 10 | | namespace dotnet_etcd; |
| | | 11 | | |
/// <summary>
///     Manages a bidirectional streaming connection to the etcd watch API.
/// </summary>
/// <remarks>
///     Callbacks are stored per watch ID. The constructor starts a background loop that reads
///     responses from the stream and hands each one to the callback registered for its watch ID.
/// </remarks>
public class Watcher : IWatcher
{
    private readonly ConcurrentDictionary<long, Action<WatchResponse>> _callbacks = new();
    private readonly CancellationTokenSource _cts = new();

    private readonly IAsyncDuplexStreamingCall<WatchRequest, WatchResponse> _streamingCall;
    private readonly Action? _onConnectionFailure;

    // 0 while the watcher is live, 1 once Dispose has run; flipped atomically so Dispose is idempotent.
    private int _disposed;


    /// <summary>
    ///     Creates a new Watcher and starts reading responses from the stream.
    /// </summary>
    /// <param name="streamingCall">The streaming call to use</param>
    /// <param name="onConnectionFailure">Action to invoke when the response loop ends with an error</param>
    /// <exception cref="ArgumentNullException"><paramref name="streamingCall" /> is null</exception>
    public Watcher(IAsyncDuplexStreamingCall<WatchRequest, WatchResponse> streamingCall, Action? onConnectionFailure = null)
    {
        _streamingCall = streamingCall ?? throw new ArgumentNullException(nameof(streamingCall));
        _onConnectionFailure = onConnectionFailure;

        // Fire-and-forget: the loop handles its own errors. The token is captured here, once,
        // so the loop never touches _cts again and Dispose can safely dispose the source.
        _ = ProcessWatchResponses(_cts.Token);
    }


    /// <summary>
    ///     Creates a watch for the specified request
    /// </summary>
    /// <param name="request">The watch request; it must carry a create request</param>
    /// <param name="callback">The callback to invoke when a watch event is received</param>
    /// <returns>A task that completes when the watch request has been written to the stream</returns>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="request" /> or <paramref name="callback" /> is null
    /// </exception>
    /// <exception cref="ArgumentException"><paramref name="request" /> has no create request</exception>
    public async Task CreateWatchAsync(WatchRequest request, Action<WatchResponse> callback)
    {
        ArgumentNullException.ThrowIfNull(request);
        ArgumentNullException.ThrowIfNull(callback);

        // A request without a create payload (e.g. a cancel request) has no watch ID to key on;
        // fail with a descriptive argument error instead of a NullReferenceException.
        if (request.CreateRequest is null)
        {
            throw new ArgumentException("The watch request must contain a create request.", nameof(request));
        }

        long watchId = request.CreateRequest.WatchId;

        // Register before sending so a response that arrives immediately finds its callback.
        _callbacks[watchId] = callback;

        try
        {
            // Send the watch request
            await _streamingCall.RequestStream.WriteAsync(request).ConfigureAwait(false);
        }
        catch
        {
            // The request never reached the server: undo the registration, but only if the entry
            // is still the one added above (a concurrent call may have replaced it).
            _callbacks.TryRemove(
                new System.Collections.Generic.KeyValuePair<long, Action<WatchResponse>>(watchId, callback));
            throw;
        }
    }

    /// <summary>
    ///     Cancels a watch with the specified ID
    /// </summary>
    /// <param name="watchId">The ID of the watch to cancel</param>
    /// <returns>A task that completes when the cancel request has been written to the stream</returns>
    public async Task CancelWatchAsync(long watchId)
    {
        // Send a cancel request
        WatchRequest request = new() { CancelRequest = new WatchCancelRequest { WatchId = watchId } };

        await _streamingCall.RequestStream.WriteAsync(request).ConfigureAwait(false);

        // Remove the callback only after the write succeeded; if the write throws, the
        // registration is left in place.
        _callbacks.TryRemove(watchId, out _);
    }

    /// <summary>
    ///     Reads responses from the stream until it ends, is canceled, or fails, dispatching each
    ///     response to the callback registered for its watch ID.
    /// </summary>
    /// <param name="cancellationToken">Token that stops the read loop; canceled by <see cref="Dispose" /></param>
    private async Task ProcessWatchResponses(CancellationToken cancellationToken)
    {
        try
        {
            // No ConfigureAwait(false) here on purpose: adding it could change the context on
            // which user callbacks are invoked.
            while (await _streamingCall.ResponseStream.MoveNext(cancellationToken))
            {
                WatchResponse response = _streamingCall.ResponseStream.Current;
                if (!_callbacks.TryGetValue(response.WatchId, out Action<WatchResponse>? cb))
                {
                    // No callback registered for this watch ID; drop the response.
                    continue;
                }

                try
                {
                    cb(response);
                }
                catch (Exception ex)
                {
                    // A faulty callback must not end the loop for every other watch, nor be
                    // reported as a connection failure.
                    Console.Error.WriteLine($"Watch callback for watch {response.WatchId} failed: {ex}");
                }

                // If the watch was canceled, remove the callback after invoking it
                if (response.Canceled)
                {
                    _callbacks.TryRemove(response.WatchId, out _);
                }
            }
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            // This is expected when the stream is canceled
        }
        catch (OperationCanceledException)
        {
            // This is expected when the token is canceled
        }
        catch (RpcException ex)
        {
            // Log a simplified message for expected connection failures
            Console.WriteLine($"Watch stream connection lost: {ex.StatusCode} - {ex.Message}");
            _onConnectionFailure?.Invoke();
        }
        catch (Exception ex)
        {
            // Log the exception (with a line terminator, so consecutive entries do not run together)
            await Console.Error.WriteLineAsync($"Error processing watch responses: {ex}");
            _onConnectionFailure?.Invoke();
#if DEBUG
            // Only re-throw in debug mode to help with debugging
            throw;
#endif
        }
    }

    /// <summary>
    ///     Disposes the watch stream. Calls after the first one do nothing.
    /// </summary>
    public void Dispose()
    {
        // Only the first caller proceeds; Cancel() on an already disposed source would throw.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cts.Cancel();
        _streamingCall.Dispose();

        // The read loop holds its own copy of the token, so releasing the source here is safe.
        _cts.Dispose();

        GC.SuppressFinalize(this);
    }
}