Skip to content

.NET Client

A high-performance, low-allocation .NET client for interacting with UmaDB via gRPC.

Developed and maintained by Marcin Golenia.

The client is async-only (no synchronous API is provided or planned). If you must call from synchronous code, block on the returned task (e.g. client.AppendAsync(...).GetAwaiter().GetResult()); avoid doing that on a thread that has a synchronization context (e.g. UI or legacy ASP.NET) to prevent deadlocks.

Connection

csharp
using UmaDb.Csharp;

// No TLS
using var client = UmaClient.Connect("localhost", 50051);

// TLS + API key
using var client = UmaClient.Connect("localhost", 50051, caCert: "certs/ca.pem", apiKey: "your-api-key");

Use one client per process. Performance best practices with gRPC (Microsoft Learn):

  • Reuse channels: "A gRPC channel should be reused when making gRPC calls. Reusing a channel allows calls to be multiplexed through an existing HTTP/2 connection."

  • Cost of creating a new channel per call: "If a new channel is created for each gRPC call then the amount of time it takes to complete can increase significantly. Each call will require multiple network round-trips between the client and the server to create a new HTTP/2 connection: 1. Opening a socket 2. Establishing TCP connection 3. Negotiating TLS 4. Starting HTTP/2 connection 5. Making the gRPC call."

  • Sharing and concurrency: "Channels are safe to share and reuse between gRPC calls." "A channel and clients created from the channel can safely be used by multiple threads." "Clients created from the channel can make multiple simultaneous calls."

DI (recommended): register as singleton; host disposes on shutdown.

csharp
// store config as you wish, for example create UmaDbOptions class and use IOptions
// Config: "UmaDb": { "Host", "Port", "CaCert", "ApiKey" }
builder.Services.Configure<UmaDbOptions>(builder.Configuration.GetSection("UmaDb"));
builder.Services.AddSingleton<UmaClient>(sp =>
{
    var o = sp.GetRequiredService<IOptions<UmaDbOptions>>().Value;
    return UmaClient.Connect(o.Host, o.Port, o.CaCert, o.ApiKey);
});

Without DI: create at startup, dispose in shutdown path.

Concepts

  • Query — A filter over the log. Built with UmaFilter.Where(types, tags) and .Or(...). Each query item matches events whose type is in types (or any if empty) and whose tags include all of tags (or any if empty). Multiple items are combined with OR (an event matches if any item matches).
  • Append conditionfailIfMatch + after. The append fails if the store contains any event matching the query after position after. Use the same query you used to read and the head from that read as after; then no one else can have written matching events in between.
  • TrackingUmaTrackingInfo(Source, Position). Records “I’ve processed up to this position on this upstream.” Stored atomically with the events you append. Positions must be strictly increasing per source.

Recipes

1. Append and read

csharp
using UmaDb.Csharp;
using UmaDb.Csharp.Messages;
using System.Text.Json;

using var client = UmaClient.Connect("localhost", 50051);

// Your event (e.g. record)
public record OrderCreated(Guid OrderId, decimal Amount);

var payload = new OrderCreated(Guid.NewGuid(), 100.32m);
var evt = new UmaEvent(
    nameof(OrderCreated),
    JsonSerializer.SerializeToUtf8Bytes(payload),
    [$"order-{payload.OrderId}"]
);

var res = await client.AppendAsync([evt]);

var filter = UmaFilter.Where(types: [nameof(OrderCreated)], tags: [$"order-{payload.OrderId}"]);
var (events, head) = await client.ReadListAsync(filter);

2. Consistency boundary (read–decide–append)

Same query for read and for the append condition; use the head from the read as after:

csharp
var tag = $"order-{orderId}";
var filter = UmaFilter.Where(types: [nameof(OrderCreated), nameof(OrderShipped)], tags: [tag]);

// Read → build decision model
var (events, head) = await client.ReadListAsync(filter);
foreach (var evt in events)
    Apply(evt);  // your logic
var after = head;

// Append with condition: fail if anything matching filter was written after `after`
var newEvt = new UmaEvent(nameof(OrderShipped), data, [tag]);
try
{
    await client.AppendAsync([newEvt], failIfMatch: filter, after: after);
}
catch (UmaDbException.IntegrityException)
{
    // Concurrent write — reload and retry
}

3. Projections (subscribe)

Run in a worker (e.g. BackgroundService), not in HTTP pipeline:

csharp
public class OrderProjectionService : BackgroundService
{
    private readonly UmaClient _client;
    private readonly IProjectionStore _store;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var sub = _client.Subscribe(
            UmaFilter.Where(types: [nameof(OrderCreated), nameof(OrderShipped)]),
            evt => _store.Upsert(evt),  // idempotent
            stoppingToken
        );
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }
}

For full control over the stream, use ReadAsync with filter.WithOptions(o => o.Subscribe = true).

4. Upstream tracking (exactly-once)

Record the upstream position atomically with the events produced from it:

csharp
var source = "upstream-orders";
var last = await client.GetTrackingInfoAsync(source);
var next = (last ?? 0) + 1;

// ... process upstream event at next, produce local events ...

await client.AppendAsync(
    events: [localEvent],
    trackingInfo: new UmaTrackingInfo(source, next)
);

Recording a position that is not greater than the last one throws UmaDbException.IntegrityException.

5. Idempotent append (event Id)

Set UmaEvent.Id (e.g. Guid). Retrying the same append returns the same commit position.

csharp
var evt = new UmaEvent("OrderCreated", data, [tag], id: Guid.NewGuid());
// ... read to get `after` for your boundary ...
var r1 = await client.AppendAsync([evt], failIfMatch: filter, after: after);
var r2 = await client.AppendAsync([evt], failIfMatch: filter, after: after);
// r1.Position == r2.Position

Example: full flow

One narrative: read → conditional append → conflict → idempotent retry.

csharp
using var client = UmaClient.Connect("localhost", 50051);

var tag = "order-123";
var filter = UmaFilter.Where(types: ["OrderCreated", "OrderShipped"], tags: [tag]);

// Read, get head
var (events, head) = await client.ReadListAsync(filter);
foreach (var evt in events) Apply(evt);
var after = head;

// Append with condition
var evt = new UmaEvent("OrderShipped", data, [tag], id: Guid.NewGuid());
var pos = await client.AppendAsync([evt], failIfMatch: filter, after: after);

// Concurrent write: same condition + after would throw IntegrityException → reload and retry.

// Idempotent retry with same event Id returns same position
var pos2 = await client.AppendAsync([evt], failIfMatch: filter, after: after);
// pos.Position == pos2.Position

API reference

UmaClient

MethodPurpose
Connect(host, port, caCert?, apiKey?)Build client. TLS when caCert is set. Reuse instance.
AppendAsync(events, failIfMatch?, after?, trackingInfo?, ct)Append; returns AppendResponse.Position. Throws IntegrityException when condition fails.
ReadListAsync(filter | query, ct)Returns (Events, Head) tuple.
ReadAsync(filter | query, ct)IAsyncEnumerable<UmaReadBatch>. Each batch: Events, Head.
Subscribe(filter, onEvent, ct)Background subscription; returns IDisposable. Handle exceptions in onEvent.
GetHeadAsync(ct)Last position or null.
GetTrackingInfoAsync(source, ct)Last tracked position for source, or null.

UmaFilter

  • UmaFilter.All — match all.
  • UmaFilter.Where(types: ["A","B"], tags: ["x"]) — types OR’d, tags AND’d per item.
  • .Or(types?, tags?) — add another OR clause.
  • .WithOptions(o => { o.FromPosition = n; o.Limit = n; o.BatchSize = n; o.Backwards = true; o.Subscribe = true; }) — read options.

When to use: FromPosition / Limit for resuming or paging; Subscribe for live projections; Backwards to read from the end.

Core types

  • UmaEvent(EventType, Data (bytes), Tags?, Id?) — event to append or read.
  • SequencedUmaEvent(Position, Event) — read result.
  • UmaReadBatch(Events, Head?) — batch and last known position.
  • UmaTrackingInfo(Source, Position) — upstream checkpoint.
  • AppendResponsePosition (commit position).

Exceptions

UmaDbException and derived: AuthenticationException, IntegrityException, CorruptionException, SerializationException, InternalException, IoException.

Released under the MIT License.