diff options
author | Aleksandr Petrikhin <shmeleine@gmail.com> | 2022-07-05 21:00:36 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-07-05 21:00:36 +0300 |
commit | 7d00701f4c2ef8f841a41ec86d7dfd1c5a8eb5f1 (patch) | |
tree | 5567dfbf17a5c945b0e1600ab40e599f41a2a85c | |
parent | 8ce18c6635e7cee5d50bc9151b3b37b84da887c4 (diff) | |
download | ydb-7d00701f4c2ef8f841a41ec86d7dfd1c5a8eb5f1.tar.gz |
PR from branch users/shmel1k/LOGBROKER-7581_mirrorer_fixes_to_22-2
[LOGBROKER-7581] increase max message size in mirrorer
REVIEW: 2687752
LOGBROKER-7581 add default message size to actor_persqueue
REVIEW: 2690444
REVIEW: 2691194
x-ydb-stable-ref: 97a93896f920ecaa2a545b4285a6b01240c70cb7
6 files changed, 55 insertions, 0 deletions
diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h index 1bf0ed9a4c..2588e1ec3e 100644 --- a/ydb/core/persqueue/actor_persqueue_client_iface.h +++ b/ydb/core/persqueue/actor_persqueue_client_iface.h @@ -13,6 +13,8 @@ namespace NKikimr::NPQ { +constexpr ui64 MaxMessageSize = 150_MB; + class IPersQueueMirrorReaderFactory { public: IPersQueueMirrorReaderFactory() @@ -27,6 +29,7 @@ public: ActorSystemPtr->store(actorSystem, std::memory_order_relaxed); auto driverConfig = NYdb::TDriverConfig() + .SetMaxMessageSize(MaxMessageSize) .SetNetworkThreadsNum(settings.GetThreadsCount()); Driver = std::make_shared<NYdb::TDriver>(driverConfig); } diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp index 8f2420043e..a51b2db900 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp @@ -150,6 +150,9 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr<IConnectionsParams> p , GRpcKeepAliveTimeout_(params->GetGRpcKeepAliveTimeout()) , GRpcKeepAlivePermitWithoutCalls_(params->GetGRpcKeepAlivePermitWithoutCalls()) , MemoryQuota_(params->GetMemoryQuota()) + , MaxInboundMessageSize_(params->GetMaxInboundMessageSize()) + , MaxOutboundMessageSize_(params->GetMaxOutboundMessageSize()) + , MaxMessageSize_(params->GetMaxMessageSize()) , QueuedRequests_(0) #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL , ChannelPool_(params->GetTcpKeepAliveSettings(), params->GetSocketIdleTimeout()) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h index bfc1b7ec38..0e97c1abe9 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -83,6 +83,16 @@ public: clientConfig.SslCaCert = CaCert_; clientConfig.MemQuota = MemoryQuota_; + if (MaxMessageSize_ > 0) { + clientConfig.MaxMessageSize = MaxMessageSize_; + } + if (MaxInboundMessageSize_ > 0) { + clientConfig.MaxInboundMessageSize = MaxInboundMessageSize_; + } + if (MaxOutboundMessageSize_ > 0) { + clientConfig.MaxOutboundMessageSize = MaxOutboundMessageSize_; + } + if (std::is_same<TService,Ydb::Discovery::V1::DiscoveryService>() || dbState->Database.empty() || endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UseDiscoveryEndpoint) @@ -704,6 +714,9 @@ private: const TDuration GRpcKeepAliveTimeout_; const bool GRpcKeepAlivePermitWithoutCalls_; const ui64 MemoryQuota_; + const ui64 MaxInboundMessageSize_; + const ui64 MaxOutboundMessageSize_; + const ui64 MaxMessageSize_; std::atomic_int64_t QueuedRequests_; #ifndef YDB_GRPC_BYPASS_CHANNEL_POOL diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h index 7833c60947..9de2911a4f 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/params.h @@ -25,6 +25,9 @@ public: virtual TDuration GetSocketIdleTimeout() const = 0; virtual const TLog& GetLog() const = 0; virtual ui64 GetMemoryQuota() const = 0; + virtual ui64 GetMaxInboundMessageSize() const = 0; + virtual ui64 GetMaxOutboundMessageSize() const = 0; + virtual ui64 GetMaxMessageSize() const = 0; }; } // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp b/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp index de17c6e679..fae0334abf 100644 --- a/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp +++ b/ydb/public/sdk/cpp/client/ydb_driver/driver.cpp @@ -45,6 +45,9 @@ public: bool GetGRpcKeepAlivePermitWithoutCalls() const override { return GRpcKeepAlivePermitWithoutCalls; } TDuration GetSocketIdleTimeout() const override { return SocketIdleTimeout; } ui64 GetMemoryQuota() const override { return MemoryQuota; } + ui64 GetMaxInboundMessageSize() const override { return MaxInboundMessageSize; } + ui64 GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; } + ui64 GetMaxMessageSize() const override { return MaxMessageSize; } const TLog& GetLog() const override { return Log; } TStringType Endpoint; @@ -70,6 +73,9 @@ public: bool GRpcKeepAlivePermitWithoutCalls = false; TDuration SocketIdleTimeout = TDuration::Minutes(6); ui64 MemoryQuota = 0; + ui64 MaxInboundMessageSize = 0; + ui64 MaxOutboundMessageSize = 0; + ui64 MaxMessageSize = 0; TLog Log; // Null by default. }; @@ -172,6 +178,21 @@ TDriverConfig& TDriverConfig::SetSocketIdleTimeout(TDuration timeout) { return *this; } +TDriverConfig& TDriverConfig::SetMaxInboundMessageSize(ui64 maxInboundMessageSize) { + Impl_->MaxInboundMessageSize = maxInboundMessageSize; + return *this; +} + +TDriverConfig& TDriverConfig::SetMaxOutboundMessageSize(ui64 maxOutboundMessageSize) { + Impl_->MaxOutboundMessageSize = maxOutboundMessageSize; + return *this; +} + +TDriverConfig& TDriverConfig::SetMaxMessageSize(ui64 maxMessageSize) { + Impl_->MaxMessageSize = maxMessageSize; + return *this; +} + TDriverConfig& TDriverConfig::SetLog(THolder<TLogBackend> log) { Impl_->Log.ResetBackend(std::move(log)); return *this; diff --git a/ydb/public/sdk/cpp/client/ydb_driver/driver.h b/ydb/public/sdk/cpp/client/ydb_driver/driver.h index 39c91d8852..e8027bfad7 100644 --- a/ydb/public/sdk/cpp/client/ydb_driver/driver.h +++ b/ydb/public/sdk/cpp/client/ydb_driver/driver.h @@ -104,6 +104,18 @@ public: //! default: 6 minutes TDriverConfig& SetSocketIdleTimeout(TDuration timeout); + //! Set maximum incoming message size. + //! Note: this option overrides MaxMessageSize for incoming messages. + //! default: 0 + TDriverConfig& SetMaxInboundMessageSize(ui64 maxInboundMessageSize); + //! Set maximum outgoing message size. + //! Note: this option overrides MaxMessageSize for outgoing messages. + //! default: 0 + TDriverConfig& SetMaxOutboundMessageSize(ui64 maxOutboundMessageSize); + //! Note: if this option is unset, default 64_MB message size will be used. + //! default: 0 + TDriverConfig& SetMaxMessageSize(ui64 maxMessageSize); + //! Log backend. TDriverConfig& SetLog(THolder<TLogBackend> log); private: |