mirror of
https://github.com/bitcoin/bitcoin.git
synced 2025-03-05 14:06:27 -05:00
zmq test: dedup message reception handling in ZMQSubscriber
This commit is contained in:
parent
d202054675
commit
6014d6e1b5
1 changed files with 6 additions and 7 deletions
|
@ -33,7 +33,8 @@ class ZMQSubscriber:
|
||||||
|
|
||||||
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
|
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
|
||||||
|
|
||||||
def receive(self):
|
# Receive message from publisher and verify that topic and sequence match
|
||||||
|
def _receive_from_publisher_and_check(self):
|
||||||
topic, body, seq = self.socket.recv_multipart()
|
topic, body, seq = self.socket.recv_multipart()
|
||||||
# Topic should match the subscriber topic.
|
# Topic should match the subscriber topic.
|
||||||
assert_equal(topic, self.topic)
|
assert_equal(topic, self.topic)
|
||||||
|
@ -42,13 +43,11 @@ class ZMQSubscriber:
|
||||||
self.sequence += 1
|
self.sequence += 1
|
||||||
return body
|
return body
|
||||||
|
|
||||||
|
def receive(self):
|
||||||
|
return self._receive_from_publisher_and_check()
|
||||||
|
|
||||||
def receive_sequence(self):
|
def receive_sequence(self):
|
||||||
topic, body, seq = self.socket.recv_multipart()
|
body = self._receive_from_publisher_and_check()
|
||||||
# Topic should match the subscriber topic.
|
|
||||||
assert_equal(topic, self.topic)
|
|
||||||
# Sequence should be incremental.
|
|
||||||
assert_equal(struct.unpack('<I', seq)[-1], self.sequence)
|
|
||||||
self.sequence += 1
|
|
||||||
hash = body[:32].hex()
|
hash = body[:32].hex()
|
||||||
label = chr(body[32])
|
label = chr(body[32])
|
||||||
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
|
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
|
||||||
|
|
Loading…
Add table
Reference in a new issue