In the end, after reading some documentation, I concluded that if CPU-intensive operations are involved, Python is not a wise choice in the first place. The design of Queue also does not take heavy traffic into account. When a question seems to have no answer, the question itself is most likely wrong.
For CPython, operations that consume CPU resources should use multiple processes rather than multiple threads, because the GIL prevents threads from executing Python bytecode in parallel.
In this case there are two places to do the "splitting": one, as you mentioned, is to distribute the work after receiving the data; the other is to make use of the messaging framework itself.
Take RabbitMQ as an example; you can refer to the Work Queues tutorial: https://www.rabbitmq.com/tutorials/tutorial-two-python.html
As for things like "finding ways to save money" and a "flexible worker pool", it may be better to get the functionality working first and then optimize against the actual bottleneck; that way you get twice the result with half the effort.
On optimizing Python performance, there is an article you can refer to: https://pypy.org/performance.html
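"Optimize for the bottleneck" starts with measuring where the time goes. Here is a minimal sketch using the standard library's `cProfile`; `slow_sum()` is a made-up placeholder workload:

```python
import cProfile
import io
import pstats

def slow_sum(n):
    # placeholder hot spot: a pure-Python loop
    total = 0
    for i in range(n):
        total += i * i
    return total

profiler = cProfile.Profile()
profiler.enable()
result = slow_sum(200_000)
profiler.disable()

# print the five most expensive calls; optimize only what shows up here
out = io.StringIO()
pstats.Stats(profiler, stream=out).sort_stats('cumulative').print_stats(5)
print(out.getvalue())
```

Only after a profile like this confirms the hot spot is it worth reaching for PyPy, C extensions, or restructuring.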
If you have determined that the work is computationally intensive, multithreading in Python is indeed a poor fit, but you can consider multiprocessing. You do not necessarily need to create your own queue for internal distribution; and even if you do need a Queue, limit its traffic by setting the Queue's maximum size.
Take RabbitMQ as an example; see https://www.rabbitmq.com/tutorials/tutorial-two-python.html
The official RabbitMQ example uses pika as the RabbitMQ client, and its message model should match yours. Slightly modify the official worker.py example to consume messages over multiple channels:
#!/usr/bin/env python
import pika
import time
from concurrent.futures import ProcessPoolExecutor
# from concurrent.futures import ThreadPoolExecutor

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channels = [
    (1, connection.channel(1)),
    (2, connection.channel(2)),
    (3, connection.channel(3)),
]

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(0.2)
    print(" [x] Done: %s" % ch.channel_number)
    ch.basic_ack(delivery_tag=method.delivery_tag)

for _, channel in channels:
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_qos(prefetch_count=1)   # hand each consumer one message at a time
    channel.basic_consume(callback, queue='task_queue')   # pika < 1.0 signature

def start_consumer(channel_number):
    dict(channels).get(channel_number).start_consuming()

print(' [*] Waiting for messages. To exit press CTRL+C')
with ProcessPoolExecutor(max_workers=len(channels)) as e:
    e.map(start_consumer, range(1, 4))
# with ThreadPoolExecutor(max_workers=len(channels)) as e:
#     e.map(start_consumer, range(1, 4))
Flexible creation of workers is, I think, difficult to implement from inside the program (worker.py) but easier to implement from outside it. First monitor the traffic: if it increases, speed up message consumption by starting more worker.py processes; conversely, reduce the number of worker.py processes running.
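That "scale from outside" idea can be sketched as a small supervisor that polls the queue depth and starts or stops worker.py processes to match. Everything here is an assumption for illustration: `queue_depth()` is a stand-in (with RabbitMQ you could read the depth from `rabbitmqctl list_queues` or the management API), and the thresholds are made up:

```python
import subprocess
import sys

MAX_WORKERS = 8
MIN_WORKERS = 1
PER_WORKER = 100          # rough number of queued messages one worker handles

def queue_depth():
    # placeholder: replace with a real query against the broker
    return 250

def desired_workers(depth):
    wanted = -(-depth // PER_WORKER)          # ceiling division
    return max(MIN_WORKERS, min(MAX_WORKERS, wanted))

workers = []

def rescale():
    # call this periodically (e.g. from cron or a loop with a sleep)
    target = desired_workers(queue_depth())
    while len(workers) < target:              # traffic up: add workers
        workers.append(subprocess.Popen([sys.executable, 'worker.py']))
    while len(workers) > target:              # traffic down: remove some
        workers.pop().terminate()
    return target
```

Because scaling happens outside the program, worker.py itself stays simple: it just consumes messages until it is terminated.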