// Copyright (c) 2015-2021 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <zmq/zmqpublishnotifier.h>

#include <chain.h>
#include <chainparams.h>
#include <netbase.h>
#include <node/blockstorage.h>
#include <rpc/server.h>
#include <streams.h>
#include <util/system.h>
#include <validation.h> // For cs_main
#include <zmq/zmqutil.h>

#include <zmq.h>

#include <cstdarg>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <utility>

using node::ReadBlockFromDisk;
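
// Map of ZMQ endpoint address to the publish notifiers bound to it; notifiers
// sharing an address also share one ZMQ socket (see Initialize() and Shutdown()).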
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;

static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_SEQUENCE = "sequence";
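
// Consumers subscribe to these topics with an ordinary ZMQ_SUB socket. A minimal
// sketch, assuming bitcoind was started with -zmqpubhashblock=tcp://127.0.0.1:28332
// (the endpoint and port are illustrative only):
//
//   void *ctx = zmq_ctx_new();
//   void *sub = zmq_socket(ctx, ZMQ_SUB);
//   zmq_connect(sub, "tcp://127.0.0.1:28332");
//   zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "hashblock", 9);
//
// Each notification then arrives as the multipart message built by
// SendZmqMessage() below: topic, payload, 4-byte LE sequence number.
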
// Internal function to send multipart message
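// Additional parts are passed as (data, size) pairs in the variadic argument
// list, terminated by a null data pointer. Returns 0 on success and -1 on
// failure (after reporting the error via zmqError()).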
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
    va_list args;
    va_start(args, size);

    while (1)
    {
        zmq_msg_t msg;

        int rc = zmq_msg_init_size(&msg, size);
        if (rc != 0)
        {
            zmqError("Unable to initialize ZMQ msg");
            va_end(args);
            return -1;
        }

        void *buf = zmq_msg_data(&msg);
        memcpy(buf, data, size);

        data = va_arg(args, const void*);

        rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
        if (rc == -1)
        {
            zmqError("Unable to send ZMQ msg");
            zmq_msg_close(&msg);
            va_end(args);
            return -1;
        }

        zmq_msg_close(&msg);

        if (!data)
            break;

        size = va_arg(args, size_t);
    }
    va_end(args);
    return 0;
}
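
// Returns true if the given "tcp://" ZMQ endpoint names an IPv6 host; used below
// to decide whether ZMQ_IPV6 should be enabled on the publishing socket.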
static bool IsZMQAddressIPV6(const std::string &zmq_address)
{
    const std::string tcp_prefix = "tcp://";
    const size_t tcp_index = zmq_address.rfind(tcp_prefix);
    const size_t colon_index = zmq_address.rfind(":");
    if (tcp_index == 0 && colon_index != std::string::npos) {
        const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length());
        CNetAddr addr;
        LookupHost(ip, addr, false);
        if (addr.IsIPv6()) return true;
    }
    return false;
}

bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
{
    assert(!psocket);

    // check if address is being used by another publish notifier
    std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);

    if (i==mapPublishNotifiers.end())
    {
        psocket = zmq_socket(pcontext, ZMQ_PUB);
        if (!psocket)
        {
            zmqError("Failed to create socket");
            return false;
        }

        LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);

        int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
        if (rc != 0)
        {
            zmqError("Failed to set outbound message high water mark");
            zmq_close(psocket);
            return false;
        }

        const int so_keepalive_option {1};
        rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
        if (rc != 0) {
            zmqError("Failed to set SO_KEEPALIVE");
            zmq_close(psocket);
            return false;
        }

        // On some systems (e.g. OpenBSD) ZMQ_IPV6 must not be enabled if the address to bind isn't IPv6
        const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0};
        rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6));
        if (rc != 0) {
            zmqError("Failed to set ZMQ_IPV6");
            zmq_close(psocket);
            return false;
        }

        rc = zmq_bind(psocket, address.c_str());
        if (rc != 0)
        {
            zmqError("Failed to bind address");
            zmq_close(psocket);
            return false;
        }

        // register this notifier for the address, so the socket can be reused by other publish notifiers
        mapPublishNotifiers.insert(std::make_pair(address, this));
        return true;
    }
    else
    {
        LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
        LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);

        psocket = i->second->psocket;
        mapPublishNotifiers.insert(std::make_pair(address, this));

        return true;
    }
}
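
// Unregister this notifier from mapPublishNotifiers; the shared socket is closed
// only when the last notifier using this address shuts down.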
void CZMQAbstractPublishNotifier::Shutdown()
{
    // Early return if Initialize was not called
    if (!psocket) return;

    int count = mapPublishNotifiers.count(address);

    // remove this notifier from the list of publishers using this address
    typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
    std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);

    for (iterator it = iterpair.first; it != iterpair.second; ++it)
    {
        if (it->second==this)
        {
            mapPublishNotifiers.erase(it);
            break;
        }
    }

    if (count == 1)
    {
        LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
        int linger = 0;
        zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
        zmq_close(psocket);
    }

    psocket = nullptr;
}
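
// Send a three-part message on this notifier's socket: the topic string, the
// payload, and a 4-byte little-endian sequence number that increments with every
// message this notifier publishes.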
bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
{
    assert(psocket);

    /* send three parts, command & data & a LE 4byte sequence number */
    unsigned char msgseq[sizeof(uint32_t)];
    WriteLE32(msgseq, nSequence);
    int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
    if (rc == -1)
        return false;

    /* increment memory-only sequence number after sending */
    nSequence++;

    return true;
}

bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
    uint256 hash = pindex->GetBlockHash();
    LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
    uint8_t data[32];
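    // uint256 stores the hash little-endian internally; reverse the bytes so the
    // published hash matches the big-endian hex order used by the RPC interface.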
    for (unsigned int i = 0; i < 32; i++) {
        data[31 - i] = hash.begin()[i];
    }
    return SendZmqMessage(MSG_HASHBLOCK, data, 32);
}

bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
    uint256 hash = transaction.GetHash();
    LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
    uint8_t data[32];
    for (unsigned int i = 0; i < 32; i++) {
        data[31 - i] = hash.begin()[i];
    }
    return SendZmqMessage(MSG_HASHTX, data, 32);
}

bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
    LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);

    const Consensus::Params& consensusParams = Params().GetConsensus();
    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
    {
        LOCK(cs_main);
        CBlock block;
        if(!ReadBlockFromDisk(block, pindex, consensusParams))
        {
            zmqError("Can't read block from disk");
            return false;
        }

        ss << block;
    }

    return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}

bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
    uint256 hash = transaction.GetHash();
    LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
    ss << transaction;
    return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}
// Helper function to send a 'sequence' topic message with the following structure:
// <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
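// The label is one of 'C' (block connect), 'D' (block disconnect),
// 'A' (mempool acceptance) or 'R' (mempool removal); the 8-byte mempool sequence
// number is appended only for the mempool events.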
static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
{
    unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
    for (unsigned int i = 0; i < sizeof(hash); ++i) {
        data[sizeof(hash) - 1 - i] = hash.begin()[i];
    }
    data[sizeof(hash)] = label;
    if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
    return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
}

bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
{
    uint256 hash = pindex->GetBlockHash();
    LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
    return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
}

bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
{
    uint256 hash = pindex->GetBlockHash();
    LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
    return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
}

bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
{
    uint256 hash = transaction.GetHash();
    LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
    return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
}

bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
{
    uint256 hash = transaction.GetHash();
    LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
    return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
}