BlockingQueue with slow producer and fast consumers

Time:11-13

I'm writing a Java command-line application that scrapes a website and downloads video files. The files range in size from a few megabytes to 20 GB or more, so a download can take anywhere from a few seconds to a few hours. I've decided to implement a producer/consumer pattern to handle the scraping and downloading. A producer thread scrapes the site, retrieves the links to the video files, wraps each link in an object, and puts that object into an unbounded blocking queue. N consumer threads handle the downloads: each retrieves an object from the blocking queue and downloads the file. The object contains the URL along with some other information that the consumer needs to save the file to the correct location in local storage.

Before a file is downloaded, the consumer thread first checks whether the file already exists in local storage. If it does, the download is skipped and the next object is pulled from the queue. If a consumer experiences a problem while downloading a file (connection reset, etc.), it puts the object containing the URL into a separate queue for failed requests and sleeps for 15 minutes. This allows the application to deal with temporary network interruptions. While the producer is active, it checks the failed-URLs queue, removes those URLs from it, and puts them back into the main queue.
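To make the design above concrete, here is a minimal sketch of the per-task consumer logic: skip if the file exists, otherwise download, and on failure park the task in the failure queue and back off. All names (`DownloadTask`, `Downloader`, `Outcome`, `handle`, `consume`) are hypothetical, since the post does not show its classes, and the actual HTTP download is left as an interface.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;

// Hypothetical names throughout; the original post does not show its classes.
final class DownloadWorkerSketch {

    /** What the producer enqueues: the URL plus where the file should be saved. */
    static final class DownloadTask {
        final String url;
        final Path target;

        DownloadTask(String url, Path target) {
            this.url = url;
            this.target = target;
        }
    }

    /** Stand-in for the real download; assumed to throw on network problems. */
    interface Downloader {
        void download(DownloadTask task) throws Exception;
    }

    enum Outcome { SKIPPED, DOWNLOADED, FAILED }

    /** Handles a single task the way the post describes. */
    static Outcome handle(DownloadTask task, Downloader downloader,
                          BlockingQueue<DownloadTask> failedQueue) {
        if (Files.exists(task.target)) {
            return Outcome.SKIPPED;          // already on disk, nothing to do
        }
        try {
            downloader.download(task);
            return Outcome.DOWNLOADED;
        } catch (Exception e) {
            failedQueue.add(task);           // park it for a later retry
            return Outcome.FAILED;
        }
    }

    /** Consumer loop: blocks on take() and backs off after a failure. */
    static void consume(BlockingQueue<DownloadTask> mainQueue,
                        BlockingQueue<DownloadTask> failedQueue,
                        Downloader downloader,
                        long backoffMillis) throws InterruptedException {
        while (true) {
            DownloadTask task = mainQueue.take();
            if (handle(task, downloader, failedQueue) == Outcome.FAILED) {
                Thread.sleep(backoffMillis); // 15 minutes in the post
            }
        }
    }
}
```

Note that `consume` never returns on its own, which is exactly the shutdown problem the rest of the question is about.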

After implementing this initial design, I quickly realized that I had a problem. Because I'm using a blocking queue and the worker threads are polling without a timeout, the producer couldn't simply complete its execution once it was finished: it needed to hang around to put failed URLs back into the queue. My first attempt at a solution was to remove the second "failure" queue and have workers put failed URLs back into the main queue. This meant that the application now had N consumers and N + 1 producers. This approach allows the main producer thread to just exit when it is finished, because it doesn't have to worry about putting failed requests back into the queue. Once that problem was solved, there was still another one: notifying the worker threads that they can exit once the queue is empty. A blocking queue has no mechanism for the producer to signal that it won't be putting more data into the queue. I thought about having the consumers poll the queue with a timeout and having the primary producer set some sort of flag when it exits. When a consumer times out, it checks the flag. If the flag is set, the consumer exits; if not, it polls the queue again. While this approach will work, I don't like the design. I don't like the idea of having threads sitting around unnecessarily, and I like the use of a magic flag even less. The only interaction between producer and consumers should be via the queue. The consumers have no knowledge of the producer, and checking a magic flag breaks that principle.
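For reference, the timeout-and-flag idea described above could look roughly like the sketch below. The class and method names, the placeholder URLs, and the counter standing in for the download are all assumptions for illustration. One detail is easy to get wrong: the flag should be read before the poll, not after. Otherwise a consumer can time out, the producer can add its last item and set the flag, and the consumer then exits with work still in the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "poll with timeout + flag" design.
final class PollWithFlagSketch {

    /** Runs one producer and several consumers; returns how many items were consumed. */
    static int run(int items, int consumers, long pollTimeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean producerDone = new AtomicBoolean(false);
        AtomicInteger processed = new AtomicInteger();

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        // Read the flag BEFORE polling: if it was already set,
                        // an empty poll really means there is nothing left.
                        boolean done = producerDone.get();
                        String url = queue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                        if (url != null) {
                            processed.incrementAndGet(); // stand-in for the download
                        } else if (done) {
                            return;
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        // Producer (here simply the calling thread) enqueues placeholder URLs.
        for (int i = 0; i < items; i++) {
            queue.add("https://example.com/video/" + i);
        }
        producerDone.set(true); // set only after the last item is enqueued

        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed.get();
    }
}
```

With workers re-queuing their own failures, a worker that puts an item back is still running and will poll again, so the item is not orphaned even if the other workers have already exited.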

I ditched the blocking queue and decided to use a regular non-blocking queue. To prevent the worker threads from exiting as soon as they started, I used a CyclicBarrier. When a worker thread starts, it waits at the barrier before polling the queue. Meanwhile, the producer thread was coded to lower the barrier once the queue contained 10 x N URLs. Once the barrier was lowered, the worker threads would begin processing the queue. This approach quickly failed because in some cases the consumers drained the queue faster than the producer could replenish it. This happens when a large number of files are already stored on disk, so the consumers don't need to download anything. Once the queue was empty, the consumers exited, even though the producer was still scraping the site looking for URLs.

This tells me that I need to use a blocking queue. I'm continuing to try to find a clean, elegant solution that doesn't depend on timeouts and magic flags. I would love to hear your approach to solving this problem given the requirements.

CodePudding user response:

My approach would be to use a framework that supports a back-pressure mechanism, for example Vert.x reactive streams.

Good examples of systems that handle back-pressure with Vert.x can be found in the book Vert.x in Action.

CodePudding user response:

As I understand it, if you want the consumer thread to exit (which means that nothing new can be produced), the producer needs to notify the consumer, either by setting a flag or by interrupting the consumer. The queue itself cannot make such a notification, because it does not know when production will stop; only the producer knows.
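A rough sketch of the interrupt variant, under some assumptions: the consumers block on `take()` with no timeout, whoever started them holds the `Thread` references, and on interruption each consumer drains whatever is still queued before exiting. The names and the counter standing in for the download are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the producer signals "no more work" by interrupting consumers.
final class InterruptOnFinishSketch {

    /** Runs one producer and several consumers; returns how many items were consumed. */
    static int run(int items, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        queue.take();                // blocks, no timeout
                        processed.incrementAndGet(); // stand-in for the download
                    }
                } catch (InterruptedException stop) {
                    // Producer is finished: drain what is left without blocking.
                    while (queue.poll() != null) {
                        processed.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }

        // Producer (here simply the calling thread) enqueues placeholder URLs.
        for (int i = 0; i < items; i++) {
            queue.add("https://example.com/video/" + i);
        }

        // Nothing more will be produced, so tell every consumer.
        for (Thread w : workers) {
            w.interrupt();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed.get();
    }
}
```

This keeps the consumers free of timeouts, but a real download that is interrupted mid-transfer, or a worker sleeping after a failure, would need its own handling; the sketch does not cover either case.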
