8 – How can I have a single QueueWorker run for multiple Queues?

I have a site where I associate multiple Config Entities with a single Queue, i.e. example_queue (using the default QueueInterface).

My Config Entities produce records that are processed as queue items via that Queue.

Each of these Config Entities has the ability to enable or disable the processing of its records via that queue. To support that, I throw a SuspendQueueException when I know that the items from, say, Config Entity A are disabled for processing.

Because I am using a single Queue, this also suspends queue processing for records from Config Entity B, which is enabled.

I was thinking of either dropping the SuspendQueueException (but that would needlessly fail my queue items for Config Entity A, which I already know cannot be processed), or having a Queue per Config Entity. But then I would need to define a QueueWorker class per Queue, I think? That could become a problem if I have, say, 1,000 Config Entities, each needing its own QueueWorker class.

What is the best way to address this? Or how can I have one QueueWorker serve multiple Queues that I know need the same processing logic?

This is my ExampleQueueWorker class:

namespace Drupal\example\Plugin\QueueWorker;

use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Queue\SuspendQueueException;

/**
 * Process a queue of items.
 *
 * @QueueWorker(
 *   id = "example_queue",
 *   title = @Translation("Example Queue worker"),
 *   cron = {"time" = 60}
 * )
 */
class ExampleQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The maximum amount of times a single queue item is allowed for retries.
   */
  public const MAX_RETRY_THRESHOLD = 5;

  ...

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {

    // $obj can have a parent of Config Entity A, B, C, etc...
    if (!$obj = ExampleRecord::load($data->getId())) {
      $this->logger->error('Invalid item.');
      return;
    }

    try {
      // Process or halt depending on $obj parent settings.
      $this->ExampleService->post($obj, TRUE);
    }
    catch (\Exception $e) {
      // Log failure.
      $this->logger->error('ID: %id, Error: %error', [
        '%id' => $obj->id(),
        '%error' => $e->getMessage(),
      ]);

      // Get retry count.
      $retry_count = (int) $obj->getRetryCount() ?: 0;

      // Increment and save retry count.
      $obj->setRetryCount(++$retry_count)->save();

      if ($retry_count >= self::MAX_RETRY_THRESHOLD) {
        return;
      }

      // Treat a ServiceOutageException as a reason to suspend the queue.
      if ($e instanceof ServiceOutageException) {
        throw new SuspendQueueException($e->getMessage());
      }

      // And any other Exception for default queue error handling.
      throw new \Exception($e->getMessage());
    }

  }

}
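One idea I am looking at to avoid a QueueWorker class per Config Entity is a plugin deriver, since queue workers are annotated plugins and (as far as I understand it) can declare a deriver like other plugin types. The following is only a rough sketch, not code I have running: the config entity type ID example_config, the deriver class name and the title building are all placeholders.

namespace Drupal\example\Plugin\Derivative;

use Drupal\Component\Plugin\Derivative\DeriverBase;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Plugin\Discovery\ContainerDeriverInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Derives one queue worker definition per example config entity.
 */
class ExampleQueueWorkerDeriver extends DeriverBase implements ContainerDeriverInterface {

  /**
   * The entity type manager.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * Constructs the deriver.
   */
  public function __construct(EntityTypeManagerInterface $entity_type_manager) {
    $this->entityTypeManager = $entity_type_manager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, $base_plugin_id) {
    return new static($container->get('entity_type.manager'));
  }

  /**
   * {@inheritdoc}
   */
  public function getDerivativeDefinitions($base_plugin_definition) {
    // One derivative (and therefore one queue) per config entity, all of
    // them processed by the same ExampleQueueWorker class.
    $storage = $this->entityTypeManager->getStorage('example_config');
    foreach ($storage->loadMultiple() as $id => $entity) {
      $this->derivatives[$id] = [
        'title' => t('Example queue for @label', ['@label' => $entity->label()]),
      ] + $base_plugin_definition;
    }
    return $this->derivatives;
  }

}

With something like this, the @QueueWorker annotation above would also get a deriver = "Drupal\example\Plugin\Derivative\ExampleQueueWorkerDeriver" line, items would be queued into example_queue:{entity_id}, and, if I understand derivers correctly, a SuspendQueueException thrown while processing one entity's queue would no longer suspend the others.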

8 – Cron processes the items via a QueueWorker; how do I run code after the queue has finished?

I have a cron job that queues items for a QueueWorker to process. In the cron function, I would like to perform some extra operations on the data once the items have been processed. How can I know that the queue has finished? Should I use a separate cron job for this task?

Current logic in the cron function:

  1. (CRON) There is a content type with an "updated" field used to track whether a node has been updated. At the start of cron, set the updated field to FALSE on all nodes of this content type.
  2. (CRON) Retrieve the data from an external table and queue each row as an item for the QueueWorker to process.
  3. (QUEUEWORKER) For each item, check whether a node already exists for that row. If not, create a new node; if it exists, update it. In both cases, set the updated field to TRUE and save the node.
  4. (CRON) Once all the rows have been processed by the queue, load all nodes of the content type whose updated field was NOT set to TRUE and delete them.

Because step 3 is a queue process, the items are handled separately from the cron run itself. This means that step 4 completes before the items are actually processed. My question is: how can I run step 4 only AFTER all rows have been processed by the QueueWorker? Should I create a separate cron job for the node deletion?
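What I have in mind so far is to only run the cleanup once the queue has drained, by checking the queue size at the start of hook_cron(). This is a rough sketch of the idea, not working code; the function name, the queue name example_queue, the content type example_type and the boolean field field_updated are placeholders.

/**
 * Implements hook_cron().
 */
function example_cron() {
  $queue = \Drupal::queue('example_queue');

  // Only run the cleanup (step 4) and start a new import when the queue
  // from the previous run has been fully drained by the QueueWorker.
  if ((int) $queue->numberOfItems() === 0) {
    // Step 4 from the previous run: delete nodes that were never flagged
    // as updated by the queue worker.
    $nids = \Drupal::entityQuery('node')
      ->condition('type', 'example_type')
      ->condition('field_updated', FALSE)
      ->accessCheck(FALSE)
      ->execute();
    if ($nids) {
      $storage = \Drupal::entityTypeManager()->getStorage('node');
      $storage->delete($storage->loadMultiple($nids));
    }

    // Steps 1 and 2: reset the updated flag on all nodes and queue the
    // external rows again for the next run.
    // ...
  }
}

The idea is that the deletion only happens on a later cron pass, once the worker has emptied the queue, rather than in the same run that fills it.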