diff options
author | ildar-khisam <ikhis@ydb.tech> | 2022-08-15 13:45:33 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2022-08-15 13:45:33 +0300 |
commit | 97e91f44abb6f2290425557403d34bf0af804c6d (patch) | |
tree | 310277ede267f229b2b1ac49af5018dcddd80383 | |
parent | 898aeaf457020407e629ae8f55896525bbf2f9ef (diff) | |
download | ydb-97e91f44abb6f2290425557403d34bf0af804c6d.tar.gz |
topic sdk read optimizations
optimize out some read requests
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h | 4 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp | 4 |
2 files changed, 6 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 52e530822b4..c368b72f5e4 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -964,6 +964,7 @@ public: , ClientContext(std::move(clientContext)) , CookieMapping(ErrorHandler) , ReadSizeBudget(GetCompressedDataSizeLimit()) + , ReadSizeServerDelta(GetCompressedDataSizeLimit()) { } @@ -1183,7 +1184,8 @@ private: bool Closing = false; std::function<void()> CloseCallback; std::atomic<int> DecompressionTasksInflight = 0; - i64 ReadSizeBudget; + ui64 ReadSizeBudget; + i64 ReadSizeServerDelta; }; // High level class that manages several read session impls. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 488020dfd90..f0f75db43db 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -435,7 +435,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImp if constexpr (UseMigrationProtocol) { req.mutable_read(); } else { - if (ReadSizeBudget <= 0) { + if (ReadSizeBudget == 0 || ReadSizeServerDelta <= 0) { return; } req.mutable_read_request()->set_bytes_size(ReadSizeBudget); @@ -1050,6 +1050,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( } i64 serverBytesSize = msg.bytes_size(); + ReadSizeServerDelta -= serverBytesSize; UpdateMemoryUsageStatisticsImpl(); for (TPartitionData<false>& partitionData : *msg.mutable_partition_data()) { @@ -1294,6 +1295,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64 } if constexpr (!UseMigrationProtocol) { ReadSizeBudget += serverBytesSize; + ReadSizeServerDelta += serverBytesSize; } ContinueReadingDataImpl(); StartDecompressionTasksImpl(deferred); |