python - PyZMQ Subscriber doesn't receive messages when working with request socket


I am working with PyZMQ and have what seems to be a rather peculiar issue. I have two classes that wrap sockets for communication, mzlsubscriber and mzlrequester. There is also a class that contains both of them, mzllink. For each of these, I have tests: mzlsubscribertest, mzlrequestertest, and mzlinktest. The tests for the subscriber and the requester work as they should, but mzlinktest does not receive any subscriber messages.

Below is what seems to be the relevant code: the constructors for the three classes, run() for mzlsubscriber, and the tests for mzlink and mzlsubscriber.

mzlink constructor:

    # Host information
    self.host = host
    self.requestport = requestport
    self.subscriberport = subscriberport

    # Set up the ZMQ context
    self.zmq_context = zmq.Context()

    # Set up the subscriber and replier
    self.subscriber = mzlsubscriber(self.zmq_context, self.host, self.subscriberport)
    self.requester = mzlrequester(self.zmq_context, self.host, self.requestport)

    # Start the subscriber
    self.subscriber.start()

mzlink test:

    # Constants
    host = "localhost"
    req_port = 5555
    sub_port = 5556

    # Create the link
    link = midaszmqlink(host, req_port, sub_port)

    link.close()

mzlrequester constructor:

    # Initialize class member variables
    self.zmq_context = zmq_context
    self.host = host
    self.port = port

    # Set up the reply socket
    self.socket = self.zmq_context.socket(zmq.REQ)

    # Connect the socket
    self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))

mzlsubscriber constructor:

    # Initialize the parent process
    Process.__init__(self)

    # Store the ZMQ context and the connection host/port
    self.zmq_context = zmq_context
    self.host = host
    self.port = port

    # Sockets. Don't set them up here because sockets are not thread safe
    self.socket = None

    # Queue to store data in
    # TODO: make the queue not overflow if events come in
    self.queue = Queue()

mzlsubscriber.run():

    # Parent call
    Process.run(self)

    # Set up the subscriber socket in this thread
    self.socket = self.zmq_context.socket(zmq.SUB)
    self.socket.setsockopt_string(zmq.SUBSCRIBE, unicode())

    # Connect the socket
    self.socket.connect("tcp://{0}:{1}".format(self.host, self.port))

    # While the thread is alive, poll for data and put it in the queue
    # Calling mzlsubscriber.stop() will automatically change this
    while self.is_alive():
        datum = self.socket.recv()
        self.queue.put(datum)

    # Disconnect and close the socket.
    # FIXME: execution doesn't get here because terminate() stops the process
    self.socket.disconnect("tcp://{0}:{1}".format(self.host, self.port))
    self.socket.close()
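The FIXME in run() points at a side effect of a blocking recv(): the loop only ends when terminate() kills the process, so the cleanup lines are never reached. One possible alternative, which is not what the code in the question does, is to poll with a timeout and check a stop flag between polls. This is a minimal sketch assuming Python 3 and a recent pyzmq; `receive_until_stopped`, `stop_event` and `poll_ms` are hypothetical names, not part of the original classes.

```python
import zmq


def receive_until_stopped(endpoint, out_queue, stop_event, poll_ms=100):
    """Receive messages until stop_event is set, then clean up.

    Hypothetical helper: the context and socket are created here, inside
    the thread or process that actually uses them.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to every message
    socket.connect(endpoint)
    try:
        while not stop_event.is_set():
            # poll() returns 0 if nothing arrived within poll_ms milliseconds,
            # so the stop flag is re-checked at least that often
            if socket.poll(poll_ms):
                out_queue.put(socket.recv())
    finally:
        # Reached on a normal stop, unlike the code after a blocking recv()
        socket.close(linger=0)
        context.term()
```

With a `multiprocessing.Event` and a `multiprocessing.Queue` the same function could serve as the body of a child process; with the `threading` equivalents it runs in a thread.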

mzlsubscriber test:

    # Host information
    host = "localhost"
    subscriber_port = "5556"

    # Set up the ZMQ context
    zmq_context = zmq.Context()

    # Set up the subscriber
    subscriber = mzlsubscriber(zmq_context, host, subscriber_port)

    # Start the subscriber
    subscriber.start()

    # Stop and join the subscriber
    subscriber.close()
    subscriber.join()

The subscriber thread seems to block at datum = self.socket.recv(), which makes me think the issue is with socket creation. However, it does seem to work when working with only the subscriber. The requester seems to work in both cases. In addition, everything goes smoothly when I comment out the two lines dealing with the requester.

I apologize for the wall of code, but I can't narrow down where the issue is coming from at this point. When I do, I'll remove the irrelevant code. The test code that deals with the incoming data has been removed.

As a bit of clarification, I am using Python 2.7 with PyZMQ 14.3.1.

Update: It seems that running mzlsubscriber in the main thread rather than creating another process gives the expected result, so this looks like some sort of thread-safety issue. To my knowledge, ZMQ contexts are thread-safe, but sockets are not. I thought this wouldn't cause an issue because I'm explicitly making sure there is a socket for each thread.

Update 2: If the calls setting up the socket in mzlsubscriber are moved from run() to __init__, the socket seems to receive a small portion of the published message, but then I get an error:

    Process mzlsubscriber-1:
    Traceback (most recent call last):
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/Users/user/repos/midas-client/client/midasclient/mzlsubscriber.py", line 45, in run
        datum = self.socket.recv()
      File "socket.pyx", line 628, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5616)
      File "socket.pyx", line 662, in zmq.backend.cython.socket.Socket.recv (zmq/backend/cython/socket.c:5436)
      File "socket.pyx", line 139, in zmq.backend.cython.socket._recv_copy (zmq/backend/cython/socket.c:1771)
      File "checkrc.pxd", line 21, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/socket.c:6032)
    ZMQError: Interrupted system call

I have gotten a workaround by creating a new zmq.Context in mzlsubscriber.run(), although I feel that this shouldn't be necessary if ZMQ contexts are thread-safe.

It seems that my issue was with using ZMQ contexts across multiple processes. While the PyZMQ documentation states that the ZMQ context is thread-safe, I can only assume that it meant Python threads rather than processes. This is quite confusing, as in C, ZMQ contexts are thread-safe despite running in a way similar to Python's multiprocessing.Process.
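The thread/process distinction can be seen without ZMQ at all. A thread works on the parent's objects, while a forked multiprocessing.Process works on its own copy. A forked child also inherits only the thread that called fork, so any background threads an object relies on are missing there. libzmq contexts run I/O threads, so this may be what breaks a context created in the parent, though that is an inference rather than something confirmed above. The sketch below uses only the standard library and assumes a POSIX system where the "fork" start method is available; all names in it are hypothetical.

```python
import multiprocessing
import threading

# Module-level object standing in for something created in the parent,
# such as a context built before the child is started.
shared = []


def append_marker(marker):
    """Mutate the module-level list from whatever thread/process runs this."""
    shared.append(marker)


def run_in_thread():
    # A thread shares the parent's memory, so the append is visible afterwards
    worker = threading.Thread(target=append_marker, args=("thread",))
    worker.start()
    worker.join()


def run_in_process():
    # "fork" is requested explicitly (assumption: POSIX only). The child
    # appends to its own copy of `shared`; the parent's list is untouched.
    ctx = multiprocessing.get_context("fork")
    worker = ctx.Process(target=append_marker, args=("process",))
    worker.start()
    worker.join()
    return worker.exitcode
```

After calling both helpers, only the thread's marker shows up in the parent's list, which is why a guarantee about threads says nothing about a child process.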

The issue was solved by creating a ZMQ context for each process.
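A minimal sketch of that fix, assuming Python 3 and a recent pyzmq rather than the Python 2.7 setup above: the child builds its own context inside the function it runs, and nothing ZMQ-related is passed in from the parent. `run_subscriber`, `start_subscriber`, `endpoint` and `max_messages` are hypothetical names, not the original mzlsubscriber API.

```python
import multiprocessing

import zmq


def run_subscriber(endpoint, out_queue, max_messages=1):
    """Body of the child process: create the context here, then receive."""
    context = zmq.Context()  # created in the process that uses it
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to every message
    socket.connect(endpoint)
    try:
        for _ in range(max_messages):
            out_queue.put(socket.recv())
    finally:
        socket.close(linger=0)
        context.term()


def start_subscriber(endpoint, max_messages=1):
    """Start run_subscriber in a child process; only plain data is passed."""
    out_queue = multiprocessing.Queue()
    process = multiprocessing.Process(
        target=run_subscriber, args=(endpoint, out_queue, max_messages)
    )
    process.start()
    return process, out_queue
```

The parent would call something like `start_subscriber("tcp://localhost:5556")` and read from the returned queue. On platforms that spawn rather than fork child processes, that call needs to sit under an `if __name__ == "__main__":` guard.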

