diff options
author | alexnick <alexnick@ydb.tech> | 2022-10-18 17:34:51 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-10-18 17:34:51 +0300 |
commit | 86f150b73356104e0b24527cb89c03adb1751829 (patch) | |
tree | 09e90f16759611812f451a476b170008287492dd | |
parent | 04570c05d8f0570efee02b73ddfdbc757ec1e8f2 (diff) | |
download | ydb-86f150b73356104e0b24527cb89c03adb1751829.tar.gz |
fix for datarace
fix for data race
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp | 13 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp | 2 |
2 files changed, 12 insertions, 3 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index f6ca37554bd..4f942b89ca7 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -1342,13 +1342,22 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnCreateNewDecompressi template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount) { + *Settings.Counters_->MessagesInflight -= messagesCount; *Settings.Counters_->BytesInflightUncompressed -= decompressedSize; *Settings.Counters_->BytesInflightCompressed -= compressedSize; *Settings.Counters_->BytesInflightTotal -= (compressedSize + decompressedSize); - CompressedDataSize -= compressedSize; - DecompressedDataSize -= decompressedSize; + TDeferredActions<UseMigrationProtocol> deferred; + with_lock (Lock) { + UpdateMemoryUsageStatisticsImpl(); + + CompressedDataSize -= compressedSize; + DecompressedDataSize -= decompressedSize; + + ContinueReadingDataImpl(); + StartDecompressionTasksImpl(deferred); + } } template<bool UseMigrationProtocol> diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 65d83f387b6..2a71b05897e 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -1688,7 +1688,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { }); auto destroyCalledPromise = NThreading::NewPromise<TReadSessionEvent::TDestroyPartitionStreamEvent>(); auto destroyCalled = destroyCalledPromise.GetFuture(); - setup.Settings.EventHandlers_.DestroyPartitionStreamHandler([&](TReadSessionEvent::TDestroyPartitionStreamEvent& event) { + setup.Settings.EventHandlers_.DestroyPartitionStreamHandler([destroyCalledPromise = std::move(destroyCalledPromise)](TReadSessionEvent::TDestroyPartitionStreamEvent& event) mutable { destroyCalledPromise.SetValue(std::move(event)); }); auto closedCalledPromise = NThreading::NewPromise<void>(); |