Is it possible to filter specific rabbitmq queues on metricbeat while indexing into Elasticsearch?

I’m using the RabbitMq module under metricbeat to index my topics and queues to Elasticsearch.

The rabbitmq queue metricset defines a few indexes such as: rabbitmq.queue.name, rabbitmq.queue.messages.total.count and so on, Is there a way to limit specific queues such that only the required queues index into ES without having to push all the topics, exchanges and queues?

This is the rabbitmq.yml file under the modules.d folder in Metricbeat:

metricbeat.modules:
- module: rabbitmq
  metricsets: ["node", "queue", "connection", "exchange"]
  enabled: true
  period: 10s
  hosts: ["localhost:15672"]
  #username: guest
  #password: guest

architecture – Message queues alternative in mobile development

please let me know if this StackExchange isn’t the place for this and I will move the question.

I’m trying to write a mobile app that will have a ‘notification’ aspect. I have web development experience so my references will include that 🙂

A “post” gets into my backend API/database for which there will be subscribers (app users), I want to notify those subscribers of the post, my app would then fetch the post and store it locally on the device. If this was a web application/service, I would use a Message Queue like RabbitMQ with consumers subscribing to an exchange, when a new event gets fired (e.g. “new-post:123”) relevant consumers would get the message and fetch “https://domain.com/post/123” and do whatever they want with that. Although post 123 would still stay in the database on the API side, the consumer wouldn’t need to fetch that post again. So consumer doesn’t need to poll or list all posts to check if there is any new posts.. etc.

What is the approach taken in mobile development for such a need? I don’t think I can rely on push notification services like OneSignal to take the place of message queues as push notifications don’t seem to be reliable at all! Notifications get lost when mobile is offline for longer periods, or are delayed. Message queues on the other hand are very reliable; messages are persisted in exchanges/routes until they’re consumed.

8 – How can I have a single QueueWorker run for multiple Queues?

I have a site in which I associate multiple Config Entities with a single Queue ie. example_queue (using the default QueueInterface).

My Config Entities produce records that are processed as queue items via that Queue.

Each of these Config Entities have the ability to enable or disable the processing of their records via that queue, so for that I am utilising the SuspendQueueException which I only throw when I know that the items from Config Entity A are disabled for processing.

Because I am using a single Queue, this now also suspends the queue processing for records from Config Entity B which is enabled.

I was thinking of either just getting rid of the SuspendQueueException, but that would then needlessly fail my queue items for Config Entity A, which I already know are failing, OR I would have a Queue per Config Entity. But then I would need to define a QueueWorker class per Queue, I think? Which could become a problem if I have let’s say 1,000 Config Entities, each needing such an individual QueueWorker class?

How should I address this best? Or how can I have 1 QueueWorker utilise multiple Queues that I know need the same processing logic?

This is my ExampleQueueWorker class:

namespace DrupalexamplePluginQueueWorker;

use DrupalCorePluginContainerFactoryPluginInterface;
use DrupalCoreQueueQueueWorkerBase;
use DrupalCoreQueueSuspendQueueException;

/**
 * Process a queue of items.
 *
 * @QueueWorker(
 *   id = "example_queue",
 *   title = @Translation("Example Queue worker"),
 *   cron = {"time" = 60}
 * )
 */
class ExampleQueueWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The maximum amount of times a single queue item is allowed for retries.
   */
  public const MAX_RETRY_THRESHOLD = 5;

  ...

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {

    // $obj can have a parent of Config Entity A, B, C, etc...
    if (!$obj = ExampleRecord::load($data->getId())) {
      $this->logger->error('Invalid item.');
      return;
    }

    try {
      // Process or halt depending on $obj parent settings.
      $this->ExampleService->post($obj, TRUE);
    }
    catch (Exception $e) {
      // Log failure.
      $this->logger->error('ID: %id, Error: %error', (
        '%id' => $obj->id(),
        '%error' => $e->getMessage(),
      ));

      // Get retry count.
      $retry_count = (int) $obj->getRetryCount ?: 0;

      // Increment and save retry count.
      $obj->setRetryCount(++$retry_count)->save();

      if ($retry_count >= self::MAX_RETRY_THRESHOLD) {
        return;
      }

      // Treat a ServiceOutageException as a reason to suspend the queue.
      if ($e instanceof ServiceOutageException) {
        throw new SuspendQueueException($e->getMessage());
      }

      // And any other Exception for default queue error handling.
      throw new Exception($e->getMessage());
    }

  }

}

algorithms – Optimal scheduling order for two queues of jobs

I have two queues of jobs and each job consists of the pair (utilization %, time to complete). I have one machine and its utilization cannot exceed 100%. The first job in a queue must be completed before the second one can be started (and so on). What is an algorithm to find the best schedule for these jobs so that the total time is minimized?

transaction – Distributed processing of a large number of queues

I have a rather basic application hosted on Kubernetes, which connects to a Mongo database.

The app has a wallet feature. A user can put money in their wallet using real-world payments (e.g. via PayPal). Each payment is registered as a transaction for that user. The money in the wallet is then used to pay for orders, which may come from different sources – Shopify, API, placed manually, etc. – at random times. Current user balance is inferred by aggregating the transactions (double-entry basically).

Consider the following scenario: a user with $100 in their wallet receives two orders at the same time, each worth $80. Obviously, only one of these orders should be placed. Unfortunately, a wallet payment is not an atomic procedure – I need to calculate the balance first and then, if it is sufficient, record a payment transaction. Even if I do this inside a database transaction, these two simultaneous orders might still think that there is enough balance, if these transactions are executed in parallel. To ensure that this does not happen I used locking. Each order will thus:

  1. place a lock on the user’s wallet so that only a single wallet payment is executed at a time;
  2. “execute” the payment by recording a transaction;
  3. place the order;
  4. unlock the wallet.

This means that all wallet payments for a single user should be processed sequentially. I feel like it would make sense to place users’ wallet payments into queues – as soon as one payment is completed (the wallet is unlocked) the next one proceeds. These would have to be per-user queues – separate users’ payments can be safely processed in parallel.

Unfortunately, I don’t know how to properly solve this. Implementing such queues in memory would be trivial but also non-resilient. I was thinking about utilising some MQ, but I have little experience and am faced with challenges:

  • it would be nice if it’s a distributed queue, which I could easily run on Kubernetes;
  • I actually need many parallel queues – one queue per user; let’s assume tens of thousands of users;
  • the load needs to be distributed evenly across the application pods. I reckon the queues ought to somehow push the payments to the application pods rather than have the pods pull messages – I don’t want to couple the pods with specific users.

My questions:

  1. Is the basic idea reasonable? Are there any obvious problems here that I don’t see?
  2. What mechanism do I need to achieve resilient evenly distributed processing of many queues in parallel? Do I need a messaging queue + load balancing or some Pub/Sub solution, or something else?

why and when is queues used in backend architectures?

since instead of a lot of users should wait a bit for the user asking for the backend to do the syncrhonous task to be performed then the user asking for it would potentially wait a long time

No one said the work on the queue was being done synchronously.

  • Many such implementations will have a thread pool performing several pieces of work in parallel.
  • Other implementations would use a Priority Queue to ensure that quicker jobs were done first
  • Even better implementations would use both with a high priority pool and low priority pool for obtaining work on.

The relevance of a queue is to ensure that requests once received are remembered till they can be processed.

Isn’t most webservers implemented with queues in the first place?

Yes but those queues are on the front end, and once your program receives the request the timer starts. You have exactly X seconds to complete processing before the web server terminates the connection. A Queue permits those requests to keep being processed beyond this limit.

Since a server can respond to multiple requests asynchronously?

Also those requests are processed very quickly (assuming a multi-threaded server) which could easily overwhelm your actual resource budget. Delaying work till later is something a queue is good for.

Queues are also useful for transferring longer running work to other back-end servers tooled for intensive processing.

Would this only be relevant if the external service uses another protocol than http then?

No queues are supremely useful data structures. They are also known as:

  • Futures
  • Rendezvous
  • Channels
  • Streams
  • Pipes

They are used everywhere.

But more specifically the network protocol has zero to do with how a server should be implemented.

Writing the contents of multiple queues to multiple files in Python

I’m writing a multiprocessing program which involves n number of processes filling m number of queues which are shared between them. The contents of each queue then needs to be written to a file corresponding to the queue and this is causing me some problems.

I didn’t think this was a very hard problem so my first approach was to simply wait until after I had joined all the processes and then iterate over the queues they had generated and write the contents of them to their corresponding files. This worked, mostly, except due to inherent limitations of how large a queue can get this failed when large queues were generated and it caused my program to hang.

The approach I’m trying at the moment is to have spawn an additional process with the sole function which continually iterates over the queues and writes any values inserted into them to their corresponding files but I’m having some troubles getting this to work as I’m not particularly experienced with multiprocessing (or Python).

This is what I’m trying at the moment which with my understanding of what each line should be doing. I’m not sure if this would be a good approach here but either way it isn’t working at the moment and I am aware it is also very bad Python 🙂

def _write_files(self, output_buffers):
    while True: # Run continuously
        for expr in self._expr_file_names: # Iterate over all the queues
            while True: # Run until we run out of queue elements or the end of queue is signaled
                if output_buffers(expr).empty(): # If there are no elements left in the queue, break and try the next one
                    break

                line = output_buffers(expr).get()

                if line is None: # None signals the end of the queue so return if we find this
                    return

                self._output_files(self._expr_file_names(expr)).write(line)

Additionally, here is the code which I wrote in my first attempt at solving this problem which ran after all the processes had been joined and worked most of the time.

def _write_buffers(self, output_buffers):
    for expr in self._expr_file_names:
        for line in iter(output_buffers(expr).get, None):
            self._output_files(self._expr_file_names(expr)).write(line)

queueing systems – Queues competing for a pool of customers

I want to compute the expected time until a a customer is served at a MMn queue which is competing for customers with two other MMn queues and one MMr queue. (If necessary n=1 for the first three queues.) Arrivals follow a Poisson process and service times have an exponential distribution. Customers may choose at which queue they want to wait.
Customers are aware of the expected waiting times at each queue, that is, waiting times at all queues are the same.

I already looked up the QueuingProcess() function, but I do not see the solution.

Any hint into the right direction is highly appreciated.

multithreading – Tkinter GUI & Socket I/O with threading.Event() instead of queues

I am currently creating a tkinter GUI for a socket connection to a multichannel analyzer.
The GUI has two frames, one with a canvas, where I plot waveform and one where I initialize the settings for the connection (range, offset, delay etc….).

Since this is an I/O task & I don’t want the GUI to freeze I am using multi-threading to read in data.

When clicking on a Start-Button, the program sends the settings to the connection & initializes it. See parts of my code below.

self.data_handler.initDataAcquisitionObject() takes about 5 seconds to complete & would block the GUI if not threaded. When finished it sets an event so that the data acquisition loop can start.

self.data_handler.getDataFromDataAcquisitionObject() reads 14.000.000 numerical values within 14 seconds and would also block if not threaded.

self.data_plotter.updateGuiFigure is the only code to interfere with the GUI. It contains a FigureCanvasTkAgg object, takes the 14.000.000 values and uses blitting and matplotlib for updating a figure on the canvas on the GUI every 14 seconds when data is ready.

I pack this in self.master.after(0, self.data_plotter.updateGuiFigure) so that the mainloop() of tkinter can update properly.

This code works perfectly fine, the GUI is not freezing at all and this program could run forever. I have researched a lot on I/O operations in tkinter though & I did not see one code example, where someone solves this threading problem with threading.Event().
All I found where workarounds with queues and regular recursive checks with something like self.master.after(500, checkQueue).

Am I using the right approach or am I missing something important here?

Examples of codes solving this with queuing can be found here:

https://medium.com/@mattia512maldini/how-to-setup-correctly-an-application-with-python-and-tkinter-107c6bc5a45

https://stupidpythonideas.blogspot.com/2013/10/why-your-gui-app-freezes.html

https://www.oreilly.com/library/view/python-cookbook/0596001673/ch09s07.html

https://www.thetopsites.net/article/54237067.shtml

https://benedictwilkinsai.github.io/post/tkinter-mp/

This is my code:

    def onStartButtonClick(self):
        #
        .
        .
        .
        #
        self.start_data_acquisition_event = threading.Event()
        self.init_settings_thread = threading.Thread(target=self.initSettings)
        self.acquire_and_plot_data_thread = threading.Thread(target=self.acquireAndPlotData)
        #
        self.init_settings_thread.start()
        self.acquire_and_plot_data_thread.start()
        #

    def initSettings(self):
        self.data_handler.setInitSettings(self.user_settings_dict)
        self.data_handler.initDataAcquisitionObject()
        self.start_data_acquisition_event.set()

    def acquireAndPlotData(self):
        self.start_data_acquisition_event.wait()
        while self.start_data_acquisition_event.is_set():
            self.data_handler.getDataFromDataAcquisitionObject()
            self.master.after(0, self.data_plotter.updateGuiFigure)

enterprise architecture – Integrating HTTP / Webhooks with Message Queues

I’m working at a project which integrates several Applications mostly SaaS Applications. The SaaS solutions have all the possibilities to hook into the internal event system with webhooks. The webhooks give us the ability to send a message to a single system but we have to create multiple webhooks to send a single event so several systems.

My idea is to implement a message bus as a centralized middelware but the problem is that the SaaS solutions only provide an integration by http(s) and not with protocols like AMQP.

RabbitMQ for example provides the possibility to publish to a topic over http. To consume you can also use http but if the message is once consumed, the queue removes it or keeps it in the queue.

Has anybody a good solution to bridge the gap between http and aqmp? I thought about small consumer services which subscribe to a topic and then forwards the message to the RESTful API.

We try to avoid a huge enterprise service bus/iPaaS project currently. I know that this could be one of the best approaches but due to internal decisions and project time, costs and so on it’s not a possibility for the moment.

One of our requirements is to have a guaranteed delivery, so that no message will be lost.

Thanks for your suggestions.