Skip to main content

PG Projections

In the previous section, we explored the creation of read models via raw SQL queries, a method which, while effective, can be complicated and error-prone.
@equinox-js/projection-pg provides a higher level abstraction for creating event sourced projections. With it you can express your projections as a function Event -> Change[] where Change is an Insert, Update, Delete or Upsert. By representing changes this way we can intelligently collapse changesets into a single change, this is because in FP terms the change is a semigroup.

The Payer read model from the previous section could be written as:

import { ITimelineEvent, StreamName } from "@equinox-js/core"
import { Pool } from "pg"
import { PayerId } from "@domain/identifiers"
import * as Payer from "@domain/payer"
import { forEntity, Change, createProjection } from "@equinox-js/projection-pg"

type Payer = { id: PayerId; name: string; email: string }

const { Delete, Upsert } = forEntity<Payer, "id">()

export const projection = { table: "payer", id: ["id"] }

function changes(stream: string, events: ITimelineEvent[]): Change[] {
const id = PayerId.parse(StreamName.parseId(stream))
const event = Payer.codec.decode(events[events.length - 1])
if (!event) return []
switch (event.type) {
case "PayerProfileUpdated":
const data = event.data
return [Upsert({ id: id, name: data.name, email: data.email })]
case "PayerDeleted":
return [Delete({ id: id })]
}
}

export const createHandler = (pool: Pool) => createProjection(projection, pool, changes)

It's worth calling out the forEntity helper there. It takes two type parameters, the entity and the keys representing the id. If the payer projection was in a multi tenant system we might've represented it as forEntity<Payer, 'id' | 'tenant_id'>. The resulting Insert, Update, Delete, and Upsert functions will allow the compiler to yell at you if you forget to supply part of the ID to Delete or Update.

To wire it up we would do

import "./tracing.js"
import { MessageDbSource, PgCheckpoints } from "@equinox-js/message-db-source"
import pg from "pg"
import { Payer } from "../domain/index.js"
import * as PayerReadModel from "../read-models/PayerReadModel.js"

const createPool = (connectionString?: string) =>
connectionString ? new pg.Pool({ connectionString, max: 10 }) : undefined

const messageDbPool = createPool(process.env.MDB_RO_CONN_STR || process.env.MDB_CONN_STR)!
const pool = createPool(process.env.CP_CONN_STR)!
const checkpointer = new PgCheckpoints(pool)
checkpointer.ensureTable().then(() => console.log("table created"))

const source = MessageDbSource.create({
pool: messageDbPool,
batchSize: 500,
categories: [Payer.CATEGORY],
groupName: "PayerReadModel",
checkpointer,
handler: PayerReadModel.createHandler(pool),
tailSleepIntervalMs: 100,
maxConcurrentStreams: 10,
})

const ctrl = new AbortController()

process.on("SIGINT", () => ctrl.abort())
process.on("SIGTERM", () => ctrl.abort())

source.start(ctrl.signal)

Version numbers

It's possible to configure the projection with a version number column. In most cases you can use the stream version as the version seamlessly. As an example let's imagine an Appointment model.

type AppointmentModel = {
id: string
title: string
start: Date
duration_ms: number
is_cancelled: boolean
version: bigint
}

const projection = {
schema: "view_models",
table: "appointment",
id: ["id"],
version: "version",
}

const { Insert, Update } = forEntity<AppointmentModel, "id" | "version">()

function change(id: AppointmentId, version: bigint, event: Appointment.Event) {
switch (event.type) {
case "AppointmentScheduled":
return Insert({
id,
version,
title: event.data.title,
start: event.data.start,
duration_ms: event.data.duration_ms,
is_cancelled: false,
})
case "AppointmentRescheduled":
return Update({ id, version, start: event.data.start })
case "AppointmentCancelled":
return Update({ id, version, is_cancelled: true })
}
}

function changes(streamName: string, events: ITimelineEvent[]): Change[] {
const id = AppointmentId.parse(StreamName.parseId(streamName))
const version = events[events.length - 1].index
return keepMap(events, Appointment.codec.decode).map((ev) => change(id, version, ev))
}

export const createHandler = (pool: Pool) => createProjection(projection, pool, changes)

The second type argument to forEntity represents the "required keys." That is, for actions that don't contain the full entity, like Deletes and Updates, these keys MUST be provided. This helps us catch some issues in the type system rather than at runtime.

When version is provided the behaviour of the projection changes slightly

  • Insert: No change
  • Update: adds where version < $version
  • Delete: adds where version < $version
  • Upsert: add where version < $version to the conflict branch of the query