Skip to content

Real-World Examples

These patterns are drawn from production codebases using typed-pubsub.

Cache Clearing Topics

A common pattern is using PubSub to trigger cache invalidation across services. Define lightweight schemas for cache-related events:

typescript
const schemas = {
  clear_scorecard_cache: z.object({ reviewId: z.string() }),
  clear_memoize_cache: z.object({}),
  clear_all_scorecard_cache: z.object({}),
  calculate_part_progress: z.object({
    reviewId: z.string(),
    reference: z.string(),
  }),
} as const;

const pubsub = createTypedPubsub({
  client: new PubSub(),
  schemas,
  region: "europe-west1",
  options: {
    defaultHandlerOptions: {
      retryMaxAgeMinutes: 48 * HOUR_MINUTES,
      timeoutSeconds: 60,
    },
  },
});

Publishing a cache clear after a business operation:

typescript
export async function resetReviewStructure(reviewId: string) {
  // ... business logic ...

  await pubsub.createPublisher("clear_scorecard_cache")({ reviewId });
}

Complex Schemas with Discriminated Unions

For topics that carry different event types, use Zod discriminated unions to get precise typing based on the event type:

typescript
const schemas = {
  record_user_event: z.discriminatedUnion("type", [
    z.object({
      userId: z.string(),
      type: z.literal("loadsRequest"),
      metadata: z.object({
        flightId: z.string(),
        isPriorityRequest: z.boolean(),
        noCharge: z.boolean().optional(),
      }),
    }),
    z.object({
      userId: z.string(),
      type: z.literal("purchase"),
      metadata: z.object({
        productId: z.string(),
        amount: z.number(),
      }),
    }),
    // ... more event types
  ]),
} as const;

Handler Chaining

Handlers can publish messages to other topics, creating processing pipelines:

typescript
export const notify_for_single_request = pubsub.createHandler({
  topic: "notify_for_single_request",
  options: {
    retry: false,
  },
  handler: async (payload) => {
    const { airlineId, flightId } = payload;

    const users = await findUsersToNotify(airlineId, flightId);

    // Publish to another topic from within a handler
    await pubsub.createPublisher("send_notifications")({
      userIdTokenPairs: users,
      type: "newLoadsRequest",
      notification: {
        title: "New loads available",
        body: `Flight ${flightId} has new loads`,
      },
      data: { airlineId, flightId },
    });
  },
});

Event Recording with TTL

Record user events with an expiration time for automatic cleanup:

typescript
export const record_user_event = pubsub.createHandler({
  topic: "record_user_event",
  handler: async ({ userId, type, metadata }) => {
    const YEAR_MS = 365.25 * 24 * 60 * 60 * 1000;
    const ttlMs = YEAR_MS * 2;

    await refs.userEvents(userId).add({
      type,
      createdAt: FieldValue.serverTimestamp(),
      metadata,
      expireAt: Timestamp.fromMillis(Date.now() + ttlMs),
    });
  },
});

Event Marking with Redis

For handlers where duplicate processing would cause real harm, enable event marking with a Redis-backed implementation:

typescript
import { createClient } from "redis";

const redis = createClient({ url: process.env.REDIS_URL });

const pubsub = createTypedPubsub({
  client: new PubSub(),
  schemas,
  region: "us-central1",
  options: {
    eventMarkingFunctions: {
      isEventProcessed: async (eventId) =>
        Boolean(await redis.get(`event:${eventId}`)),
      markEventAsProcessed: async (eventId) =>
        void (await redis.set(`event:${eventId}`, "1", { EX: 86400 })),
    },
    defaultHandlerOptions: {
      markEvent: true,
      vpcConnector: "redis-connector",
      retryMaxAgeMinutes: 48 * HOUR_MINUTES,
    },
  },
});

Released under the MIT License.