Python Client for UmaDB
The Python package umadb is available on PyPI and provides a synchronous client for reading and appending events to UmaDB over its gRPC API using Rust-powered bindings via PyO3.
It is adapted into the Python eventsourcing library via the eventsourcing-umadb package.
Installation
Add umadb to your Python project:
uv add "umadb"Connecting to UmaDB
Use the Client class as the main entry point for connecting to an UmaDB server.
class Client(
url: str,
ca_path: str | None = None,
api_key: str | None = None,
batch_size: int | None = None,
):
...Parameters
| Name | Type | Description |
|---|---|---|
url | str | Required connection string. If the argument starts with https or grpcs, a secure TLS channel is created; otherwise an insecure channel is used. |
ca_path | str|None | Optional path to a PEM-encoded root/CA certificate for TLS connections (useful for self-signed servers). |
api_key | str|None | Optional API key used for authenticating with the server. |
batch_size | int|None | Optional hint for how many events to buffer per batch when reading. The server may cap this; a sensible default is used if unset.. |
Connection Examples
from umadb import Client
# Without TLS (insecure connection)
client = Client("http://localhost:50051")
# With batch size hint (insecure connection)
client = Client(
url="http://example.com:50051",
batch_size=1000,
)
# With TLS (system CAs)
client = Client("https://example.com:50051")
# With TLS (system CAs) + API key
client = Client(
url="https://example.com:50051",
api_key="umadb:example-api-key-4f7c2b1d9e5f4a038c1a",
)
# With TLS (self-signed)
client = Client(
url="https://localhost:50051",
ca_path="server.pem",
)
# With TLS (self-signed) + API key
client = Client(
url="https://example.com:50051",
ca_path="server.pem",
api_key="umadb:example-api-key-4f7c2b1d9e5f4a038c1a",
)
# With TLS (self-signed) + API key + batch size hint
client = Client(
url="https://example.com:50051",
ca_path="server.pem",
api_key="umadb:example-api-key-4f7c2b1d9e5f4a038c1a",
batch_size=1000,
)Appending Events
The Client.append() method writes new events to an UmaDB server.
def append(
events: list[Event],
condition: AppendCondition | None = None,
tracking_info: TrackingInfo | None = None,
) -> int:
...The Client.append() method can be used to append new Event instances to UmaDB atomically, with an optional append condition, and optional tracking information. Events are written in order.
Conditional appends with event UUIDs are idempotent. The server does not enforce uniqueness of events IDs.
Parameters
| Name | Type | Description |
|---|---|---|
events | list[Event] | The list of events to append. Each includes an event type, tags, and data payload. |
condition | AppendCondition|None | Optional append condition to ensure no conflicting writes occur. |
tracking_info | TrackingInfo|None | Optional tracking information – for event-processing components only. |
Return Value
Returns the sequence number (int) of the last successfully appended event from this operation. This value can be used to wait for downstream event-processing components in a CQRS system to become up-to-date.
Example
import uuid
from umadb import AppendCondition, Client, Event, IntegrityError, Query, QueryItem
client = Client("http://localhost:50051")
# Define a consistency boundary (same query you use while reading)
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])
# Read to build decision model
read_resp = client.read(query=cb)
for r in read_resp:
print(f"Existing event at {r.position}: {r.event}")
last_known = read_resp.head()
print("Last known position:", last_known)
# Produce a new event with a UUID (for idempotent retries) and metadata
ev = Event(
event_type="example",
tags=["tag1", "tag2"],
data=b"Hello, world!",
uuid=str(uuid.uuid4()),
metadata={"source": "example", "correlation_id": str(uuid.uuid4())},
)
# Append with an optimistic condition: fail if conflicting events exist after last_known
cond = AppendCondition(fail_if_events_match=cb, after=last_known)
position1 = client.append([ev], condition=cond)
print("Appended at:", position1)
# Conflicting append should raise an error
try:
client.append(
[
Event(
event_type="example",
tags=["tag1", "tag2"],
data=b"Hello, world!",
uuid=str(uuid.uuid4()),
metadata={},
)
],
condition=cond,
)
except IntegrityError as e:
print("Conflicting event was rejected:", e)
# Idempotent retry with same event UUID and condition should return same commit position
position2 = client.append([ev], condition=cond)
assert position1 == position2
print("Idempotent retry returned position:", position2)Reading Events
The Client.read() method returns recorded events from an UmaDB server.
def read(
query: Query | None = None,
start: int | None = None,
backwards: bool = False,
limit: int | None = None,
) -> ReadResponse:
...The Client.read() method can be used both for constructing decision models in a domain layer, and for projecting events into materialized views in CQRS. An optional Query can be provided to select by tags and types.
Parameters
| Name | Type | Description |
|---|---|---|
query | Query|None | Optional structured query to filter events (by tags, event types, etc). |
start | int|None | Read events from this sequence number. Only events with positions greater than or equal will be returned (or less than or equal if backwards is True. |
backwards | bool | If True events will be read backwards, either from the given position or from the last recorded event. |
limit | int|None | Optional cap on the number of events to retrieve. |
Return Value
Returns an iterable "read response" instance from which SequencedEvent instances, and the most relevant "last known" sequence number, can be obtained. The "last known" sequence number can be obtained from the head() method on the response object.
if limit was a int, the value returned by the response's head() method will be the sequence position of the last event received from the server.
Otherwise, the value returned by the response's head() method will be the position of the last recorded event in the database when the reader transaction started.
Example
from umadb import Client, Query, QueryItem
client = Client("http://localhost:50051")
# Filter by type(s) and tag(s)
q = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])
resp = client.read(
query=q, start=None, backwards=False, limit=None
)
for item in resp:
print(f"Got event at position {item.position}: {item.event}")
last_known = resp.head()
print("Last known position:", last_known)Subscribing
The Client.subscribe() method returns recorded events from an UmaDB server, and delivers new events as they arrive.
def subscribe(
query: Query | None = None,
after: int | None = None,
) -> Subscription:
...The Client.subscription() method can be used for projecting events into materialized views in CQRS. An optional Query can be provided to select by tags and types.
Parameters
| Name | Type | Description |
|---|---|---|
query | Query|None | Optional structured query to filter events (by tags, event types, etc). |
after | int|None | Receive events after this sequence number. Only events with greater positions will be received. |
Return Value
Returns an iterable "subscription" instance from which SequencedEvent instances can be obtained.
Example
from umadb import Client, Query, QueryItem
client = Client("http://localhost:50051")
# Filter by type(s) and tag(s)
q = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])
# Subscribe to selected events
subscription = client.subscribe(query=q, after=None)
for se in subscription:
print("New event:", se.position, se.event)
# Break for demo purposes
breakGetting Head Position
The Client.head() method returns the position of the last event recorded in an UmaDB server.
def head(self) -> int | None: ...The Client.head() method can be used for counting the number of recorded events in the database, or for determining the position of the last recorded event when subscribing only to new events.
Return Value
Returns the position (u64) of the last recorded event in the event store, or None if no events have been recorded yet.
Getting Tracking Info
The Client.get_tracking_info() method returns the last recorded position in an upstream sequence of events.
def get_tracking_info(self, source: str) -> int | None: ...The Client.get_tracking_info() method can be used when starting or resuming an event-processing component. The event-processing component will start by requesting new events from the upstream sequence after this position. The position of an upstream event that has been processed successfully can be recorded atomically when appending new events generated by processing that event.
Parameters
| Name | Type | Description |
|---|---|---|
source | str | Upstream sequence name. |
Returns the last recorded upstream position (int), or None if the sequence name is not found.
Event
An Event represents a single event either to be appended or already stored in the event log.
| Field | Type | Description |
|---|---|---|
event_type | str | The event's logical type (e.g. "UserRegistered"). |
tags | list[str] | Tags assigned to the event (used for filtering and indexing). |
data | bytes | Binary payload associated with the event. |
uuid | str|None | Unique event ID. |
metadata | dict[str, str] | Key-value metadata attached to the event (e.g. provenance, correlation IDs). |
Idempotent support for append operations is activated by setting uuid on appended events.
The metadata field allows storing arbitrary string key-value pairs alongside events. This is useful for storing provenance information, correlation IDs, causation IDs, or other contextual data that should be preserved with the event. Metadata is stored with the event and returned unchanged when the event is read. Each metadata key and value may be up to 65535 bytes long; appending an event with a longer key or value fails with a validation error.
Include in:
- Append requests when writing new events to the store.
Included in:
SequencedEventobjects when the server responds to read requests.
Matched by:
Append Condition
An AppendCondition causes an append request to fail if events match its Query, optionally after a sequence number.
| Field | Type | Description |
|---|---|---|
fail_if_events_match | Query | Query for conflicting events. |
after | int|None | Sequence number. |
Include in:
- Append requests to define optimistic concurrent control.
To implement a consistency boundary, command handlers can use the same Query used when reading events as the value of fail_if_events_match, and the "head" sequence number received from the read response as the value of after.
Tracking Info
A TrackingInfo instance indicates the source and position of an upstream event.
| Field | Type | Description |
|---|---|---|
source | str | Upstream sequence name. |
position | int | Upstream sequence number. |
Include in:
- Append requests when recording the results of processing an upstream event.
To implement exactly-once semantics in event-processing components, pull events from an upstream source after the last recorded position, then record the upstream positions of upstream events along with new state that results from processing those events. By processing event sequentially in this way, each event will be processed at least once. And by recording tracking information along with new state, the new state will be recorded at most once. The combination of "at least once" processing and "at most once" recording gives "exactly once" semantics from the point of view of consumers of the recorded state.
Query
A Query defines criteria for selecting events in the event store.
| Field | Type | Description |
|---|---|---|
items | list[QueryItem] | A list of selection criteria (logical OR). |
An Event is selected if any QueryItem matches or the items field is empty.
Include in:
- Read requests to select events returned by the server.
- An
AppendConditionto select conflicting events.
Query Item
A QueryItem defines a criterion for matching events.
| Field | Type | Description |
|---|---|---|
types | list[str] | List of event types (logical OR). |
tags | list[str] | List of tags (logical AND). |
A QueryItem will match an Event if:
- one of its
typesmatches theEvent.event_typeor itstypesfield is empty; AND - all of its
tagsmatch one of theEvent.tagsor itstagsfield is empty.
Sequenced Event
A SequencedEvent represents a recorded Event along with its assigned sequence number.
| Field | Type | Description |
|---|---|---|
position | int | The sequence number. |
event | Event | The recorded event. |
Included in:
- Read responses when the server responds to read requests.
Error Handling
The Python client raises Python exceptions on error:
- Integrity/condition failure:
IntegrityError - Transport/connection errors:
TransportError - Authentication failures:
AuthenticationError - Invalid argument errors:
ValueError - Other internal errors:
RuntimeErrororOSError
Your application should catch these as appropriate.
Complete Example
import uuid
from umadb import AppendCondition, Client, Event, IntegrityError, Query, QueryItem
# Connect to the gRPC server (TLS + API key)
client = Client(
url="http://localhost:50051",
)
# Define a consistency boundary
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])
# Read events for a decision model
read_response = client.read(query=cb)
for result in read_response:
print(f"Got event at position {result.position}: {result.event}")
# Remember the last-known position
last_known_position = read_response.head()
print("Last known position is:", last_known_position)
# Create an event with a UUID to enable idempotent append and metadata
event = Event(
event_type="example",
tags=["tag1", "tag2"],
data=b"Hello, world!",
uuid=str(uuid.uuid4()),
metadata={"source": "example", "correlation_id": str(uuid.uuid4())},
)
# Append event within the consistency boundary
condition = AppendCondition(fail_if_events_match=cb, after=last_known_position)
commit_position1 = client.append([event], condition=condition)
print("Appended event at position:", commit_position1)
# Append conflicting event — expect an error
try:
conflicting_event = Event(
event_type="example",
tags=["tag1", "tag2"],
data=b"Hello, world!",
uuid=str(uuid.uuid4()), # different UUID
metadata={},
)
client.append([conflicting_event], condition=condition)
except IntegrityError as e:
print("Conflicting event was rejected:", e)
# Idempotent retry — same event ID and condition
print("Retrying to append event at position:", last_known_position)
commit_position2 = client.append([event], condition=condition)
assert commit_position1 == commit_position2
print("Append returned same commit position:", commit_position2)Example with Tracking
import uuid
from umadb import (
AppendCondition,
Client,
Event,
IntegrityError,
Query,
QueryItem,
TrackingInfo,
)
# Connect to the gRPC server (TLS + API key)
client = Client(
url="http://localhost:50051",
)
# Get last processed upstream event position
last_processed_position = client.get_tracking_info("upstream")
# Pull next unprocessed upstream event...
next_upstream_event_position = 1 + (last_processed_position or 0)
# Construct tracking information from next unprocessed event
tracking_info = TrackingInfo("upstream", next_upstream_event_position)
# Define a consistency boundary
cb = Query(items=[QueryItem(types=["example"], tags=["tag1", "tag2"])])
# Read events for a decision model
read_response = client.read(query=cb)
for result in read_response:
print(f"Got event at position {result.position}: {result.event}")
# Remember the last-known position
last_known_position = read_response.head()
print("Last known position is:", last_known_position)
# Create an event with a UUID to enable idempotent append and metadata
event = Event(
event_type="example",
tags=["tag1", "tag2"],
data=b"Hello, world!",
uuid=str(uuid.uuid4()),
metadata={"source": "tracking_example", "correlation_id": str(uuid.uuid4())},
)
# Append event within the consistency boundary
condition = AppendCondition(fail_if_events_match=cb, after=last_known_position)
commit_position1 = client.append(
[event], condition=condition, tracking_info=tracking_info
)
print("Appended event at position:", commit_position1)
# Idempotent retry — same event ID and condition
print("Retrying to append event at position:", last_known_position)
commit_position2 = client.append(
[event], condition=condition, tracking_info=tracking_info
)
assert commit_position1 == commit_position2
print("Append returned same commit position:", commit_position2)
# Check tracking information
assert tracking_info.position == client.get_tracking_info("upstream")
# Unconditional append with conflicting tracking information — expect an error
try:
conflicting_event = Event(
event_type="example",
tags=["tag1", "tag2"],
data=b"Hello, world!",
uuid=str(uuid.uuid4()), # different UUID
metadata={},
)
client.append([conflicting_event], condition=None, tracking_info=tracking_info)
except IntegrityError as e:
print("Conflicting event was rejected:", e)Notes
- Python client is synchronous and blocking; if you need async integration, run client calls in a thread pool or use an async worker that offloads to threads.
- Event data is binary (
bytes). Use a consistent serialization (e.g., JSON serialized to UTF-8 bytes, protobuf, msgpack) for your domain. - API keys must match the server configuration.
- For TLS with self-signed certs, pass
ca_pathwith your root/CA certificate.
