How to batch process jobs using BullMQ with NestJS

I describe how I managed to achieve high-efficiency parallel batch processing of BullMQ jobs using NestJS CQRS.

Authors
  • Joachim Bülow
  • Cofounder and CTO at Doubble

Introduction

Recently, I wanted to make a table for aggregating various user “signals” to help us make personalized prioritized suggestions when querying various entities.

For my work at Doubble, a social dating concept, these signals included data such as swiping, messages, and profile views, i.e. actions that may be performed very often across the system.

To avoid high database contention for read operations during these high-velocity write operations, I needed a way to batch up the signals and batch insert them into the database.

The approach

When events occur in our NestJS CQRS system, I would, instead of inserting the signal into the database, simply put a Job into our Redis-based BullMQ message queue. These jobs can then be processed using special logic running on worker nodes.
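The producer side of this can be sketched roughly as follows. This is a minimal sketch, not our actual code: the `SignalQueue` interface is a hypothetical stand-in shaped after BullMQ's `Queue.add(name, data, opts)`, the job name `'signal'` and the helper `enqueueSignal` are made up for illustration, and the payload field types are assumed to be strings.

```typescript
// Shape of the job payload; the field types are assumed to be strings.
interface SignalsJobData {
  entityId: string;
  targetId: string;
  signalType: string;
  chatType?: string;
}

// Minimal stand-in for the part of a BullMQ Queue used here.
// BullMQ's Queue exposes add(name, data, opts) with this general shape.
interface SignalQueue {
  add(
    name: string,
    data: SignalsJobData,
    opts?: { removeOnComplete?: boolean },
  ): Promise<unknown>;
}

// Enqueue a signal instead of writing it to the database directly.
function enqueueSignal(
  queue: SignalQueue,
  data: SignalsJobData,
): Promise<unknown> {
  // 'signal' is a hypothetical job name. removeOnComplete keeps Redis from
  // filling up with finished jobs on a high-volume queue.
  return queue.add('signal', data, { removeOnComplete: true });
}
```

In a NestJS service, the queue would typically be injected with `@InjectQueue(...)` from `@nestjs/bullmq` and passed to a function like this from the CQRS event or command handler.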

In my case, using @nestjs/bullmq, the process function looks something like:

    // Library imports; the remaining identifiers are project-specific.
    import { Processor } from '@nestjs/bullmq';
    import { EventBus } from '@nestjs/cqrs';
    import { Job } from 'bullmq';

    /**
     * Queue processor for entity Signals
     * See further elaboration in the [Signal] entity file
     */
    @Processor(BullQueueJobs.SIGNALS, {
      concurrency: 50,
      prefix: '{signals}',
      limiter: {
        max: 2000, // maximum 2000 per second
        duration: 1000, // ms
      },
      ...(isRunningTests() && getProcessorTestOptions()),
    })
    export class SignalsProcessor extends GenericQueueWorker {
      constructor(private readonly eventbus: EventBus) {
        super();
      }

      override async process(job: Job<SignalsJobData>) {
        const { entityId, targetId, signalType, chatType } = job.data;
        // Forward to the events handler for bulk processing
        this.eventbus.publish(new SignalEvent(entityId, targetId, signalType, chatType));
      }
    }

Batching BullMQ events

The problem with open-source BullMQ is that each job is processed on its own. You can process multiple jobs in batches using the BullMQ Pro batching functionality. However, for those not particularly interested in tinkering with NPM tokens for installs, and who also think that the price tag is a bit steep, there is a way to do it manually.

The way I did it was to leverage another layer of CQRS, defining an event handler which runs as a single instance on the worker node processing the jobs. Looking at the code, you can see that the process function is really simple. All it does, in fact, is emit an event, which is picked up by the event handler.

The setup includes several layers of event abstraction: the original event is put into BullMQ, picked up on a worker node, and batched up by a CQRS event handler.

With this setup, our event handler is able to batch up signals and flush them to the database.

The code looks like this:

/**
 * Event handler for Signal Events
 * Main responsibility is to batch up Signals and flush them to the database in periodic fashion, either by accumulating enough events or by waiting for a certain amount of time
 */
@EventsHandler(SignalEvent)
export class SignalEventHandler implements IEventHandler {
  /**
   * Batches up the signals to be processed
   */
  signals: SignalEvent[] = [];

  /**
   * How many signals should be batched and processed at once
   */
  static BATCH_SIZE = 500;

  /**
   * How often the signals should be flushed in millis
   */
  static FLUSH_INTERVAL_MS = 5000;

  /**
   * Debounce timer to flush signals
   */
  private flushTimer: NodeJS.Timeout | null = null;

  constructor(
    private readonly signalService: SignalService, //
  ) {}

  async handle(event: IEvent) {
    try {
      switch (event.constructor) {
        case SignalEvent:
          if (this.flushTimer) {
            clearTimeout(this.flushTimer);
          }
          this.signals.push(event);

          if (this.signals.length >= SignalEventHandler.BATCH_SIZE) {
            await this.flushSignals();
          } else {
            this.flushTimer = setTimeout(async () => {
              try {
                await this.flushSignals();
              } catch (error) {
                captureException(error);
              }
            }, SignalEventHandler.FLUSH_INTERVAL_MS);
          }
          break;
        default:
          break;
      }
    } catch (error) {
      captureException(error);
    }
  }

  /**
   * Flushes the accumulated Signals to the database and resets the state
   */
  async flushSignals() {
    if (this.signals.length) {
      const signalsToProcess = [...this.signals];
      this.signals = [];
      // Flush the signals to the database
      await this.signalService.saveSignalsFromEvents(signalsToProcess);
    }
  }
}

We also use the BullMQ limiter option to cap the number of jobs processed per second, to avoid overloading the database. When backpressure is applied, jobs simply wait in the Redis queue, which lets us maintain observability of the backlog.
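One way to observe that backlog is to read the job counts from the queue. The following is a minimal sketch under some assumptions: `CountableQueue` is a hypothetical stand-in shaped after BullMQ's `Queue.getJobCounts(...types)`, the helper names are made up, and both `waiting` and `delayed` are summed because where rate-limited jobs are held may depend on the BullMQ version.

```typescript
// Minimal stand-in for the part of a BullMQ Queue used here.
// BullMQ's Queue exposes getJobCounts(...types), resolving to counts by state.
interface CountableQueue {
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
}

// Sum the states that represent jobs not yet picked up by a worker.
function backlogFromCounts(counts: Record<string, number>): number {
  return (counts['waiting'] ?? 0) + (counts['delayed'] ?? 0);
}

// Read the current backlog, e.g. to export it as a metric or to alert on it.
async function getBacklogSize(queue: CountableQueue): Promise<number> {
  const counts = await queue.getJobCounts('waiting', 'delayed');
  return backlogFromCounts(counts);
}
```

A steadily growing backlog would suggest that signals arrive faster than the limiter lets the workers process them.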

Considerations

This approach is mainly suitable if you can tolerate losing data when nodes die or when you make new deployments. Keeping signals in memory like this means you either have to wait for a flush when a shutdown signal arrives, or you have to persist them some other way, e.g. in Redis.
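The first option, flushing on shutdown, could be sketched like this. It is a simplified, hypothetical buffer rather than the handler above: `ShutdownAwareBuffer` and its `save` callback are made-up names, and errors from the timed flush are passed to an assumed `onError` callback. In NestJS, a provider with an `onApplicationShutdown()` method is called during shutdown once `app.enableShutdownHooks()` has been enabled.

```typescript
type SaveBatch<T> = (batch: T[]) => Promise<void>;

// Hypothetical in-memory buffer that flushes on a timer and on shutdown.
class ShutdownAwareBuffer<T> {
  private items: T[] = [];
  private flushTimer: ReturnType<typeof setTimeout> | null = null;
  private readonly save: SaveBatch<T>;
  private readonly flushIntervalMs: number;
  private readonly onError: (error: unknown) => void;

  constructor(
    save: SaveBatch<T>,
    flushIntervalMs = 5000,
    onError: (error: unknown) => void = () => {},
  ) {
    this.save = save;
    this.flushIntervalMs = flushIntervalMs;
    this.onError = onError;
  }

  // Buffer an item and (re)start the debounce timer.
  add(item: T): void {
    this.items.push(item);
    if (this.flushTimer) clearTimeout(this.flushTimer);
    this.flushTimer = setTimeout(() => {
      this.flush().catch(this.onError);
    }, this.flushIntervalMs);
  }

  // Write out everything buffered so far and cancel any pending timer.
  async flush(): Promise<void> {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    await this.save(batch);
  }

  // NestJS lifecycle hook name; requires app.enableShutdownHooks().
  async onApplicationShutdown(): Promise<void> {
    await this.flush();
  }
}
```

This only covers graceful shutdowns. If the process is killed outright, whatever is still buffered in memory is lost, which is the trade-off described above.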

Conclusion

We have a very scalable setup which will handle high-velocity writes to the database, without hindering reads at high scale. Perfect if you need to scale your services to hundreds of thousands of users.