Skip to main content

Anatomy of an Equinox Service

An Equinox service is composed of a few modules (namespaces).

This structure is worth calling out because it gives you a clean public API without forcing a handler framework onto the domain. In most applications the Service is the only part other modules should depend on; the rest exists to keep the domain model explicit and testable.

This is the default EquinoxJS boundary: domain logic stays close to the stream, while transports such as HTTP handlers, jobs or message consumers depend on a small service surface.

  • Stream: shows which category the service writes to and how the identity of the stream is composed (and the reverse operations for when running Reaction logic)
  • Events: shows which events the service writes
  • Fold: shows how the events are folded into state
  • Decide: shows which actions can be taken upon the state (resulting in new events being written)
  • Query: shows how we map the state to answer questions.
  • Service: the class that wraps the above into a cohesive domain service and default application boundary.
  • Config: shows how we've opted to to bind the Service to Streams in a Concrete Store (which Access Strategies to apply, if any) including important information like access strategies.
tip

Everything above Service is considered internal to the module. It is exported as a convenience for testing, but other modules should not take a dependency on anything other than the Service.

This is one of the main reasons the pattern scales well: you keep the domain logic decomposed without leaking storage and stream details across the codebase.

If you prefer command handlers, you can still derive them trivially from the service methods. EquinoxJS keeps the more reusable boundary as the default.

Let's see what each of these modules looks like for a simple service for checking in and out for an Appointment Booking.

The Stream

import { StreamId, StreamName, Uuid } from "@equinox-js/core"

export type AppointmentId = Uuid.Uuid<"AppointmentId">
export const AppointmentId = Uuid.create<"AppointmentId">()
export type UserId = Uuid.Uuid<"UserId">
export const UserId = Uuid.create<"UserId">()

export namespace Stream {
export const category = "AppointmentActuals"
export const streamId = StreamId.gen(AppointmentId.toString, UserId.toString)
export const decodeId = StreamId.dec(AppointmentId.parse, UserId.parse)
export const tryMatch = StreamName.tryMatch(category, decodeId)
}

The Events

import * as z from "zod"
import { Codec } from "@equinox-js/core"

export namespace Events {
// prettier-ignore
const date = z.string().datetime().transform((x) => new Date(x))
export const Timestamp = z.object({ timestamp: date })
export type Timestamp = z.infer<typeof Timestamp>
export const ActualsOverridden = z.object({ checkedIn: date, checkedOut: date })
export type ActualsOverridden = z.infer<typeof ActualsOverridden>

export type Event =
| { type: "CheckedIn"; data: Timestamp }
| { type: "CheckedOut"; data: Timestamp }
| { type: "ActualsOverridden"; data: ActualsOverridden }

export const codec = Codec.upcast<Event>(
Codec.json(),
Codec.Upcast.body({
CheckedIn: Timestamp.parse,
CheckedOut: Timestamp.parse,
ActualsOverridden: ActualsOverridden.parse,
}),
)
}

The Fold

export namespace Fold {
import Event = Events.Event
export type State = { checkedIn?: Date; checkedOut?: Date }
export const initial: State = {}
export const evolve = (state: State, event: Event) => {
switch (event.type) {
case "CheckedIn":
return { ...state, checkedIn: event.data.timestamp }
case "CheckedOut":
return { ...state, checkedOut: event.data.timestamp }
case "ActualsOverridden":
return event.data
}
}
export const fold = (state: State, events: Event[]) => events.reduce(evolve, state)
}

The Decide

export namespace Decide {
import Event = Events.Event
import State = Fold.State
export const checkIn =
(timestamp: Date) =>
(state: State): Event[] => {
// Check if already checked in with a different timestamp
// The caller is expected to guard against this
if (state.checkedIn && +state.checkedIn !== +timestamp) {
throw new Error("Already checked in with different timestamp")
}

// Silent idempotent handling: We accept retried requests with identical timestamps.
// If checked in with the same timestamp, no changes are required.
if (state.checkedIn && +state.checkedIn === +timestamp) return []

return [{ type: "CheckedIn", data: { timestamp } }]
}
export const checkOut =
(timestamp: Date) =>
(state: State): Event[] => {
if (state.checkedOut && +state.checkedOut !== +timestamp) {
throw new Error("Already checked out with different timestamp")
}
if (state.checkedOut && +state.checkedOut === +timestamp) return []
return [{ type: "CheckedOut", data: { timestamp } }]
}

const isSameDate = (a: Date, b?: Date) => b && +a === +b

export const manuallyOverride =
(checkedIn: Date, checkedOut: Date) =>
(state: State): Event[] => {
if (isSameDate(checkedIn, state?.checkedIn) && isSameDate(checkedOut, state?.checkedOut))
return []
return [{ type: "ActualsOverridden", data: { checkedIn, checkedOut } }]
}
}

The Queries

import { ISyncContext } from "@equinox-js/core"
export namespace Query {
type Status =
| { type: "not-started"; version: bigint }
| { type: "in-progress"; version: bigint; startedAt: Date }
| { type: "complete"; version: bigint; startedAt: Date; durationMs: number }
export const status = ({ state, version }: ISyncContext<Fold.State>): Status => {
if (state.checkedIn && state.checkedOut) {
return {
type: "complete",
version,
startedAt: state.checkedIn,
durationMs: +state.checkedOut - +state.checkedIn,
}
}
if (state.checkedIn) return { type: "in-progress", version, startedAt: state.checkedIn }
return { type: "not-started", version }
}
}

The Service

import { Decider, LoadOption } from "@equinox-js/core"
export class Service {
constructor(
private readonly resolve: (
appointmentId: AppointmentId,
userId: UserId,
) => Decider<Events.Event, Fold.State>,
) {}

checkIn(appointmentId: AppointmentId, userId: UserId, timestamp: Date) {
const decider = this.resolve(appointmentId, userId)
return decider.transact(Decide.checkIn(timestamp), LoadOption.AssumeEmpty)
}

checkOut(appointmentId: AppointmentId, userId: UserId, timestamp: Date) {
const decider = this.resolve(appointmentId, userId)
return decider.transact(Decide.checkOut(timestamp))
}

manuallyOverride(
appointmentId: AppointmentId,
userId: UserId,
checkedIn: Date,
checkedOut: Date,
) {
const decider = this.resolve(appointmentId, userId)
return decider.transact(Decide.manuallyOverride(checkedIn, checkedOut))
}

readStatus(appointmentId: AppointmentId, userId: UserId) {
const decider = this.resolve(appointmentId, userId)
return decider.queryEx(Query.status)
}

/** In many cases we do not need to fetch missing events because we can accept a stale value from the cache.
* We prefer making the staleness explicit in the method name */
readStatusStale(appointmentId: AppointmentId, userId: UserId) {
const decider = this.resolve(appointmentId, userId)
return decider.queryEx(Query.status, LoadOption.AnyCachedValue)
}
}

The Config

See the ../equinox-config.ts snippet

Most projects will be well served by copying this snippet and removing stores that are not in use. That is, have a binding for memory store and a single concrete store.

import { ICodec, ICache, CachingStrategy, Codec } from "@equinox-js/core"
import { MemoryStoreCategory, VolatileStore } from "@equinox-js/memory-store"
import * as MessageDB from "@equinox-js/message-db"
import * as DynamoDB from "@equinox-js/dynamo-store"

export enum Store {
Memory,
MessageDb,
Dynamo,
}

export type Config =
| { store: Store.Memory; context: VolatileStore<string> }
| { store: Store.MessageDb; context: MessageDB.MessageDbContext; cache: ICache }
| { store: Store.Dynamo; context: DynamoDB.DynamoStoreContext; cache: ICache }

// prettier-ignore
export namespace MessageDb {
import AccessStrategy = MessageDB.AccessStrategy
import Category = MessageDB.MessageDbCategory
import Context = MessageDB.MessageDbContext
type Config = { context: Context; cache: ICache }

export function createCached<E, S, C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, access: AccessStrategy<E, S>, { context, cache }: Config) {
const caching = CachingStrategy.Cache(cache)
return Category.create(context, name, codec, fold, initial, caching, access);
}
export function createUnoptimized<E, S, C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, config: Config) {
const access = AccessStrategy.Unoptimized<E, S>()
return MessageDb.createCached(name, codec, fold, initial, access, config)
}
export function createSnapshotted<E, S, C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, eventName: string, toSnapshot: (s: S) => E, config: Config) {
const access = AccessStrategy.AdjacentSnapshots(eventName, toSnapshot)
return MessageDb.createCached(name, codec, fold, initial, access, config)
}
export function createLatestKnown<E, S, C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, config: Config) {
const access = AccessStrategy.LatestKnownEvent<E, S>()
return MessageDb.createCached(name, codec, fold, initial, access, config)
}
}

// prettier-ignore
export namespace Dynamo {
import AccessStrategy = DynamoDB.AccessStrategy
import Category = DynamoDB.DynamoStoreCategory
import Context = DynamoDB.DynamoStoreContext
type Config = { context: Context; cache: ICache }
export function createCached<E, S, C>(name: string, codec_: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, access: AccessStrategy<E, S>, { context, cache }: Config) {
const caching = CachingStrategy.Cache(cache)
const codec = Codec.compress(codec_)
return Category.create(context, name, codec, fold, initial, caching, access);
}
export function createUnoptimized<E, S, C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, config: Config) {
const access = AccessStrategy.Unoptimized()
return Dynamo.createCached(name, codec, fold, initial, access, config)
}
export function createSnapshotted<E,S,C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, isOrigin: (e: E) => boolean, toSnapshot: (s: S) => E, config: Config) {
const access = AccessStrategy.Snapshot(isOrigin, toSnapshot)
return Dynamo.createCached(name, codec, fold, initial, access, config)
}
export function createRollingState<E,S,C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, toSnapshot: (s: S) => E, config: Config) {
const access = AccessStrategy.RollingState(toSnapshot)
return Dynamo.createCached(name, codec, fold, initial, access, config)
}
export function createLatestKnown<E,S,C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, config: Config) {
const access = AccessStrategy.LatestKnownEvent()
return Dynamo.createCached(name, codec, fold, initial, access, config)
}
}

// prettier-ignore
export namespace MemoryStore {
export function create<E, S, C>(name: string, codec: ICodec<E, string, C>, fold: (s: S, e: E[]) => S, initial: S, { context: store }: { context: VolatileStore<string> }) {
return MemoryStoreCategory.create(store, name, codec, fold, initial)
}
}
import * as Config from "../equinox-config"
class Service {
// ... as above

// prettier-ignore
static resolveCategory(config: Config.Config) {
switch (config.store) {
case Config.Store.Memory:
return Config.MemoryStore.create(Stream.category, Events.codec, Fold.fold, Fold.initial, config)
case Config.Store.MessageDB:
return Config.MessageDB.createUnoptimized(Stream.category, Events.codec, Fold.fold, Fold.initial, config)
case Config.Store.Dynamo:
return Config.Dynamo.createUnoptimized(Stream.category, Events.codec, Fold.fold, Fold.initial, config)
}
}
static create(config: Config.Config) {
const category = Service.resolveCategory(config)
const resolve = (appointmentId: AppointmentId, userId: UserId) =>
Decider.forStream(category, Stream.streamId(appointmentId, userId), null)
return new Service(resolve)
}
}