Rust Clients

The official Rust client for UmaDB is available on crates.io.

The Rust crate umadb-client provides both asynchronous and synchronous clients for reading and appending events in UmaDB via the UmaDB gRPC API.

The synchronous client is effectively a wrapper around the asynchronous client.

The Rust clients implement the same traits and use the same types as the UmaDB server does internally, so they expose the server's essential operations remotely, with gRPC serving as the transport layer for inter-process communication (IPC).

The client methods, and DCB object types, are described below, followed by some examples.

struct UmaDBClient

Config and connection builder, which constructs synchronous and async client instances.

Fields

| Name | Type | Description |
|------|------|-------------|
| url | String | Database URL |
| ca_path | Option<String> | Path to server certificate (default None) |
| batch_size | Option<u32> | Optional hint for how many events to buffer per batch when reading events (default None). |

Examples

The examples below use the UmaDBClient builder to construct synchronous and asynchronous clients.

rust
use umadb_client::UmaDBClient;

// Synchronous clients are built with connect().
fn sync_examples() -> Result<(), Box<dyn std::error::Error>> {
    // Synchronous client without TLS (insecure connection)
    let client = UmaDBClient::new("http://localhost:50051".to_string()).connect()?;

    // Synchronous client with TLS (secure connection)
    let client = UmaDBClient::new("https://example.com:50051".to_string()).connect()?;

    // Synchronous client with TLS (self-signed server certificate)
    let client = UmaDBClient::new("https://localhost:50051".to_string()).ca_path("server.pem".to_string()).connect()?;

    Ok(())
}

// Asynchronous clients are built with connect_async(), which must be awaited.
async fn async_examples() -> Result<(), Box<dyn std::error::Error>> {
    // Asynchronous client without TLS (insecure connection)
    let client = UmaDBClient::new("http://localhost:50051".to_string()).connect_async().await?;

    // Asynchronous client with TLS (secure connection)
    let client = UmaDBClient::new("https://example.com:50051".to_string()).connect_async().await?;

    // Asynchronous client with TLS (self-signed server certificate)
    let client = UmaDBClient::new("https://localhost:50051".to_string()).ca_path("server.pem".to_string()).connect_async().await?;

    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    sync_examples()?;
    // Run the async examples on a Tokio runtime.
    tokio::runtime::Runtime::new()?.block_on(async_examples())
}

fn new()

Returns a new UmaDBClient config object with the optional ca_path and batch_size fields set to None.

Arguments:

| Parameter | Type | Description |
|-----------|------|-------------|
| url | String | Database URL, for example: "http://localhost:50051".to_string() for an UmaDB server running without TLS or "https://localhost:50051".to_string() for an UmaDB server running with TLS enabled |

If the required url argument has the scheme "https" or "grpcs", a secure gRPC channel will be created when connect() or connect_async() is called. In this case, if the server's root certificate is not installed locally, the path to a file containing the certificate must be provided by calling ca_path().
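The scheme rule above can be sketched as a small predicate. This is a minimal illustration, not code from umadb-client: the function name uses_tls is hypothetical, and treating the scheme case-insensitively is an assumption.

```rust
/// Hypothetical helper (not part of umadb-client): reports whether a URL's
/// scheme would call for a secure channel under the rule described above.
pub fn uses_tls(url: &str) -> bool {
    match url.split_once("://") {
        Some((scheme, _rest)) => {
            // Assumption: schemes are compared case-insensitively.
            let scheme = scheme.to_ascii_lowercase();
            scheme == "https" || scheme == "grpcs"
        }
        // No scheme present, so nothing indicates TLS.
        None => false,
    }
}
```

A caller could use such a check to decide early whether a ca_path is likely to be needed for a self-signed server.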

fn ca_path()

Returns a copy of the UmaDBClient config object with the optional ca_path field set to Some(String).

Arguments:

| Parameter | Type | Description |
|-----------|------|-------------|
| ca_path | String | Path to PEM-encoded server root certificate, for example: "server.pem".to_string() |

fn batch_size()

Returns a copy of the UmaDBClient config object with the optional batch_size field set to Some(u32).

Arguments:

| Parameter | Type | Description |
|-----------|------|-------------|
| batch_size | u32 | Hint for how many events to buffer per batch when reading events, for example: 100 |

This value can modestly affect latency and throughput. If unset, a sensible default value will be used by the server. The server will also cap this value at a reasonable level.
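The builder methods described above (new, ca_path, batch_size) each return a config object, so calls can be chained. The sketch below shows that pattern with a local stand-in type. ClientConfig is a hypothetical name, and only the field names and defaults are taken from the tables above; it is not the umadb-client source.

```rust
/// Illustrative stand-in for the config object. The field names follow the
/// table above, but this is not the umadb-client implementation.
#[derive(Debug, Clone, PartialEq)]
pub struct ClientConfig {
    pub url: String,
    pub ca_path: Option<String>,
    pub batch_size: Option<u32>,
}

impl ClientConfig {
    /// Starts with both optional fields unset.
    pub fn new(url: String) -> Self {
        Self { url, ca_path: None, batch_size: None }
    }

    /// Returns the config with ca_path set, leaving other fields unchanged.
    pub fn ca_path(self, ca_path: String) -> Self {
        Self { ca_path: Some(ca_path), ..self }
    }

    /// Returns the config with batch_size set, leaving other fields unchanged.
    pub fn batch_size(self, batch_size: u32) -> Self {
        Self { batch_size: Some(batch_size), ..self }
    }
}
```

Because each method returns the whole config, optional settings can be applied in any order before connecting.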

fn connect()

Returns an instance of SyncUmaDBClient, the synchronous UmaDB client.

async fn connect_async()

Returns an instance of AsyncUmaDBClient, the asynchronous UmaDB client.

struct SyncUmaDBClient

The synchronous UmaDB client.

Example

Here's an example of how to use the synchronous Rust client for UmaDB:

rust
use umadb_client::UmaDBClient;
use umadb_dcb::{
    DCBAppendCondition, DCBError, DCBEvent, DCBEventStoreSync, DCBQuery, DCBQueryItem,
};
use uuid::Uuid;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the gRPC server
    let url = "http://localhost:50051".to_string();
    let client = UmaDBClient::new(url).connect()?;

    // Define a consistency boundary
    let boundary = DCBQuery::new().item(
        DCBQueryItem::new()
            .types(["example"])
            .tags(["tag1", "tag2"]),
    );

    // Read events for a decision model
    let mut read_response = client.read(Some(boundary.clone()), None, false, None, false)?;

    // Build decision model
    while let Some(result) = read_response.next() {
        match result {
            Ok(event) => {
                println!(
                    "Got event at position {}: {:?}",
                    event.position, event.event
                );
            }
            Err(status) => panic!("gRPC stream error: {}", status),
        }
    }

    // Remember the last-known position
    let last_known_position = read_response.head().unwrap();
    println!("Last known position is: {:?}", last_known_position);

    // Produce new event
    let event = DCBEvent::default()
        .event_type("example")
        .tags(["tag1", "tag2"])
        .data(b"Hello, world!")
        .uuid(Uuid::new_v4());

    // Append event in consistency boundary
    let append_condition = DCBAppendCondition::new(boundary.clone()).after(last_known_position);
    let position1 = client.append(vec![event.clone()], Some(append_condition.clone()))?;

    println!("Appended event at position: {}", position1);

    // Append conflicting event - expect an error
    let conflicting_event = DCBEvent::default()
        .event_type("example")
        .tags(["tag1", "tag2"])
        .data(b"Hello, world!")
        .uuid(Uuid::new_v4()); // different UUID

    let conflicting_result = client.append(vec![conflicting_event], Some(append_condition.clone()));

    // Expect an integrity error
    match conflicting_result {
        Err(DCBError::IntegrityError(integrity_error)) => {
            println!("Conflicting event was rejected: {:?}", integrity_error);
        }
        other => panic!("Expected IntegrityError, got {:?}", other),
    }

    // Appending with identical event IDs and append condition is idempotent.
    println!(
        "Retrying to append event at position: {:?}",
        last_known_position
    );
    let position2 = client.append(vec![event.clone()], Some(append_condition.clone()))?;

    if position1 == position2 {
        println!("Append method returned same commit position: {}", position2);
    } else {
        panic!("Expected idempotent retry!")
    }

    // Subscribe to all events for a projection
    let mut subscription = client.read(None, None, false, None, true)?;

    // Build an up-to-date view
    while let Some(result) = subscription.next() {
        match result {
            Ok(ev) => {
                println!("Processing event at {}: {:?}", ev.position, ev.event);
                if ev.position == position2 {
                    println!("Projection has processed new event!");
                    break;
                }
            }
            Err(status) => panic!("gRPC stream error: {}", status),
        }
    }

    Ok(())
}

fn read()

Reads events from the event store, optionally filtered by a query, starting from a given sequence number, capped by a limit, and with live subscription support.

This method can be used both for constructing decision models in a domain layer, and for projecting events into materialized views in CQRS.

Arguments:

| Parameter | Type | Description |
|-----------|------|-------------|
| query | Option<DCBQuery> | Optional structured query to filter events (by tags, event types, etc). |
| start | Option<u64> | Read events from this sequence number. Only events with positions greater than or equal to it are returned (or less than or equal to it if backwards is true). |
| backwards | bool | If true, events are read backwards, either from the given position or from the last recorded event. |
| limit | Option<u32> | Optional cap on the number of events to retrieve. |
| subscribe | bool | If true, keeps the stream open to deliver future events as they arrive. |

Returns a SyncReadResponse instance from which DCBSequencedEvent instances, and the most relevant "last known" sequence number, can be obtained.
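How start, backwards, and limit interact can be illustrated with a simplified in-memory model. The function select_positions below is hypothetical and operates on bare positions only. It does not model query filtering or subscribe, and it is not the server's implementation.

```rust
/// Simplified model of how `start`, `backwards`, and `limit` select events.
/// `positions` must be in ascending order. This is not the server's code.
pub fn select_positions(
    positions: &[u64],
    start: Option<u64>,
    backwards: bool,
    limit: Option<u32>,
) -> Vec<u64> {
    // Without a limit, take everything that passes the position filter.
    let cap = limit.map_or(usize::MAX, |n| n as usize);
    if backwards {
        // Newest first, keeping positions at or below `start` if given.
        positions
            .iter()
            .rev()
            .copied()
            .filter(|&p| start.map_or(true, |s| p <= s))
            .take(cap)
            .collect()
    } else {
        // Oldest first, keeping positions at or above `start` if given.
        positions
            .iter()
            .copied()
            .filter(|&p| start.map_or(true, |s| p >= s))
            .take(cap)
            .collect()
    }
}
```

In this model, reading backwards with no start and a limit of 1 yields only the most recent position, which is a common way to look up the latest matching event.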

fn append()

Appends new events to the store atomically, with optional optimistic concurrency conditions.

Writes one or more events to the event log in order. This method is idempotent for events that have UUIDs.

Arguments:

| Parameter | Type | Description |
|-----------|------|-------------|
| events | Vec<DCBEvent> | The list of events to append. Each includes an event type, tags, and data payload. |
| condition | Option<DCBAppendCondition> | Optional append condition (e.g. After(u64)) to ensure no conflicting writes occur. |

Returns the sequence number (u64) of the last successfully appended event from this operation.

This value can be used to wait for downstream event-processing components in a CQRS system to become up-to-date.
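The idempotent behaviour of append can be pictured with a toy in-memory log. Everything in this sketch is an assumption made for illustration: ToyLog is a hypothetical type, u128 values stand in for UUIDs, positions start at 1, and batches that only partially overlap earlier ones are not modeled. It only shows why retrying the same batch returns the same position.

```rust
use std::collections::HashMap;

/// Toy in-memory log. Event IDs are u128 stand-ins for UUIDs.
#[derive(Default)]
pub struct ToyLog {
    next_position: u64,
    position_by_id: HashMap<u128, u64>,
}

impl ToyLog {
    /// Appends a batch and returns the position of its last event, or None
    /// for an empty batch. If every ID in the batch was already recorded,
    /// nothing is written and the earlier position of the last event is
    /// returned. Partial overlap is not modeled.
    pub fn append(&mut self, ids: &[u128]) -> Option<u64> {
        let last = ids.last()?;
        if ids.iter().all(|id| self.position_by_id.contains_key(id)) {
            return self.position_by_id.get(last).copied();
        }
        let mut last_position = 0;
        for id in ids {
            self.next_position += 1;
            last_position = self.next_position;
            self.position_by_id.insert(*id, last_position);
        }
        Some(last_position)
    }
}
```

With this model, a client that times out after sending a batch can resend it without risking duplicate events, which mirrors the retry step in the example above.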

fn head()

Returns the sequence number (u64) of the very last successfully appended event in the database.

struct AsyncUmaDBClient

The asynchronous UmaDB client.

Example

Here's an example of how to use the asynchronous Rust client for UmaDB:

rust
use futures::StreamExt;
use umadb_client::UmaDBClient;
use umadb_dcb::{
    DCBAppendCondition, DCBError, DCBEvent, DCBEventStoreAsync, DCBQuery, DCBQueryItem,
};
use uuid::Uuid;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the gRPC server
    let url = "http://localhost:50051".to_string();
    let client = UmaDBClient::new(url).connect_async().await?;

    // Define a consistency boundary
    let boundary = DCBQuery::new().item(
        DCBQueryItem::new()
            .types(["example"])
            .tags(["tag1", "tag2"]),
    );

    // Read events for a decision model
    let mut read_response = client
        .read(Some(boundary.clone()), None, false, None, false)
        .await?;

    // Build decision model
    while let Some(result) = read_response.next().await {
        match result {
            Ok(event) => {
                println!(
                    "Got event at position {}: {:?}",
                    event.position, event.event
                );
            }
            Err(status) => panic!("gRPC stream error: {}", status),
        }
    }

    // Remember the last-known position
    let last_known_position = read_response.head().await?;
    println!("Last known position is: {:?}", last_known_position);

    // Produce new event
    let event = DCBEvent::default()
        .event_type("example")
        .tags(["tag1", "tag2"])
        .data(b"Hello, world!")
        .uuid(Uuid::new_v4());

    // Append event in consistency boundary
    let condition = DCBAppendCondition::new(boundary.clone()).after(last_known_position);
    let position1 = client
        .append(vec![event.clone()], Some(condition.clone()))
        .await?;

    println!("Appended event at position: {}", position1);

    // Append conflicting event - expect an error
    let conflicting_event = DCBEvent::default()
        .event_type("example")
        .tags(["tag1", "tag2"])
        .data(b"Hello, world!")
        .uuid(Uuid::new_v4()); // different UUID

    let conflicting_result = client
        .append(vec![conflicting_event.clone()], Some(condition.clone()))
        .await;

    // Expect an integrity error
    match conflicting_result {
        Err(DCBError::IntegrityError(integrity_error)) => {
            println!("Conflicting event was rejected: {:?}", integrity_error);
        }
        other => panic!("Expected IntegrityError, got {:?}", other),
    }

    // Appending with identical event IDs and append condition is idempotent.
    println!(
        "Retrying to append event at position: {:?}",
        last_known_position
    );
    let position2 = client
        .append(vec![event.clone()], Some(condition.clone()))
        .await?;

    if position1 == position2 {
        println!("Append method returned same commit position: {}", position2);
    } else {
        panic!("Expected idempotent retry!")
    }

    // Subscribe to all events for a projection
    let mut subscription = client.read(None, None, false, None, true).await?;

    // Build an up-to-date view
    while let Some(result) = subscription.next().await {
        match result {
            Ok(ev) => {
                println!("Processing event at {}: {:?}", ev.position, ev.event);
                if ev.position == position2 {
                    println!("Projection has processed new event!");
                    break;
                }
            }
            Err(status) => panic!("gRPC stream error: {}", status),
        }
    }
    Ok(())
}

async fn read()

See fn read() above.

Returns an AsyncReadResponse instance from which DCBSequencedEvent instances, and the most relevant "last known" sequence number, can be obtained.

async fn append()

See fn append() above.

async fn head()

See fn head() above.

struct DCBSequencedEvent

A recorded event with its assigned sequence number in the event store.

| Field | Type | Description |
|-------|------|-------------|
| event | DCBEvent | The recorded event. |
| position | u64 | The sequence number. |

struct DCBEvent

Represents a single event either to be appended or already stored in the event log.

| Field | Type | Description |
|-------|------|-------------|
| event_type | String | The event’s logical type or name. |
| data | Vec<u8> | Binary payload associated with the event. |
| tags | Vec<String> | Tags assigned to the event (used for filtering and indexing). |
| uuid | Option<Uuid> | Unique event ID. |

Giving events UUIDs enables idempotent append operations.

struct DCBQuery

A query composed of one or more DCBQueryItem filters.
An event matches the query if it matches any of the query items.

| Field | Type | Description |
|-------|------|-------------|
| items | Vec<DCBQueryItem> | A list of query items. Events matching any of these items are included in results. |

struct DCBQueryItem

Represents a single query clause for filtering events.

| Field | Type | Description |
|-------|------|-------------|
| types | Vec<String> | Event types to match. If empty, all event types are considered. |
| tags | Vec<String> | Tags that must all be present in the event for it to match. |
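The matching rules for DCBQuery and DCBQueryItem can be written out as two small predicates. The types and functions below are local stand-ins with hypothetical names, built only from the field descriptions above. They are not the umadb_dcb types. How UmaDB treats a query with no items is not stated here, so this sketch leaves that case unspecified.

```rust
/// Local stand-ins that mirror the field tables. Not the umadb_dcb types.
pub struct Event {
    pub event_type: String,
    pub tags: Vec<String>,
}

pub struct QueryItem {
    pub types: Vec<String>,
    pub tags: Vec<String>,
}

/// Small convenience for building Vec<String> values in examples.
pub fn strings(items: &[&str]) -> Vec<String> {
    items.iter().map(|s| s.to_string()).collect()
}

/// An item matches if the event's type is listed (or no types are listed)
/// and every tag of the item is present on the event.
pub fn matches_item(item: &QueryItem, event: &Event) -> bool {
    let type_ok = item.types.is_empty() || item.types.contains(&event.event_type);
    let tags_ok = item.tags.iter().all(|t| event.tags.contains(t));
    type_ok && tags_ok
}

/// A query matches if any of its items matches. The empty-query case is
/// not covered by the description above and is left unspecified here.
pub fn matches_query(items: &[QueryItem], event: &Event) -> bool {
    items.iter().any(|item| matches_item(item, event))
}
```

Types within an item are alternatives, tags within an item are cumulative, and items within a query are alternatives.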

struct DCBAppendCondition

Conditions that must be satisfied before an append operation succeeds.

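| Field | Type | Description |
|-------|------|-------------|
| fail_if_events_match | DCBQuery | If this query matches any existing events, the append operation will fail. |
| after | Option<u64> | Optional position constraint. If set, only events recorded after this position are checked against fail_if_events_match, so the append succeeds only if no matching events exist after this position. |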
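One way to picture an append condition is as a check over the events already recorded. The sketch below is a simplified model under stated assumptions: the query is reduced to a list of required tags, the after position is assumed to exclude events at or before it from the check (the usual DCB reading), and all names (Recorded, Condition, condition_holds) are hypothetical.

```rust
/// Local stand-in for a recorded event. The real types live in umadb_dcb.
pub struct Recorded {
    pub position: u64,
    pub tags: Vec<String>,
}

pub struct Condition {
    /// Simplified query: an event "matches" if it carries all of these tags.
    pub fail_if_tags: Vec<String>,
    /// Assumption: events at or before this position are ignored.
    pub after: Option<u64>,
}

/// Convenience constructor for examples.
pub fn recorded(position: u64, tags: &[&str]) -> Recorded {
    Recorded { position, tags: tags.iter().map(|t| t.to_string()).collect() }
}

/// Returns true if the append would be allowed under this sketch, that is,
/// if no in-range event matches the simplified query.
pub fn condition_holds(log: &[Recorded], cond: &Condition) -> bool {
    !log.iter().any(|ev| {
        let in_range = cond.after.map_or(true, |after| ev.position > after);
        let matches = cond.fail_if_tags.iter().all(|t| ev.tags.contains(t));
        in_range && matches
    })
}
```

Under this reading, a writer that built its decision model up to some position only conflicts with matching events recorded after that position.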

enum DCBError

Represents all errors that can occur in UmaDB.

| Variant | Description |
|---------|-------------|
| Io(error) | I/O or filesystem error. |
| IntegrityError(message) | Append condition failed or data integrity violated. |
| Corruption(message) | Corruption detected in stored data. |
| PageNotFound(page_id) | Referenced page not found in storage. |
| DirtyPageNotFound(page_id) | Dirty page expected in cache not found. |
| RootIDMismatch(old_id, new_id) | Mismatch between stored and computed root page IDs. |
| DatabaseCorrupted(message) | Database file corrupted or invalid. |
| InternalError(message) | Unexpected internal logic error. |
| SerializationError(message) | Failure to serialize data to bytes. |
| DeserializationError(message) | Failure to parse serialized data. |
| PageAlreadyFreed(page_id) | Attempted to free a page that was already freed. |
| PageAlreadyDirty(page_id) | Attempted to mark a page dirty that was already dirty. |
| TransportError(message) | Client-server connection failed. |

type DCBResult<T>

A convenience alias for results returned by the methods:

rust
type DCBResult<T> = Result<T, DCBError>;

All the client methods return this type, which yields either a successful result T or a DCBError.
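Handling a DCBResult usually comes down to matching on the error variant. The sketch below redefines a cut-down DCBError with only two of the variants listed above, so that it compiles on its own. The describe function is a hypothetical caller-side helper, and the String payloads are an assumption about the variant contents.

```rust
/// Cut-down stand-in for DCBError with two of the variants listed above.
/// The String payloads are assumed for illustration.
#[derive(Debug, PartialEq)]
pub enum DCBError {
    IntegrityError(String),
    TransportError(String),
}

pub type DCBResult<T> = Result<T, DCBError>;

/// Hypothetical helper: turns the outcome of an append into a log message.
pub fn describe(result: &DCBResult<u64>) -> String {
    match result {
        Ok(position) => format!("appended at {}", position),
        // The append condition failed: re-read and decide again.
        Err(DCBError::IntegrityError(msg)) => format!("rejected: {}", msg),
        // The connection failed: the same request may be retried.
        Err(DCBError::TransportError(msg)) => format!("connection problem: {}", msg),
    }
}
```

Separating integrity errors from transport errors matters in practice: the first calls for rebuilding the decision model, while the second can often be retried as-is when events carry UUIDs.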

Released under the MIT License.