
Concrete stores

Equinox stores expose a source package for wiring up reactions. In the previous section we created a handler; source packages interact with Sinks rather than bare handler functions, so we must first construct a sink that wraps the handler.
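As a refresher, the sink invokes the handler with a stream name and the events waiting for that stream. The sketch below assumes the StreamName and ITimelineEvent types exported by @equinox-js/core; the authoritative signature is the one from the previous section.

import { ITimelineEvent, StreamName } from "@equinox-js/core"

// Sketch of the handler shape the sink expects: invoked per stream with the
// batch of events read for that stream (see the previous section for the
// actual implementation).
async function handler(streamName: StreamName, events: ITimelineEvent[]): Promise<void> {
  // react to the events, e.g. send a notification for each message
}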

import { StreamsSink } from "@equinox-js/propeller"

const sink = StreamsSink.create({
  handler,
  // How many streams are we OK to process concurrently?
  maxConcurrentStreams: 10,
  // How many batches the feed will buffer before waiting for the sink to catch up
  // (checkpointing is always sequential in order of age)
  maxReadAhead: 3,
})

import { MessageDbSource, PgCheckpoints } from "@equinox-js/message-db-source"
import pg from "pg"

const checkpoints = new PgCheckpoints(new pg.Pool({ connectionString: "..." }), "public")
await checkpoints.ensureTable() // creates the checkpoints table if it doesn't exist

const pool = new pg.Pool({ connectionString: "..." })

const source = MessageDbSource.create({
  // the database pool to use
  pool,
  // under the hood the source polls for batches of events; this controls the
  // batch size
  batchSize: 500,
  // list of categories to subscribe to
  categories: [Message.CATEGORY],
  // consumer group name (used for checkpointing and tracing)
  groupName: "MessageNotifications",
  // the checkpointer maintains checkpoints on a per-category, per-group basis
  checkpoints,
  // Once we've processed all events in the store, how long should we wait
  // before requesting a new batch? In this case we want close to real time,
  // so we poll again after 100ms
  tailSleepIntervalMs: 100,
  sink,
})

const ctrl = new AbortController()

process.on("SIGINT", () => ctrl.abort())
process.on("SIGTERM", () => ctrl.abort())

await source.start(ctrl.signal)

Consumer groups

The MessageDB source supports consumer groups. You can use this functionality by passing consumerGroupMember and consumerGroupSize to the source's create. When these are passed, two things differ from normal operation:

  1. The checkpointer will checkpoint to {groupName}-{member}
  2. MessageDB will filter messages based on a hash of the stream id

This means that each consumer in the group maintains its own checkpoint, so the group cannot easily be autoscaled. To expand or contract a consumer group you will need to wipe the checkpoints and have every consumer in the group start from the current lowest checkpoint.

Using a hash of the ID guarantees that all messages for a particular stream will be handled in the correct order by the same consumer instance.
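For example, a two-member group would run two processes whose configuration differs only in the member index. The sketch below reuses the configuration from above; the consumerGroupMember and consumerGroupSize options come from this section, while the zero-based member index is an assumption worth verifying against your MessageDB setup.

// Instance 0 of a two-member group; the second process would pass
// consumerGroupMember: 1. Member indices are assumed to be zero-based.
const source = MessageDbSource.create({
  pool,
  batchSize: 500,
  categories: [Message.CATEGORY],
  groupName: "MessageNotifications",
  checkpoints,
  tailSleepIntervalMs: 100,
  sink,
  consumerGroupMember: 0,
  consumerGroupSize: 2,
})

With this in place the first instance checkpoints under MessageNotifications-0, the second under MessageNotifications-1, and MessageDB's stream-id hashing routes every event for a given stream to exactly one of them.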