python - PyZMQ Subscriber doesn't receive messages when working with request socket -
i working pyzmq , have seems rather peculiar issue. have 2 classes wrapping sockets communication, mzlsubscriber
, mzlrequester
. there class contains both of them, mzllink
. each of these, have tests mzlsubscribertest
, mzlrequestertest
, , mzlinktest
. tests subscriber , requester work should, mzlinktest
not receive subscriber messages.
below seems relative code, constructors 3 classes run()
mzlsubscriber
, tests mzlink
, mzlsubscriber
.
mzlink
constructor:
# host information self.host = host self.requestport = requestport self.subscriberport = subscriberport # set zmq context self.zmq_context = zmq.context() # set subscriber , replier self.subscriber = mzlsubscriber(self.zmq_context, self.host, self.subscriberport) self.requester = mzlrequester(self.zmq_context, self.host, self.requestport) # start subscriber self.subscriber.start()
mzlink
test:
# constants host = "localhost" req_port = 5555 sub_port = 5556 # create link link = midaszmqlink(host, req_port, sub_port) link.close()
mzlrequester
constructor:
# initialize class member variables self.zmq_context = zmq_context self.host = host self.port = port # set reply socket self.socket = self.zmq_context.socket(zmq.req) # connect socket self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))
mzlsubscriber
constructor:
# initialize parent process process.__init__(self) # store zmq context , connection host/port self.zmq_context = zmq_context self.host = host self.port = port # sockets. don't set them here because sockets not thread safe self.socket = none # queue store data in # todo: make queue not overflow if events come in self.queue = queue()
mzlsubscriber.run()
:
# parent call process.run(self) # set subscriber socket in thread self.socket = self.zmq_context.socket(zmq.sub) self.socket.setsockopt_string(zmq.subscribe, unicode()) # connect socket self.socket.connect("tcp://{0}:{1}".format(self.host, self.port)) # while thread alive, poll data put queue # calling mzlsubscriber.stop() automatically change while self.is_alive(): datum = self.socket.recv() self.queue.put(datum) # disconnect , close socket. #fixme: doesn't here because terminate() stops process self.socket.disconnect("tcp://{0}:{1}".format(self.host, self.port)) self.socket.close()
mzlsubscriber
test:
# host information host = "localhost" subscriber_port = "5556" # set zmq context zmq_context = zmq.context() # set subscriber subscriber = mzlsubscriber(zmq_context, host, subscriber_port) # start subscriber subscriber.start() # stop , join subscriber subscriber.close() subscriber.join()
the subscriber thread seems block @ datum = self.socket.recv()
, makes me think issue socket creation. however, seem work when working subscriber. requester seems work in both cases. in addition, goes smoothly commenting out 2 lines dealing requester
.
i apologize wall of code, can't narrow code issue coming @ point. when do, i'll remove irrelevant code. test code deals incoming data has been removed.
as bit of clarification, using python 2.7 pyzmq 14.3.1.
update: seems running mzlsubscriber
in main thread rather creating process
results in expected result, seems sort of thread safety. knowledge, zmq contexts thread-safe, sockets not. thought wouldn't cause issue because i'm explicitly making sure there socket each thread.
update 2: if calls setting socket in mzlsubscriber
moved run()
__init__
, socket seems receive small portion of published message, have error:
process mzlsubscriber-1: traceback (most recent call last): file "/system/library/frameworks/python.framework/versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() file "/users/user/repos/midas-client/client/midasclient/mzlsubscriber.py", line 45, in run datum = self.socket.recv() file "socket.pyx", line 628, in zmq.backend.cython.socket.socket.recv (zmq/backend/cython/socket.c:5616) file "socket.pyx", line 662, in zmq.backend.cython.socket.socket.recv (zmq/backend/cython/socket.c:5436) file "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771) file "checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6032) zmqerror: interrupted system call
i have gotten workaround creating new zmq.context
in mzlsubscriber.run()
, although feel shouldn't necessary if zmq contexts thread-safe.
it seems issue using multiple zmq contexts on different processes. while pyzmq documentation states zmq context thread-safe, can assume meant python threads rather processes. quite confusing in c, zmq contexts thread safe despite running in way similar python multiprocessing.process
.
the issue solved creating zmq context each process.
Comments
Post a Comment