< Summary

Information
Class: dotnet_etcd.WatchManager
Assembly: dotnet-etcd
File(s): /home/runner/work/dotnet-etcd/dotnet-etcd/dotnet-etcd/watchclient/WatchManager.cs
Line coverage
4%
Covered lines: 16
Uncovered lines: 370
Coverable lines: 386
Total lines: 696
Line coverage: 4.1%
Branch coverage
6%
Covered branches: 4
Total branches: 60
Branch coverage: 6.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

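| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 50% | 2 | 2 | 100% |
| WatchAsync() | 100% | 2 | 1 | 0% |
| WrappedCallback() | 0% | 20 | 4 | 0% |
| Watch(...) | 100% | 2 | 1 | 0% |
| Watch(...) | 100% | 2 | 1 | 0% |
| Callback() | 0% | 20 | 4 | 0% |
| WatchRange(...) | 0% | 20 | 4 | 0% |
| Watch(...) | 0% | 20 | 4 | 0% |
| WatchRange(...) | 0% | 20 | 4 | 0% |
| WatchAsync() | 100% | 2 | 1 | 0% |
| WatchRangeAsync() | 100% | 2 | 1 | 0% |
| WatchAsync() | 100% | 2 | 1 | 0% |
| WatchRangeAsync() | 100% | 2 | 1 | 0% |
| WatchRange(...) | 100% | 2 | 1 | 0% |
| CancelWatch(...) | 0% | 42 | 6 | 0% |
| Dispose() | 50% | 12 | 6 | 44.44% |
| GetRangeEnd(...) | 0% | 20 | 4 | 0% |
| GetServerWatchId(...) | 0% | 20 | 4 | 0% |
| EnsureWatchStream(...) | 0% | 6 | 2 | 0% |
| HandleConnectionFailure() | 100% | 2 | 1 | 0% |
| get_WatchId() | 100% | 2 | 1 | 0% |
| get_CancellationTokenSource() | 100% | 2 | 1 | 0% |
| get_Request() | 100% | 2 | 1 | 0% |
| get_Callback() | 100% | 2 | 1 | 0% |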

File(s)

/home/runner/work/dotnet-etcd/dotnet-etcd/dotnet-etcd/watchclient/WatchManager.cs

# | Line | Line coverage
 1#nullable enable
 2using System;
 3using System.Collections.Concurrent;
 4using System.Collections.Generic;
 5using System.Text;
 6using System.Threading;
 7using System.Threading.Tasks;
 8using dotnet_etcd.interfaces;
 9using Etcdserverpb;
 10using Google.Protobuf;
 11using Grpc.Core;
 12using Mvccpb;
 13
 14namespace dotnet_etcd;
 15
 16/// <summary>
 17///     Manages watch streams and provides a way to cancel watches
 18/// </summary>
 19public class WatchManager : IWatchManager
 20{
 6321    private readonly object _lockObject = new();
 6322    private readonly ConcurrentDictionary<long, WatchCancellation> _watches = new();
 6323    private readonly ConcurrentDictionary<long, long> _watchIdMapping = new();
 24
 25    private readonly
 26        Func<Metadata?, DateTime?, CancellationToken, IAsyncDuplexStreamingCall<WatchRequest, WatchResponse>>
 27        _watchStreamFactory;
 28
 29    private bool _disposed;
 6330    private long _nextWatchId = 1;
 31    private Watcher? _watchStream;
 32
 33    /// <summary>
 34    ///     Creates a new WatchManager
 35    /// </summary>
 36    /// <param name="watchStreamFactory">A factory function that creates a new watch stream</param>
 6337    public WatchManager(
 6338        Func<Metadata?, DateTime?, CancellationToken, IAsyncDuplexStreamingCall<WatchRequest, WatchResponse>>
 12639            watchStreamFactory) => _watchStreamFactory =
 6340        watchStreamFactory ?? throw new ArgumentNullException(nameof(watchStreamFactory));
 41
 42    /// <summary>
 43    ///     Creates a new watch request
 44    /// </summary>
 45    /// <param name="request">The watch requests to create</param>
 46    /// <param name="callback">The callback to invoke when a watch event is received</param>
 47    /// <param name="headers">The initial metadata to send with the call</param>
 48    /// <param name="deadline">An optional deadline for the call</param>
 49    /// <param name="cancellationToken">An optional token for canceling the call</param>
 50    /// <returns>A watch ID that can be used to cancel the watch</returns>
 51    public async Task<long> WatchAsync(WatchRequest request, Action<WatchResponse> callback, Metadata? headers = null,
 52        DateTime? deadline = null, CancellationToken cancellationToken = default)
 053    {
 054        ObjectDisposedException.ThrowIf(_disposed, this);
 55
 56        // Create a new watch stream if needed
 057        EnsureWatchStream(headers, deadline, cancellationToken);
 58
 59        // Generate a new watch ID
 060        long watchId = Interlocked.Increment(ref _nextWatchId);
 61
 62        // Create a cancellation token source that can be used to cancel the watch
 063        CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 64
 65        // Create a watch cancellation object
 066        WatchCancellation watchCancellation = new()
 067        {
 068            WatchId = watchId,
 069            CancellationTokenSource = cts,
 070            Request = request,
 071            Callback = WrappedCallback
 072        };
 73
 074        request.CreateRequest.WatchId = watchId;
 75        // Create the watch
 076        await _watchStream!.CreateWatchAsync(request, WrappedCallback).ConfigureAwait(false);
 77
 78        // Add the watch cancellation to the dictionary
 079        _watches[watchId] = watchCancellation;
 80
 81        // Since we don't get a server watch ID from CreateWatchAsync, we can't map it
 82        // The server will assign a watch ID and include it in the watch response
 83        // Our wrappedCallback will handle this mapping when it receives the response
 84
 085        return watchId;
 86
 87        // Create a wrapper callback that checks if the watch has been canceled
 88        void WrappedCallback(WatchResponse response)
 089        {
 090            if (cts.IsCancellationRequested)
 091            {
 092                return;
 93            }
 94
 95            // If this is a creation response, update our watch ID mapping
 096            if (response.Created)
 097            {
 98                // Map the server-assigned watch ID to our client-generated watch ID
 099                _watchIdMapping[response.WatchId] = watchId;
 0100            }
 101
 0102            callback(response);
 0103        }
 0104    }
 105
 106    /// <summary>
 107    ///     Creates a new watch request
 108    /// </summary>
 109    /// <param name="request">The watch request to create</param>
 110    /// <param name="callback">The callback to invoke when a watch event is received</param>
 111    /// <param name="headers">The initial metadata to send with the call</param>
 112    /// <param name="deadline">An optional deadline for the call</param>
 113    /// <param name="cancellationToken">An optional token for canceling the call</param>
 114    /// <returns>A watch ID that can be used to cancel the watch</returns>
 115    public long Watch(WatchRequest request, Action<WatchResponse> callback, Metadata? headers = null,
 116        DateTime? deadline = null, CancellationToken cancellationToken = default)
 0117    {
 118        // Run the async method synchronously
 0119        Task<long> task = Task.Run(() => WatchAsync(request, callback, headers, deadline, cancellationToken), cancellati
 0120        task.Wait(cancellationToken);
 0121        return task.Result;
 0122    }
 123
 124    /// <summary>
 125    ///     Watches a specific key
 126    /// </summary>
 127    /// <param name="key">Key to watch</param>
 128    /// <param name="action">Action to be executed when watch event is triggered</param>
 129    /// <param name="headers">The initial metadata to send with the call</param>
 130    /// <param name="deadline">An optional deadline for the call</param>
 131    /// <param name="cancellationToken">An optional token for canceling the call</param>
 132    /// <returns>Watch ID</returns>
 133    public long Watch(string key, Action<WatchEvent> action, Metadata? headers = null, DateTime? deadline = null,
 134        CancellationToken cancellationToken = default)
 0135    {
 0136        ObjectDisposedException.ThrowIf(_disposed, this);
 137
 138        // Create a watch request for the key
 0139        WatchRequest request = new()
 0140        {
 0141            CreateRequest = new WatchCreateRequest
 0142            {
 0143                Key = ByteString.CopyFromUtf8(key), ProgressNotify = true, PrevKv = true
 0144            }
 0145        };
 146
 147        // Call the Watch method with the request
 0148        return Watch(request, Callback, headers, deadline, cancellationToken);
 149
 150        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 151        void Callback(WatchResponse response)
 0152        {
 0153            if (response.Events == null)
 0154            {
 0155                return;
 156            }
 157
 0158            foreach (Event evt in response.Events)
 0159            {
 0160                WatchEvent watchEvent = new() { Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Ty
 0161                action(watchEvent);
 0162            }
 0163        }
 0164    }
 165
 166    /// <summary>
 167    ///     Watches a range of keys with a prefix
 168    /// </summary>
 169    /// <param name="prefixKey">Prefix key to watch</param>
 170    /// <param name="action">Action to be executed when watch event is triggered</param>
 171    /// <param name="headers">The initial metadata to send with the call</param>
 172    /// <param name="deadline">An optional deadline for the call</param>
 173    /// <param name="cancellationToken">An optional token for canceling the call</param>
 174    /// <returns>Watch ID</returns>
 175    public long WatchRange(string prefixKey, Action<WatchEvent> action, Metadata? headers = null,
 176        DateTime? deadline = null, CancellationToken cancellationToken = default)
 0177    {
 0178        ObjectDisposedException.ThrowIf(_disposed, this);
 179
 180        // Create a watch request for the range
 0181        WatchRequest request = new()
 0182        {
 0183            CreateRequest = new WatchCreateRequest
 0184            {
 0185                Key = ByteString.CopyFromUtf8(prefixKey),
 0186                RangeEnd = ByteString.CopyFromUtf8(GetRangeEnd(prefixKey)),
 0187                ProgressNotify = true,
 0188                PrevKv = true
 0189            }
 0190        };
 191
 192        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 0193        Action<WatchResponse> callback = response =>
 0194        {
 0195            if (response.Events == null)
 0196            {
 0197                return;
 0198            }
 0199
 0200            foreach (Event evt in response.Events)
 0201            {
 0202                WatchEvent watchEvent = new()
 0203                {
 0204                    Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Type = evt.Type
 0205                };
 0206                action(watchEvent);
 0207            }
 0208        };
 209
 210        // Call the Watch method with the request
 0211        return Watch(request, callback, headers, deadline, cancellationToken);
 0212    }
 213
 214    /// <summary>
 215    ///     Watches a specific key with start revision
 216    /// </summary>
 217    /// <param name="key">Key to watch</param>
 218    /// <param name="startRevision">Start revision</param>
 219    /// <param name="action">Action to be executed when watch event is triggered</param>
 220    /// <param name="headers">The initial metadata to send with the call</param>
 221    /// <param name="deadline">An optional deadline for the call</param>
 222    /// <param name="cancellationToken">An optional token for canceling the call</param>
 223    /// <returns>Watch ID</returns>
 224    public long Watch(string key, long startRevision, Action<WatchEvent> action, Metadata? headers = null,
 225        DateTime? deadline = null, CancellationToken cancellationToken = default)
 0226    {
 0227        ObjectDisposedException.ThrowIf(_disposed, this);
 228
 229        // Create a watch request for the key with start revision
 0230        WatchRequest request = new()
 0231        {
 0232            CreateRequest = new WatchCreateRequest
 0233            {
 0234                Key = ByteString.CopyFromUtf8(key),
 0235                StartRevision = startRevision,
 0236                ProgressNotify = true,
 0237                PrevKv = true
 0238            }
 0239        };
 240
 241        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 0242        Action<WatchResponse> callback = response =>
 0243        {
 0244            if (response.Events == null)
 0245            {
 0246                return;
 0247            }
 0248
 0249            foreach (Event evt in response.Events)
 0250            {
 0251                WatchEvent watchEvent = new()
 0252                {
 0253                    Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Type = evt.Type
 0254                };
 0255                action(watchEvent);
 0256            }
 0257        };
 258
 259        // Call the Watch method with the request
 0260        return Watch(request, callback, headers, deadline, cancellationToken);
 0261    }
 262
 263    /// <summary>
 264    ///     Watches a range of keys with a prefix and start revision
 265    /// </summary>
 266    /// <param name="prefixKey">Prefix key to watch</param>
 267    /// <param name="startRevision">Start revision</param>
 268    /// <param name="action">Action to be executed when watch event is triggered</param>
 269    /// <param name="headers">The initial metadata to send with the call</param>
 270    /// <param name="deadline">An optional deadline for the call</param>
 271    /// <param name="cancellationToken">An optional token for canceling the call</param>
 272    /// <returns>Watch ID</returns>
 273    public long WatchRange(string prefixKey, long startRevision, Action<WatchEvent> action, Metadata? headers = null,
 274        DateTime? deadline = null, CancellationToken cancellationToken = default)
 0275    {
 0276        ObjectDisposedException.ThrowIf(_disposed, this);
 277
 278        // Create a watch request for the range with start revision
 0279        WatchRequest request = new()
 0280        {
 0281            CreateRequest = new WatchCreateRequest
 0282            {
 0283                Key = ByteString.CopyFromUtf8(prefixKey),
 0284                RangeEnd = ByteString.CopyFromUtf8(GetRangeEnd(prefixKey)),
 0285                StartRevision = startRevision,
 0286                ProgressNotify = true,
 0287                PrevKv = true
 0288            }
 0289        };
 290
 291        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 0292        Action<WatchResponse> callback = response =>
 0293        {
 0294            if (response.Events == null)
 0295            {
 0296                return;
 0297            }
 0298
 0299            foreach (Event evt in response.Events)
 0300            {
 0301                WatchEvent watchEvent = new()
 0302                {
 0303                    Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Type = evt.Type
 0304                };
 0305                action(watchEvent);
 0306            }
 0307        };
 308
 309        // Call the Watch method with the request
 0310        return Watch(request, callback, headers, deadline, cancellationToken);
 0311    }
 312
 313    /// <summary>
 314    ///     Watches a specific key asynchronously
 315    /// </summary>
 316    /// <param name="key">Key to watch</param>
 317    /// <param name="action">Action to be executed when watch event is triggered</param>
 318    /// <param name="headers">The initial metadata to send with the call</param>
 319    /// <param name="deadline">An optional deadline for the call</param>
 320    /// <param name="cancellationToken">An optional token for canceling the call</param>
 321    /// <returns>Watch ID</returns>
 322    public async Task<long> WatchAsync(string key, Action<WatchEvent> action, Metadata? headers = null,
 323        DateTime? deadline = null, CancellationToken cancellationToken = default)
 0324    {
 0325        ObjectDisposedException.ThrowIf(_disposed, this);
 326
 327        // Create a watch request for the key
 0328        WatchRequest request = new()
 0329        {
 0330            CreateRequest = new WatchCreateRequest
 0331            {
 0332                Key = ByteString.CopyFromUtf8(key), ProgressNotify = true, PrevKv = true
 0333            }
 0334        };
 335
 336        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 0337        Action<WatchResponse> callback = response =>
 0338        {
 0339            if (response.Events == null)
 0340            {
 0341                return;
 0342            }
 0343
 0344            foreach (Event evt in response.Events)
 0345            {
 0346                WatchEvent watchEvent = new()
 0347                {
 0348                    Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Type = evt.Type
 0349                };
 0350                action(watchEvent);
 0351            }
 0352        };
 353
 354        // Call the WatchAsync method with the request
 0355        return await WatchAsync(request, callback, headers, deadline, cancellationToken).ConfigureAwait(false);
 0356    }
 357
 358    /// <summary>
 359    ///     Watches a range of keys with a prefix asynchronously
 360    /// </summary>
 361    /// <param name="prefixKey">Prefix key to watch</param>
 362    /// <param name="action">Action to be executed when watch event is triggered</param>
 363    /// <param name="headers">The initial metadata to send with the call</param>
 364    /// <param name="deadline">An optional deadline for the call</param>
 365    /// <param name="cancellationToken">An optional token for canceling the call</param>
 366    /// <returns>Watch ID</returns>
 367    public async Task<long> WatchRangeAsync(string prefixKey, Action<WatchEvent> action, Metadata? headers = null,
 368        DateTime? deadline = null, CancellationToken cancellationToken = default)
 0369    {
 0370        ObjectDisposedException.ThrowIf(_disposed, this);
 371
 372        // Create a watch request for the range
 0373        WatchRequest request = new()
 0374        {
 0375            CreateRequest = new WatchCreateRequest
 0376            {
 0377                Key = ByteString.CopyFromUtf8(prefixKey),
 0378                RangeEnd = ByteString.CopyFromUtf8(GetRangeEnd(prefixKey)),
 0379                ProgressNotify = true,
 0380                PrevKv = true
 0381            }
 0382        };
 383
 384        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 0385        Action<WatchResponse> callback = response =>
 0386        {
 0387            if (response.Events == null)
 0388            {
 0389                return;
 0390            }
 0391
 0392            foreach (Event evt in response.Events)
 0393            {
 0394                WatchEvent watchEvent = new()
 0395                {
 0396                    Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Type = evt.Type
 0397                };
 0398                action(watchEvent);
 0399            }
 0400        };
 401
 402        // Call the WatchAsync method with the request
 0403        return await WatchAsync(request, callback, headers, deadline, cancellationToken).ConfigureAwait(false);
 0404    }
 405
 406    /// <summary>
 407    ///     Watches a specific key with start revision asynchronously
 408    /// </summary>
 409    /// <param name="key">Key to watch</param>
 410    /// <param name="startRevision">Start revision</param>
 411    /// <param name="action">Action to be executed when watch event is triggered</param>
 412    /// <param name="headers">The initial metadata to send with the call</param>
 413    /// <param name="deadline">An optional deadline for the call</param>
 414    /// <param name="cancellationToken">An optional token for canceling the call</param>
 415    /// <returns>Watch ID</returns>
 416    public async Task<long> WatchAsync(string key, long startRevision, Action<WatchEvent> action,
 417        Metadata? headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default)
 0418    {
 0419        ObjectDisposedException.ThrowIf(_disposed, this);
 420
 421        // Create a watch request for the key with start revision
 0422        WatchRequest request = new()
 0423        {
 0424            CreateRequest = new WatchCreateRequest
 0425            {
 0426                Key = ByteString.CopyFromUtf8(key),
 0427                StartRevision = startRevision,
 0428                ProgressNotify = true,
 0429                PrevKv = true
 0430            }
 0431        };
 432
 433        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 0434        Action<WatchResponse> callback = response =>
 0435        {
 0436            if (response.Events == null)
 0437            {
 0438                return;
 0439            }
 0440
 0441            foreach (Event evt in response.Events)
 0442            {
 0443                WatchEvent watchEvent = new()
 0444                {
 0445                    Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Type = evt.Type
 0446                };
 0447                action(watchEvent);
 0448            }
 0449        };
 450
 451        // Call the WatchAsync method with the request
 0452        return await WatchAsync(request, callback, headers, deadline, cancellationToken).ConfigureAwait(false);
 0453    }
 454
 455    /// <summary>
 456    ///     Watches a range of keys with a prefix and start revision asynchronously
 457    /// </summary>
 458    /// <param name="prefixKey">Prefix key to watch</param>
 459    /// <param name="startRevision">Start revision</param>
 460    /// <param name="action">Action to be executed when watch event is triggered</param>
 461    /// <param name="headers">The initial metadata to send with the call</param>
 462    /// <param name="deadline">An optional deadline for the call</param>
 463    /// <param name="cancellationToken">An optional token for canceling the call</param>
 464    /// <returns>Watch ID</returns>
 465    public async Task<long> WatchRangeAsync(string prefixKey, long startRevision, Action<WatchEvent> action,
 466        Metadata? headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default)
 0467    {
 0468        ObjectDisposedException.ThrowIf(_disposed, this);
 469
 470        // Create a watch request for the range with start revision
 0471        WatchRequest request = new()
 0472        {
 0473            CreateRequest = new WatchCreateRequest
 0474            {
 0475                Key = ByteString.CopyFromUtf8(prefixKey),
 0476                RangeEnd = ByteString.CopyFromUtf8(GetRangeEnd(prefixKey)),
 0477                StartRevision = startRevision,
 0478                ProgressNotify = true,
 0479                PrevKv = true
 0480            }
 0481        };
 482
 483        // Create a wrapper callback that converts the WatchResponse to a WatchEvent
 0484        Action<WatchResponse> callback = response =>
 0485        {
 0486            if (response.Events == null)
 0487            {
 0488                return;
 0489            }
 0490
 0491            foreach (Event evt in response.Events)
 0492            {
 0493                WatchEvent watchEvent = new()
 0494                {
 0495                    Key = evt.Kv.Key.ToStringUtf8(), Value = evt.Kv.Value.ToStringUtf8(), Type = evt.Type
 0496                };
 0497                action(watchEvent);
 0498            }
 0499        };
 500
 501        // Call the WatchAsync method with the request
 0502        return await WatchAsync(request, callback, headers, deadline, cancellationToken).ConfigureAwait(false);
 0503    }
 504
 505    /// <summary>
 506    ///     Watches a key range
 507    /// </summary>
 508    /// <param name="path">The path to watch</param>
 509    /// <param name="callback">The callback to invoke when a watch event is received</param>
 510    /// <param name="headers">The initial metadata to send with the call</param>
 511    /// <param name="deadline">An optional deadline for the call</param>
 512    /// <param name="cancellationToken">An optional token for canceling the call</param>
 513    /// <returns>A watch ID that can be used to cancel the watch</returns>
 514    public long WatchRange(string path, Action<WatchResponse> callback, Metadata? headers = null,
 515        DateTime? deadline = null, CancellationToken cancellationToken = default)
 0516    {
 0517        ObjectDisposedException.ThrowIf(_disposed, this);
 518
 519        // Create a watch request for the range
 0520        WatchRequest request = new()
 0521        {
 0522            CreateRequest = new WatchCreateRequest
 0523            {
 0524                Key = ByteString.CopyFromUtf8(path),
 0525                RangeEnd = ByteString.CopyFromUtf8(GetRangeEnd(path)),
 0526                ProgressNotify = true,
 0527                PrevKv = true
 0528            }
 0529        };
 530
 531        // Call the Watch method with the request
 0532        return Watch(request, callback, headers, deadline, cancellationToken);
 0533    }
 534
 535    /// <summary>
 536    ///     Cancels a watch request
 537    /// </summary>
 538    /// <param name="watchId">The ID of the watch to cancel</param>
 539    public void CancelWatch(long watchId)
 0540    {
 0541        ObjectDisposedException.ThrowIf(_disposed, this);
 542
 0543        if (!_watches.TryRemove(watchId, out WatchCancellation? watchCancellation))
 0544        {
 0545            return;
 546        }
 547
 548        // Cancel the watch
 0549        watchCancellation.CancellationTokenSource.Cancel();
 550
 551        // Find the server watch ID that corresponds to our client watch ID
 0552        long serverWatchId = GetServerWatchId(watchId);
 553
 554        // Cancel the watch on the server if we found a mapping
 0555        if (serverWatchId != -1 && _watchStream != null)
 0556        {
 0557            _watchIdMapping.TryRemove(serverWatchId, out _);
 0558            _watchStream.CancelWatchAsync(serverWatchId).ContinueWith(_ =>
 0559            {
 0560                // Ignore exceptions
 0561            });
 0562        }
 563
 564        // Dispose the cancellation token source
 0565        watchCancellation.CancellationTokenSource.Dispose();
 0566    }
 567
 568    /// <summary>
 569    ///     Disposes the watch manager
 570    /// </summary>
 571    public void Dispose()
 1572    {
 1573        if (_disposed)
 0574        {
 0575            return;
 576        }
 577
 1578        _disposed = true;
 579
 580        // Cancel all watches
 3581        foreach (WatchCancellation watchCancellation in _watches.Values)
 0582        {
 0583            watchCancellation.CancellationTokenSource.Cancel();
 0584            watchCancellation.CancellationTokenSource.Dispose();
 0585        }
 586
 1587        _watches.Clear();
 588
 589        // Dispose the watch stream
 1590        if (_watchStream != null)
 0591        {
 0592            _watchStream.Dispose();
 0593            _watchStream = null;
 0594        }
 595
 1596        GC.SuppressFinalize(this);
 1597    }
 598
 599    private static string GetRangeEnd(string path)
 0600    {
 601        // Calculate the range end for the given path
 602        // This is the same logic used in EtcdClient.GetRangeEnd
 0603        byte[] bytes = Encoding.UTF8.GetBytes(path);
 0604        for (int i = bytes.Length - 1; i >= 0; i--)
 0605        {
 0606            if (bytes[i] >= 0xff)
 0607            {
 0608                continue;
 609            }
 610
 0611            bytes[i]++;
 0612            return Encoding.UTF8.GetString(bytes, 0, i + 1);
 613        }
 614
 0615        return string.Empty;
 0616    }
 617
 618    /// <summary>
 619    ///     Gets the server watch ID for a client watch ID
 620    /// </summary>
 621    /// <param name="clientWatchId">The client watch ID</param>
 622    /// <returns>The server watch ID, or -1 if not found</returns>
 623    private long GetServerWatchId(long clientWatchId)
 0624    {
 0625        foreach (KeyValuePair<long, long> kvp in _watchIdMapping)
 0626        {
 0627            if (kvp.Value == clientWatchId)
 0628            {
 0629                return kvp.Key;
 630            }
 0631        }
 632
 0633        return -1;
 0634    }
 635
 636    /// <param name="cancellationToken">An optional token for canceling the call</param>
 637    private void EnsureWatchStream(Metadata? headers, DateTime? deadline, CancellationToken cancellationToken)
 0638    {
 0639        lock (_lockObject)
 0640        {
 0641            if (_watchStream != null)
 0642            {
 0643                return;
 644            }
 645
 646            // Create a new watch stream
 0647            IAsyncDuplexStreamingCall<WatchRequest, WatchResponse> watchStreamCall =
 0648                _watchStreamFactory(headers, deadline, cancellationToken);
 0649            _watchStream = new Watcher(watchStreamCall, HandleConnectionFailure);
 0650        }
 0651    }
 652
 653    private void HandleConnectionFailure()
 0654    {
 0655        lock (_lockObject)
 0656        {
 0657            _watchStream = null;
 0658            _watchIdMapping.Clear();
 0659        }
 660
 661        // Must run async to avoid blocking the caller (which might be the dead stream loop)
 0662        Task.Run(async () =>
 0663        {
 0664            try
 0665            {
 0666                // Wait small delay to allow network to stabilize
 0667                await Task.Delay(500);
 0668
 0669                lock (_lockObject)
 0670                {
 0671                   if (_disposed) return;
 0672                   EnsureWatchStream(null, null, default);
 0673                }
 0674
 0675                foreach (var watch in _watches.Values)
 0676                {
 0677                    await _watchStream!.CreateWatchAsync(watch.Request, watch.Callback).ConfigureAwait(false);
 0678                }
 0679            }
 0680            catch (Exception ex)
 0681            {
 0682                Console.Error.WriteLine($"Watch reconnection failed: {ex.Message}");
 0683                // Retry in 5s
 0684                _ = Task.Delay(5000).ContinueWith(_ => HandleConnectionFailure());
 0685            }
 0686        });
 0687    }
 688
 689    private class WatchCancellation
 690    {
 0691        public long WatchId { get; set; }
 0692        public required CancellationTokenSource CancellationTokenSource { get; set; }
 0693        public required WatchRequest Request { get; set; }
 0694        public required Action<WatchResponse> Callback { get; set; }
 695    }
 696}

Methods/Properties

.ctor(System.Func`4<Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken,dotnet_etcd.interfaces.IAsyncDuplexStreamingCall`2<Etcdserverpb.WatchRequest,Etcdserverpb.WatchResponse>>)
WatchAsync()
WrappedCallback()
Watch(Etcdserverpb.WatchRequest,System.Action`1<Etcdserverpb.WatchResponse>,Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken)
Watch(System.String,System.Action`1<dotnet_etcd.WatchEvent>,Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken)
Callback()
WatchRange(System.String,System.Action`1<dotnet_etcd.WatchEvent>,Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken)
Watch(System.String,System.Int64,System.Action`1<dotnet_etcd.WatchEvent>,Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken)
WatchRange(System.String,System.Int64,System.Action`1<dotnet_etcd.WatchEvent>,Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken)
WatchAsync()
WatchRangeAsync()
WatchAsync()
WatchRangeAsync()
WatchRange(System.String,System.Action`1<Etcdserverpb.WatchResponse>,Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken)
CancelWatch(System.Int64)
Dispose()
GetRangeEnd(System.String)
GetServerWatchId(System.Int64)
EnsureWatchStream(Grpc.Core.Metadata,System.Nullable`1<System.DateTime>,System.Threading.CancellationToken)
HandleConnectionFailure()
get_WatchId()
get_CancellationTokenSource()
get_Request()
get_Callback()