aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-10-18 17:34:51 +0300
committeralexnick <alexnick@ydb.tech>2022-10-18 17:34:51 +0300
commit86f150b73356104e0b24527cb89c03adb1751829 (patch)
tree09e90f16759611812f451a476b170008287492dd
parent04570c05d8f0570efee02b73ddfdbc757ec1e8f2 (diff)
downloadydb-86f150b73356104e0b24527cb89c03adb1751829.tar.gz
fix for datarace
fix for data race
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp2
2 files changed, 12 insertions, 3 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index f6ca37554bd..4f942b89ca7 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -1342,13 +1342,22 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnCreateNewDecompressi
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount)
{
+
*Settings.Counters_->MessagesInflight -= messagesCount;
*Settings.Counters_->BytesInflightUncompressed -= decompressedSize;
*Settings.Counters_->BytesInflightCompressed -= compressedSize;
*Settings.Counters_->BytesInflightTotal -= (compressedSize + decompressedSize);
- CompressedDataSize -= compressedSize;
- DecompressedDataSize -= decompressedSize;
+ TDeferredActions<UseMigrationProtocol> deferred;
+ with_lock (Lock) {
+ UpdateMemoryUsageStatisticsImpl();
+
+ CompressedDataSize -= compressedSize;
+ DecompressedDataSize -= decompressedSize;
+
+ ContinueReadingDataImpl();
+ StartDecompressionTasksImpl(deferred);
+ }
}
template<bool UseMigrationProtocol>
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 65d83f387b6..2a71b05897e 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -1688,7 +1688,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
});
auto destroyCalledPromise = NThreading::NewPromise<TReadSessionEvent::TDestroyPartitionStreamEvent>();
auto destroyCalled = destroyCalledPromise.GetFuture();
- setup.Settings.EventHandlers_.DestroyPartitionStreamHandler([&](TReadSessionEvent::TDestroyPartitionStreamEvent& event) {
+ setup.Settings.EventHandlers_.DestroyPartitionStreamHandler([destroyCalledPromise = std::move(destroyCalledPromise)](TReadSessionEvent::TDestroyPartitionStreamEvent& event) mutable {
destroyCalledPromise.SetValue(std::move(event));
});
auto closedCalledPromise = NThreading::NewPromise<void>();