0
0
Fork 0
mirror of https://github.com/bitcoin/bitcoin.git synced 2025-02-06 10:18:44 -05:00
bitcoin-bitcoin-core/src/zmq/zmqpublishnotifier.cpp
Jonas Schnelli 3a3e21dafb
Merge #14687: zmq: enable tcp keepalive
c276df7759 zmq: enable tcp keepalive (mruddy)

Pull request description:

  This addresses https://github.com/bitcoin/bitcoin/issues/12754.

  These changes enable node operators to address the silent dropping (by network middle boxes) of long-lived low-activity ZMQ TCP connections via further operating system level TCP keepalive configuration. For example, ZMQ sockets that publish block hashes can be affected in this way due to the length of time it sometimes takes between finding blocks (e.g.- sometimes more than an hour).

  Prior to this patch, operating system level TCP keepalive configurations would not take effect since the SO_KEEPALIVE option was not enabled on the underlying socket.

  There are additional ZMQ socket options related to TCP keepalive that can be set. However, I decided not to implement those options in this changeset because doing so would require adding additional bitcoin node configuration options, and would not yield a better outcome. I preferred a small, easily reviewable patch that doesn't add a bunch of new config options, with the tradeoff that the fine tuning would have to be done via well-documented operating system specific configurations.

  I tested this patch by running a node with:
  `./src/qt/bitcoin-qt -regtest -txindex -datadir=/tmp/node -zmqpubhashblock=tcp://127.0.0.1:28332 &`
  and connecting to it with:
  `python3 ./contrib/zmq/zmq_sub.py`

  Without these changes, `ss -panto | grep 28332 | grep ESTAB | grep bitcoin` will report no keepalive timer information. With these changes, the output from the prior command will show keepalive timer information consistent with the configuration at the time of connection establishment, e.g.-: `timer:(keepalive,119min,0)`.

  I also tested with a non-TCP transport and did not witness any adverse effects:
  `./src/qt/bitcoin-qt -regtest -txindex -datadir=/tmp/node -zmqpubhashblock=ipc:///tmp/bitcoin.block &`

ACKs for top commit:
  adamjonas:
    Just to summarize for those looking to review - as of c276df7759 there are 3 tACKs (n-thumann, Haaroon, and dlogemann), 1 "looks good to me" (laanwj) with no NACKs or any show-stopping concerns raised.
  jonasschnelli:
    utACK c276df7759

Tree-SHA512: b884c2c9814e97e666546a7188c48f9de9541499a11a934bd48dd16169a900c900fa519feb3b1cb7e9915fc7539aac2829c7806b5937b4e1409b4805f3ef6cd1
2020-09-02 09:09:18 +02:00

217 lines
6.5 KiB
C++

// Copyright (c) 2015-2020 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <chain.h>
#include <chainparams.h>
#include <streams.h>
#include <zmq/zmqpublishnotifier.h>
#include <validation.h>
#include <util/system.h>
#include <rpc/server.h>
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
va_list args;
va_start(args, size);
while (1)
{
zmq_msg_t msg;
int rc = zmq_msg_init_size(&msg, size);
if (rc != 0)
{
zmqError("Unable to initialize ZMQ msg");
va_end(args);
return -1;
}
void *buf = zmq_msg_data(&msg);
memcpy(buf, data, size);
data = va_arg(args, const void*);
rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
if (rc == -1)
{
zmqError("Unable to send ZMQ msg");
zmq_msg_close(&msg);
va_end(args);
return -1;
}
zmq_msg_close(&msg);
if (!data)
break;
size = va_arg(args, size_t);
}
va_end(args);
return 0;
}
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
{
assert(!psocket);
// check if address is being used by other publish notifier
std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
if (i==mapPublishNotifiers.end())
{
psocket = zmq_socket(pcontext, ZMQ_PUB);
if (!psocket)
{
zmqError("Failed to create socket");
return false;
}
LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
if (rc != 0)
{
zmqError("Failed to set outbound message high water mark");
zmq_close(psocket);
return false;
}
const int so_keepalive_option {1};
rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
if (rc != 0) {
zmqError("Failed to set SO_KEEPALIVE");
zmq_close(psocket);
return false;
}
rc = zmq_bind(psocket, address.c_str());
if (rc != 0)
{
zmqError("Failed to bind address");
zmq_close(psocket);
return false;
}
// register this notifier for the address, so it can be reused for other publish notifier
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
else
{
LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
psocket = i->second->psocket;
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
}
void CZMQAbstractPublishNotifier::Shutdown()
{
// Early return if Initialize was not called
if (!psocket) return;
int count = mapPublishNotifiers.count(address);
// remove this notifier from the list of publishers using this address
typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
for (iterator it = iterpair.first; it != iterpair.second; ++it)
{
if (it->second==this)
{
mapPublishNotifiers.erase(it);
break;
}
}
if (count == 1)
{
LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
}
psocket = nullptr;
}
bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
{
assert(psocket);
/* send three parts, command & data & a LE 4byte sequence number */
unsigned char msgseq[sizeof(uint32_t)];
WriteLE32(&msgseq[0], nSequence);
int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
if (rc == -1)
return false;
/* increment memory only sequence number after sending */
nSequence++;
return true;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHBLOCK, data, 32);
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHTX, data, 32);
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
{
LOCK(cs_main);
CBlock block;
if(!ReadBlockFromDisk(block, pindex, consensusParams))
{
zmqError("Can't read block from disk");
return false;
}
ss << block;
}
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
ss << transaction;
return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
}