Concrete stores
Equinox stores expose a source package to wire reactions up. In the previous section we created a handler. Source packages interact with Sinks rather than handler functions, so we must first construct a sink wrapping the handler.
import { StreamsSink } from "@equinox-js/propeller"

const sink = StreamsSink.create({
  handler,
  // How many streams are we OK to process concurrently?
  maxConcurrentStreams: 10,
  // How many batches the feed will buffer before waiting for the sink to catch up
  // (checkpointing is always sequential in order of age)
  maxReadAhead: 3,
})
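The interaction between maxReadAhead and sequential checkpointing can be sketched as follows. This is a minimal illustration of the idea, not the propeller implementation: the ReadAheadWindow class and its method names are hypothetical, and it assumes each batch carries the position that should be checkpointed once the batch is fully handled.

```typescript
// Hypothetical illustration only; this is not the @equinox-js/propeller implementation.
class ReadAheadWindow {
  // Batches handed to the sink, oldest first
  private inFlight: { checkpoint: number; done: boolean }[] = []
  private committed: number | null = null
  private readonly maxReadAhead: number

  constructor(maxReadAhead: number) {
    this.maxReadAhead = maxReadAhead
  }

  /** True while the feed may buffer another batch. */
  canAccept(): boolean {
    return this.inFlight.length < this.maxReadAhead
  }

  /** Registers a batch; `checkpoint` is the position to store once it is handled. */
  submit(checkpoint: number): void {
    if (!this.canAccept()) throw new Error("read-ahead limit reached")
    this.inFlight.push({ checkpoint, done: false })
  }

  /** Marks a batch as handled and returns the newest position that is safe to checkpoint. */
  complete(checkpoint: number): number | null {
    const batch = this.inFlight.find((b) => b.checkpoint === checkpoint)
    if (batch) batch.done = true
    // Checkpoints only advance in order of age: drop finished batches from the front
    while (this.inFlight.length > 0 && this.inFlight[0]?.done) {
      const oldest = this.inFlight.shift()
      if (oldest) this.committed = oldest.checkpoint
    }
    return this.committed
  }
}

const readAhead = new ReadAheadWindow(3)
readAhead.submit(500)
readAhead.submit(1000)
readAhead.complete(1000) // the older batch (500) is still pending, so nothing is committed yet
readAhead.complete(500) // both batches are handled, so the checkpoint may advance to 1000
```

A newer batch finishing early never moves the checkpoint past an older unfinished batch, so a restart can only replay events, never skip them.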
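Source packages are available for MessageDB and DynamoStore. The MessageDB wiring is shown first, followed by the DynamoStore wiring.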
import { MessageDbSource, PgCheckpoints } from "@equinox-js/message-db-source"
import pg from "pg"
const checkpoints = new PgCheckpoints(new pg.Pool({ connectionString: "..." }), "public")
await checkpoints.ensureTable() // creates the checkpoints table if it doesn't exist
const pool = new pg.Pool({ connectionString: "..." })
const source = MessageDbSource.create({
  // the database pool to use
  pool,
  // under the hood the source polls for batches of events, this controls the
  // batch size
  batchSize: 500,
  // list of categories to subscribe to.
  categories: [Message.CATEGORY],
  // Consumer group name (used for checkpointing and tracing)
  groupName: "MessageNotifications",
  // the checkpointer maintains checkpoints on a per-category, per-group basis
  checkpoints,
  // Once we've processed all events in the store, how long should we wait
  // before requesting a new batch? In this case we want close to real time so
  // will poll after 100ms
  tailSleepIntervalMs: 100,
  sink,
})
const ctrl = new AbortController()
process.on("SIGINT", () => ctrl.abort())
process.on("SIGTERM", () => ctrl.abort())
await source.start(ctrl.signal)
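To make the roles of batchSize, tailSleepIntervalMs, and the abort signal concrete, here is a rough sketch of a polling loop. It is an assumption about the general shape of such a loop, not the actual MessageDbSource code; pollLoop, sleep, readBatch, handle, and the Batch type are all hypothetical names.

```typescript
// Hypothetical sketch of a polling source; not the MessageDbSource implementation.
type Batch = { events: string[]; isTail: boolean }

// Resolves after `ms`, or immediately once the signal aborts
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal.aborted) {
      resolve()
      return
    }
    const onDone = () => {
      clearTimeout(timer)
      signal.removeEventListener("abort", onDone)
      resolve()
    }
    const timer = setTimeout(onDone, ms)
    signal.addEventListener("abort", onDone, { once: true })
  })
}

async function pollLoop(
  readBatch: (position: number, batchSize: number) => Promise<Batch>,
  handle: (batch: Batch) => Promise<void>,
  opts: { batchSize: number; tailSleepIntervalMs: number },
  signal: AbortSignal,
): Promise<number> {
  let position = 0
  while (!signal.aborted) {
    const batch = await readBatch(position, opts.batchSize)
    await handle(batch)
    position += batch.events.length
    // Only sleep once we have caught up; otherwise fetch the next batch right away
    if (batch.isTail) await sleep(opts.tailSleepIntervalMs, signal)
  }
  // The position reached when the loop was asked to stop
  return position
}
```

In this sketch, aborting the signal (as the SIGINT and SIGTERM handlers above do) cuts the tail sleep short, so the loop exits promptly after finishing the batch it is currently handling.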
Consumer groups
The MessageDB source supports consumer groups. You can use this functionality by passing consumerGroupMember and consumerGroupSize to the source's create. When they are passed, two things differ from normal operation:
- The checkpointer will checkpoint to {groupName}-{member}
- MessageDB will filter messages based on a hash of the stream id
This means that each consumer in the group maintains a distinct checkpoint, so the group is not easily autoscaled. To expand or contract a consumer group you will need to wipe the checkpoints and have all of the consumers in the group start from the current lowest checkpoint.
Using a hash of the ID guarantees that all messages for a particular stream will be handled in the correct order by the same consumer instance.
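The partitioning rule can be illustrated with a small sketch. The hash below is an FNV-1a style function picked purely for illustration; MessageDB computes its own hash inside the database, so the member numbers produced here will not match what MessageDB assigns. The helper names hashStreamId, ownerOf, and checkpointName are hypothetical.

```typescript
// Illustrative only: MessageDB uses its own server-side hash, not this one.
function hashStreamId(streamId: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < streamId.length; i++) {
    hash ^= streamId.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)
  }
  // Force an unsigned 32-bit result so the modulo below is never negative
  return hash >>> 0
}

// The group member (0-based) that handles every message of the given stream
function ownerOf(streamId: string, consumerGroupSize: number): number {
  return hashStreamId(streamId) % consumerGroupSize
}

// Each member stores its own checkpoint under {groupName}-{member}
function checkpointName(groupName: string, member: number): string {
  return `${groupName}-${member}`
}

const member = ownerOf("message-123", 3)
const checkpointKey = checkpointName("MessageNotifications", member)
```

Because the owner depends on consumerGroupSize, changing the group size reassigns streams between members. The existing per-member checkpoints then no longer describe what each member has processed, which is why they have to be wiped when resizing.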
DynamoStore
import { DynamoDB } from "@aws-sdk/client-dynamodb"
import { DynamoStoreSource, DynamoCheckpoints, LoadMode } from "@equinox-js/dynamo-store-source"
// prettier-ignore
import { DynamoStoreClient, DynamoStoreContext, QueryOptions, TipOptions } from "@equinox-js/dynamo-store"

const ddb = new DynamoDB()
// This context will be used to read the index
const indexContext = new DynamoStoreContext({
  client: new DynamoStoreClient(ddb),
  tableName: process.env.INDEX_TABLE_NAME || "events_index",
  tip: TipOptions.create({}),
  query: QueryOptions.create({}),
})
// This context will be used to hydrate event bodies
const eventsContext = new DynamoStoreContext({
  client: new DynamoStoreClient(ddb),
  tableName: process.env.TABLE_NAME || "events",
  tip: TipOptions.create({}),
  query: QueryOptions.create({}),
})
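// NOTE: `context` and `config.cache` are not defined in this snippet. They are
// assumed to be a DynamoStoreContext and a cache created elsewhere in the
// application
const checkpoints = DynamoCheckpoints.create(
  context,
  config.cache,
  // store an event at most every 5 minutes
  // the state will always be updated, but an event does not need to be written
  // every time
  300,
)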
const source = DynamoStoreSource.create({
  // A dynamo store context wired up to the index table
  context: indexContext,
  // While each epoch can contain up to 1,000,000 events that'd be a poor batch
  // size for checkpointing purposes. Instead the source splits the epoch into
  // batches of this size to facilitate checkpointing
  batchSizeCutoff: 500,
  // list of categories to subscribe to.
  categories: [Message.CATEGORY],
  // Consumer group name (used for checkpointing and tracing)
  groupName: "MessageNotifications",
  // the checkpointer maintains checkpoints on a per-category, per-group basis
  checkpoints,
  // Once we've processed all events in the store, how long should we wait
  // before requesting a new batch? In this case we will poll again after
  // 1000ms
  tailSleepIntervalMs: 1000,
  maxReadAhead: 3,
  // The index contains the stream name, the type of the event, and the index of
  // the event in the stream. If this is all the information necessary for
  // processing we can use `LoadMode.IndexOnly()`. In this case the data is
  // necessary so we use `WithData` with a load concurrency of 10. The load
  // concurrency means we'll be making at most 10 concurrent stream load
  // requests to Dynamo
  mode: LoadMode.WithData(10, eventsContext),
  sink,
})
const ctrl = new AbortController()
process.on("SIGINT", () => ctrl.abort())
process.on("SIGTERM", () => ctrl.abort())
await source.start(ctrl.signal)
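The effect of batchSizeCutoff can be pictured as slicing a large epoch into smaller units that can each be checkpointed. The sketch below uses plain fixed-size chunks, which is a simplifying assumption: the real source may cut batches differently (for example along stream boundaries). splitIntoBatches is a hypothetical helper, not part of the library.

```typescript
// Hypothetical helper; a simplification of how an epoch might be split for checkpointing.
function splitIntoBatches<T>(items: readonly T[], batchSizeCutoff: number): T[][] {
  if (!Number.isInteger(batchSizeCutoff) || batchSizeCutoff < 1) {
    throw new RangeError("batchSizeCutoff must be a positive integer")
  }
  const batches: T[][] = []
  // Walk the epoch in steps of batchSizeCutoff; the final batch may be smaller
  for (let start = 0; start < items.length; start += batchSizeCutoff) {
    batches.push(items.slice(start, start + batchSizeCutoff))
  }
  return batches
}
```

Under this simplification, a full epoch of 1,000,000 events with a cutoff of 500 yields 2,000 batches, so progress can be recorded after every 500 events instead of once per epoch.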