python - PyZMQ Subscriber doesn't receive messages when working with request socket -
i working pyzmq , have seems rather peculiar issue. have 2 classes wrapping sockets communication, mzlsubscriber , mzlrequester. there class contains both of them, mzllink. each of these, have tests mzlsubscribertest, mzlrequestertest, , mzlinktest. tests subscriber , requester work should, mzlinktest not receive subscriber messages.
below seems relative code, constructors 3 classes run() mzlsubscriber , tests mzlink , mzlsubscriber.
mzlink constructor:
# host information self.host = host self.requestport = requestport self.subscriberport = subscriberport # set zmq context self.zmq_context = zmq.context() # set subscriber , replier self.subscriber = mzlsubscriber(self.zmq_context, self.host, self.subscriberport) self.requester = mzlrequester(self.zmq_context, self.host, self.requestport) # start subscriber self.subscriber.start() mzlink test:
# constants host = "localhost" req_port = 5555 sub_port = 5556 # create link link = midaszmqlink(host, req_port, sub_port) link.close() mzlrequester constructor:
# initialize class member variables self.zmq_context = zmq_context self.host = host self.port = port # set reply socket self.socket = self.zmq_context.socket(zmq.req) # connect socket self.socket.connect("tcp://{0}:{1}".format(self.host, self.port)) mzlsubscriber constructor:
# initialize parent process process.__init__(self) # store zmq context , connection host/port self.zmq_context = zmq_context self.host = host self.port = port # sockets. don't set them here because sockets not thread safe self.socket = none # queue store data in # todo: make queue not overflow if events come in self.queue = queue() mzlsubscriber.run():
# parent call process.run(self) # set subscriber socket in thread self.socket = self.zmq_context.socket(zmq.sub) self.socket.setsockopt_string(zmq.subscribe, unicode()) # connect socket self.socket.connect("tcp://{0}:{1}".format(self.host, self.port)) # while thread alive, poll data put queue # calling mzlsubscriber.stop() automatically change while self.is_alive(): datum = self.socket.recv() self.queue.put(datum) # disconnect , close socket. #fixme: doesn't here because terminate() stops process self.socket.disconnect("tcp://{0}:{1}".format(self.host, self.port)) self.socket.close() mzlsubscriber test:
# host information host = "localhost" subscriber_port = "5556" # set zmq context zmq_context = zmq.context() # set subscriber subscriber = mzlsubscriber(zmq_context, host, subscriber_port) # start subscriber subscriber.start() # stop , join subscriber subscriber.close() subscriber.join() the subscriber thread seems block @ datum = self.socket.recv(), makes me think issue socket creation. however, seem work when working subscriber. requester seems work in both cases. in addition, goes smoothly commenting out 2 lines dealing requester.
i apologize wall of code, can't narrow code issue coming @ point. when do, i'll remove irrelevant code. test code deals incoming data has been removed.
as bit of clarification, using python 2.7 pyzmq 14.3.1.
update: seems running mzlsubscriber in main thread rather creating process results in expected result, seems sort of thread safety. knowledge, zmq contexts thread-safe, sockets not. thought wouldn't cause issue because i'm explicitly making sure there socket each thread.
update 2: if calls setting socket in mzlsubscriber moved run() __init__, socket seems receive small portion of published message, have error:
process mzlsubscriber-1: traceback (most recent call last): file "/system/library/frameworks/python.framework/versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() file "/users/user/repos/midas-client/client/midasclient/mzlsubscriber.py", line 45, in run datum = self.socket.recv() file "socket.pyx", line 628, in zmq.backend.cython.socket.socket.recv (zmq/backend/cython/socket.c:5616) file "socket.pyx", line 662, in zmq.backend.cython.socket.socket.recv (zmq/backend/cython/socket.c:5436) file "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771) file "checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6032) zmqerror: interrupted system call i have gotten workaround creating new zmq.context in mzlsubscriber.run(), although feel shouldn't necessary if zmq contexts thread-safe.
it seems issue using multiple zmq contexts on different processes. while pyzmq documentation states zmq context thread-safe, can assume meant python threads rather processes. quite confusing in c, zmq contexts thread safe despite running in way similar python multiprocessing.process.
the issue solved creating zmq context each process.
Comments
Post a Comment