aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Petrikhin <shmeleine@gmail.com>2022-07-05 21:00:36 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2022-07-05 21:00:36 +0300
commit7d00701f4c2ef8f841a41ec86d7dfd1c5a8eb5f1 (patch)
tree5567dfbf17a5c945b0e1600ab40e599f41a2a85c
parent8ce18c6635e7cee5d50bc9151b3b37b84da887c4 (diff)
downloadydb-7d00701f4c2ef8f841a41ec86d7dfd1c5a8eb5f1.tar.gz
PR from branch users/shmel1k/LOGBROKER-7581_mirrorer_fixes_to_22-2
[LOGBROKER-7581] increase max message size in mirrorer REVIEW: 2687752 LOGBROKER-7581 add default message size to actor_persqueue REVIEW: 2690444 REVIEW: 2691194 x-ydb-stable-ref: 97a93896f920ecaa2a545b4285a6b01240c70cb7
-rw-r--r--ydb/core/persqueue/actor_persqueue_client_iface.h3
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp3
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h13
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_driver/driver.cpp21
-rw-r--r--ydb/public/sdk/cpp/client/ydb_driver/driver.h12
6 files changed, 55 insertions, 0 deletions
diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h
index 1bf0ed9a4c..2588e1ec3e 100644
--- a/ydb/core/persqueue/actor_persqueue_client_iface.h
+++ b/ydb/core/persqueue/actor_persqueue_client_iface.h
@@ -13,6 +13,8 @@
namespace NKikimr::NPQ {
+constexpr ui64 MaxMessageSize = 150_MB;
+
class IPersQueueMirrorReaderFactory {
public:
IPersQueueMirrorReaderFactory()
@@ -27,6 +29,7 @@ public:
ActorSystemPtr->store(actorSystem, std::memory_order_relaxed);
auto driverConfig = NYdb::TDriverConfig()
+ .SetMaxMessageSize(MaxMessageSize)
.SetNetworkThreadsNum(settings.GetThreadsCount());
Driver = std::make_shared<NYdb::TDriver>(driverConfig);
}
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp
index 8f2420043e..a51b2db900 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp
@@ -150,6 +150,9 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr<IConnectionsParams> p
, GRpcKeepAliveTimeout_(params->GetGRpcKeepAliveTimeout())
, GRpcKeepAlivePermitWithoutCalls_(params->GetGRpcKeepAlivePermitWithoutCalls())
, MemoryQuota_(params->GetMemoryQuota())
+ , MaxInboundMessageSize_(params->GetMaxInboundMessageSize())
+ , MaxOutboundMessageSize_(params->GetMaxOutboundMessageSize())
+ , MaxMessageSize_(params->GetMaxMessageSize())
, QueuedRequests_(0)
#ifndef YDB_GRPC_BYPASS_CHANNEL_POOL
, ChannelPool_(params->GetTcpKeepAliveSettings(), params->GetSocketIdleTimeout())
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
index bfc1b7ec38..0e97c1abe9 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
@@ -83,6 +83,16 @@ public:
clientConfig.SslCaCert = CaCert_;
clientConfig.MemQuota = MemoryQuota_;
+ if (MaxMessageSize_ > 0) {
+ clientConfig.MaxMessageSize = MaxMessageSize_;
+ }
+ if (MaxInboundMessageSize_ > 0) {
+ clientConfig.MaxInboundMessageSize = MaxInboundMessageSize_;
+ }
+ if (MaxOutboundMessageSize_ > 0) {
+ clientConfig.MaxOutboundMessageSize = MaxOutboundMessageSize_;
+ }
+
if (std::is_same<TService,Ydb::Discovery::V1::DiscoveryService>()
|| dbState->Database.empty()
|| endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UseDiscoveryEndpoint)
@@ -704,6 +714,9 @@ private:
const TDuration GRpcKeepAliveTimeout_;
const bool GRpcKeepAlivePermitWithoutCalls_;
const ui64 MemoryQuota_;
+ const ui64 MaxInboundMessageSize_;
+ const ui64 MaxOutboundMessageSize_;
+ const ui64 MaxMessageSize_;
std::atomic_int64_t QueuedRequests_;
#ifndef YDB_GRPC_BYPASS_CHANNEL_POOL
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h
index 7833c60947..9de2911a4f 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h
@@ -25,6 +25,9 @@ public:
virtual TDuration GetSocketIdleTimeout() const = 0;
virtual const TLog& GetLog() const = 0;
virtual ui64 GetMemoryQuota() const = 0;
+ virtual ui64 GetMaxInboundMessageSize() const = 0;
+ virtual ui64 GetMaxOutboundMessageSize() const = 0;
+ virtual ui64 GetMaxMessageSize() const = 0;
};
} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp b/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp
index de17c6e679..fae0334abf 100644
--- a/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp
@@ -45,6 +45,9 @@ public:
bool GetGRpcKeepAlivePermitWithoutCalls() const override { return GRpcKeepAlivePermitWithoutCalls; }
TDuration GetSocketIdleTimeout() const override { return SocketIdleTimeout; }
ui64 GetMemoryQuota() const override { return MemoryQuota; }
+ ui64 GetMaxInboundMessageSize() const override { return MaxInboundMessageSize; }
+ ui64 GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; }
+ ui64 GetMaxMessageSize() const override { return MaxMessageSize; }
const TLog& GetLog() const override { return Log; }
TStringType Endpoint;
@@ -70,6 +73,9 @@ public:
bool GRpcKeepAlivePermitWithoutCalls = false;
TDuration SocketIdleTimeout = TDuration::Minutes(6);
ui64 MemoryQuota = 0;
+ ui64 MaxInboundMessageSize = 0;
+ ui64 MaxOutboundMessageSize = 0;
+ ui64 MaxMessageSize = 0;
TLog Log; // Null by default.
};
@@ -172,6 +178,21 @@ TDriverConfig& TDriverConfig::SetSocketIdleTimeout(TDuration timeout) {
return *this;
}
+TDriverConfig& TDriverConfig::SetMaxInboundMessageSize(ui64 maxInboundMessageSize) {
+ Impl_->MaxInboundMessageSize = maxInboundMessageSize;
+ return *this;
+}
+
+TDriverConfig& TDriverConfig::SetMaxOutboundMessageSize(ui64 maxOutboundMessageSize) {
+ Impl_->MaxOutboundMessageSize = maxOutboundMessageSize;
+ return *this;
+}
+
+TDriverConfig& TDriverConfig::SetMaxMessageSize(ui64 maxMessageSize) {
+ Impl_->MaxMessageSize = maxMessageSize;
+ return *this;
+}
+
TDriverConfig& TDriverConfig::SetLog(THolder<TLogBackend> log) {
Impl_->Log.ResetBackend(std::move(log));
return *this;
diff --git a/ydb/public/sdk/cpp/client/ydb_driver/driver.h b/ydb/public/sdk/cpp/client/ydb_driver/driver.h
index 39c91d8852..e8027bfad7 100644
--- a/ydb/public/sdk/cpp/client/ydb_driver/driver.h
+++ b/ydb/public/sdk/cpp/client/ydb_driver/driver.h
@@ -104,6 +104,18 @@ public:
//! default: 6 minutes
TDriverConfig& SetSocketIdleTimeout(TDuration timeout);
+ //! Set maximum incoming message size.
+ //! Note: this option overrides MaxMessageSize for incoming messages.
+ //! default: 0
+ TDriverConfig& SetMaxInboundMessageSize(ui64 maxInboundMessageSize);
+ //! Set maximum outgoing message size.
+ //! Note: this option overrides MaxMessageSize for outgoing messages.
+ //! default: 0
+ TDriverConfig& SetMaxOutboundMessageSize(ui64 maxOutboundMessageSize);
+ //! Note: if this option is unset, default 64_MB message size will be used.
+ //! default: 0
+ TDriverConfig& SetMaxMessageSize(ui64 maxMessageSize);
+
//! Log backend.
TDriverConfig& SetLog(THolder<TLogBackend> log);
private: