aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2022-08-15 13:45:33 +0300
committerildar-khisam <ikhis@ydb.tech>2022-08-15 13:45:33 +0300
commit97e91f44abb6f2290425557403d34bf0af804c6d (patch)
tree310277ede267f229b2b1ac49af5018dcddd80383
parent898aeaf457020407e629ae8f55896525bbf2f9ef (diff)
downloadydb-97e91f44abb6f2290425557403d34bf0af804c6d.tar.gz
topic sdk read optimizations
optimize out some read requests
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp4
2 files changed, 6 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 52e530822b4..c368b72f5e4 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -964,6 +964,7 @@ public:
, ClientContext(std::move(clientContext))
, CookieMapping(ErrorHandler)
, ReadSizeBudget(GetCompressedDataSizeLimit())
+ , ReadSizeServerDelta(GetCompressedDataSizeLimit())
{
}
@@ -1183,7 +1184,8 @@ private:
bool Closing = false;
std::function<void()> CloseCallback;
std::atomic<int> DecompressionTasksInflight = 0;
- i64 ReadSizeBudget;
+ ui64 ReadSizeBudget;
+ i64 ReadSizeServerDelta;
};
// High level class that manages several read session impls.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 488020dfd90..f0f75db43db 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -435,7 +435,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImp
if constexpr (UseMigrationProtocol) {
req.mutable_read();
} else {
- if (ReadSizeBudget <= 0) {
+ if (ReadSizeBudget == 0 || ReadSizeServerDelta <= 0) {
return;
}
req.mutable_read_request()->set_bytes_size(ReadSizeBudget);
@@ -1050,6 +1050,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
}
i64 serverBytesSize = msg.bytes_size();
+ ReadSizeServerDelta -= serverBytesSize;
UpdateMemoryUsageStatisticsImpl();
for (TPartitionData<false>& partitionData : *msg.mutable_partition_data()) {
@@ -1294,6 +1295,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDataDecompressed(i64
}
if constexpr (!UseMigrationProtocol) {
ReadSizeBudget += serverBytesSize;
+ ReadSizeServerDelta += serverBytesSize;
}
ContinueReadingDataImpl();
StartDecompressionTasksImpl(deferred);