I'm using Pika to process data from RabbitMQ.
Since I kept running into different kinds of problems, I decided to write a small test application to see how I can handle disconnects.
The test app does the following:
- Connect to Broker, retry until successful
- When connected create a queue.
- Consume this queue and put each result into a Python Queue.Queue(0)
- Get each item from Queue.Queue(0) and publish it back into the broker queue.
I noticed the following:
- When I run my script from one host connecting to RabbitMQ on another host (inside a VM), the script exits at random moments without producing an error.
- When I run my script on the same host on which RabbitMQ is installed it runs fine and keeps running.
This might be explained by network issues such as dropped packets, but even so the connection does not seem very robust.
When the script runs locally on the RabbitMQ server and I kill RabbitMQ, the script exits with the error: "ERROR pika SelectConnection: Socket Error on 3: 104"
So it looks like I can't get the reconnection strategy working as it should. Could someone have a look at the code to see what I'm doing wrong?
Thanks,
Jay
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock


class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)
        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant connect. Reason: %s' % err)
                time.sleep(1)
        self.daemon=True

    def run(self):
        while True:
            self.submitData(self.from_broker.get(block=True))
        pass

    def on_connected(self,connection):
        connection.channel(self.on_channel_open)

    def on_channel_open(self,new_channel):
        self.channel = new_channel
        self.channel.queue_declare(queue='sandbox', durable=True)
        self.channel.basic_consume(self.processData, queue='sandbox')

    def processData(self, ch, method, properties, body):
        self.logging.info('Received data from broker')
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self.from_broker.put(body)

    def submitData(self,data):
        self.logging.info('Submitting data to broker.')
        self.channel.basic_publish(exchange='',
                                   routing_key='sandbox',
                                   body=data,
                                   properties=self.properties)


if __name__ == '__main__':
    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)
    broker=Broker()
    broker.start()
    try:
        broker.connection.ioloop.start()
    except Exception as err:
        print err
Best Answer
The main problem with your script is that it is interacting with a single channel from both your main thread (where the ioloop is running) and the "Broker" thread (which calls submitData in a loop). This is not safe.

Also, SimpleReconnectionStrategy does not seem to do anything useful. It does not cause a reconnect if the connection is interrupted. I believe this is a bug in Pika: https://github.com/pika/pika/issues/120

I attempted to refactor your code to make it work as I think you wanted it to, but ran into another problem. Pika does not appear to have a way to detect delivery failure, which means that data may be lost if the connection drops. This seems like such an obvious requirement! How can there be no way to detect that basic_publish failed? I tried all kinds of stuff including transactions and add_on_return_callback (all of which seemed clunky and overly complicated), but came up with nothing. If there truly is no way then pika only seems to be useful in situations that can tolerate loss of data sent to RabbitMQ, or in programs that only need to consume from RabbitMQ.

This is not reliable, but for reference, here's some code that solves your multi-thread problem: