diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-02-07 09:27:55 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-02-07 09:27:55 +0300 |
commit | 67026a90f514712e5c1ff62a8b3dd8631e545a88 (patch) | |
tree | 2af31998bfc653f6330e616724cd997f5e4b9b22 | |
parent | 55249694bb14fef3f2773ef1381c4bb61ddfc1c8 (diff) | |
download | ydb-67026a90f514712e5c1ff62a8b3dd8631e545a88.tar.gz |
fix
fix
fix budget restore
8 files changed, 283 insertions, 10 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index c04199d57ea..34727778925 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -238,6 +238,10 @@ public: return DoDecompress; } + i64 GetServerBytesSize() const { + return ServerBytesSize; + } + TMaybe<std::pair<size_t, size_t>> GetReadyThreshold() const { size_t readyCount = 0; std::pair<size_t, size_t> ret; @@ -270,7 +274,7 @@ public: void PutDecompressionError(std::exception_ptr error, size_t batch, size_t message); std::exception_ptr GetDecompressionError(size_t batch, size_t message); - void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0); + void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount); void OnUserRetrievedEvent(i64 decompressedDataSize, size_t messagesCount); private: @@ -962,7 +966,7 @@ public: void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset); void OnCreateNewDecompressionTask(); - void OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount); + void OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount, i64 serverBytesSize); void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 22f3088a8b9..58dcde911cf 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -20,6 +20,7 @@ #include <util/stream/mem.h> #include <util/system/env.h> +#include <utility> #include <variant> // Forward delcarations @@ -152,7 +153,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TInt { auto moveToReadyQueue = [&](TRawPartitionStreamEvent<UseMigrationProtocol> &&event) { queue.SignalEventImpl(stream, deferred, event.IsDataEvent()); - + Ready.push_back(std::move(event)); NotReady.pop_front(); }; @@ -503,7 +504,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImp } req.mutable_read_request()->set_bytes_size(ReadSizeBudget); ReadSizeServerDelta += ReadSizeBudget; - ReadSizeBudget = 0; } @@ -1340,7 +1340,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnCreateNewDecompressi } template<bool UseMigrationProtocol> -void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount) +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount, i64 serverBytesSize) { *Settings.Counters_->MessagesInflight -= messagesCount; @@ -1355,6 +1355,10 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDes CompressedDataSize -= compressedSize; DecompressedDataSize -= decompressedSize; + if constexpr (!UseMigrationProtocol) { + ReadSizeBudget += serverBytesSize; + } + ContinueReadingDataImpl(); StartDecompressionTasksImpl(deferred); } @@ -1820,7 +1824,7 @@ typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, size_t& maxByteSize, TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) // Assumes that we're under lock. -{ +{ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages; TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage> compressedMessages; @@ -2074,7 +2078,7 @@ template<bool UseMigrationProtocol> TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo() { if (auto session = Session.lock()) { - session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight); + session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight, ServerBytesSize); } } @@ -2305,13 +2309,15 @@ bool TDataDecompressionInfo<UseMigrationProtocol>::HasReadyUnreadData() const { } template<bool UseMigrationProtocol> -void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize) +void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount) { CompressedDataSize -= sourceSize; DecompressedDataSize += decompressedSize; if (auto session = Session.lock()) { - session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, serverBytesSize); + // TODO (ildar-khisam@): distribute total ServerBytesSize in proportion of source size + // Use CompressedDataSize, sourceSize, ServerBytesSize + session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, std::exchange(ServerBytesSize, 0)); } } @@ -2413,7 +2419,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator( Y_ASSERT(dataProcessed == SourceDataSize); std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock(); - Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed, Parent->ServerBytesSize); + Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed); Parent->SourceDataNotProcessed -= dataProcessed; Ready->Ready = true; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt index 42f184e205f..b54dd13486a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt @@ -41,6 +41,7 @@ target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ) target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt index 31cd3c9c963..4e17f9a1317 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt @@ -44,6 +44,7 @@ target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ) target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt index 1bfcc204736..bb7d888fb27 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt @@ -46,6 +46,7 @@ target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ) target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp ) set_property( TARGET diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index fec5f36088c..68c631a58b4 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -1,5 +1,7 @@ #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h> + #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> @@ -92,6 +94,115 @@ Y_UNIT_TEST_SUITE(BasicUsage) { ReadSession->Close(TDuration::MilliSeconds(10)); AtomicSet(check, 0); } + + + Y_UNIT_TEST(ReadWithRestarts) { + + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + ui32 count = 700; + std::string message(2'000, 'x'); + for (ui32 i = 1; i <= count; ++i) { + bool res = session->Write(message); + UNIT_ASSERT(res); + } + bool res = session->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + + std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession; + + // Create topic client. + NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); + + // Create read session. + NYdb::NTopic::TReadSessionSettings readSettings; + readSettings + .ConsumerName(setup->GetTestClient()) + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + // + // controlled decompressor + // + auto decompressor = CreateThreadPoolManagedExecutor(1); + readSettings.DecompressionExecutor(decompressor); + + + // + // auxiliary functions for decompressor and handler control + // + auto WaitTasks = [&](auto f, size_t c) { + while (f() < c) { + Sleep(TDuration::MilliSeconds(100)); + }; + }; + auto WaitPlannedTasks = [&](auto e, size_t count) { + WaitTasks([&]() { return e->GetPlannedCount(); }, count); + }; + auto WaitExecutedTasks = [&](auto e, size_t count) { + WaitTasks([&]() { return e->GetExecutedCount(); }, count); + }; + + auto RunTasks = [&](auto e, const std::vector<size_t>& tasks) { + size_t n = tasks.size(); + WaitPlannedTasks(e, n); + size_t completed = e->GetExecutedCount(); + e->StartFuncs(tasks); + WaitExecutedTasks(e, completed + n); + }; + Y_UNUSED(RunTasks); + + auto PlanTasksAndRestart = [&](auto e, const std::vector<size_t>& tasks) { + size_t n = tasks.size(); + WaitPlannedTasks(e, n); + size_t completed = e->GetExecutedCount(); + + setup->KillPqrb(setup->GetTestTopic(), setup->GetLocalCluster()); + Cerr << ">>> TEST: PQRB killed" << Endl; + Sleep(TDuration::MilliSeconds(100)); + + e->StartFuncs(tasks); + WaitExecutedTasks(e, completed + n); + }; + Y_UNUSED(PlanTasksAndRestart); + + + NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); + TAtomic lastOffset = 0u; + + auto f = checkedPromise.GetFuture(); + readSettings.EventHandlers_.SimpleDataHandlers( + [&] + (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable { + AtomicSet(lastOffset, ev.GetMessages().back().GetOffset()); + ev.Commit(); + Cerr << ">>> TEST: last offset = " << lastOffset << Endl; + }); + + Cerr << ">>> TEST: Create session" << Endl; + + ReadSession = topicClient.CreateReadSession(readSettings); + + ui32 i = 0; + while (AtomicGet(lastOffset) + 1 < count) { + Cerr << ">>> TEST: last offset = " << AtomicGet(lastOffset) << Endl; + // TODO (ildar-khisam@): restarts with progress and check sdk budget + // PlanTasksAndRestart(decompressor, {i++}); + RunTasks(decompressor, {i++}); + } + + ReadSession->Close(TDuration::MilliSeconds(10)); + Cerr << ">>> TEST: Session gracefully closed" << Endl; + } } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp new file mode 100644 index 00000000000..d9826e7a08e --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp @@ -0,0 +1,102 @@ +#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h> + +namespace NYdb::NTopic::NTests { + +TManagedExecutor::TManagedExecutor(TExecutorPtr executor) : + Executor{std::move(executor)} +{ +} + +bool TManagedExecutor::IsAsync() const +{ + return Executor->IsAsync(); +} + +void TManagedExecutor::Post(TFunction &&f) +{ + with_lock (Mutex) { + Funcs.push_back(std::move(f)); + ++Planned; + } +} + +void TManagedExecutor::DoStart() +{ + Executor->Start(); +} + +auto TManagedExecutor::MakeTask(TFunction func) -> TFunction +{ + return [this, func = std::move(func)]() { + ++Running; + + func(); + + --Running; + ++Executed; + }; +} + +void TManagedExecutor::RunTask(TFunction&& func) +{ + Y_VERIFY(Planned > 0); + --Planned; + Executor->Post(MakeTask(std::move(func))); +} + +void TManagedExecutor::StartFuncs(const std::vector<size_t>& indicies) +{ + with_lock (Mutex) { + for (auto index : indicies) { + Y_VERIFY(index < Funcs.size()); + Y_VERIFY(Funcs[index]); + + RunTask(std::move(Funcs[index])); + } + } +} + +size_t TManagedExecutor::GetFuncsCount() const +{ + with_lock (Mutex) { + return Funcs.size(); + } +} + +size_t TManagedExecutor::GetPlannedCount() const +{ + return Planned; +} + +size_t TManagedExecutor::GetRunningCount() const +{ + return Running; +} + +size_t TManagedExecutor::GetExecutedCount() const +{ + return Executed; +} + +void TManagedExecutor::RunAllTasks() +{ + with_lock (Mutex) { + for (auto& func : Funcs) { + if (func) { + RunTask(std::move(func)); + } + } + } +} + +TIntrusivePtr<TManagedExecutor> CreateThreadPoolManagedExecutor(size_t threads) +{ + return MakeIntrusive<TManagedExecutor>(NYdb::NTopic::CreateThreadPoolExecutor(threads)); +} + +TIntrusivePtr<TManagedExecutor> CreateSyncManagedExecutor() +{ + return MakeIntrusive<TManagedExecutor>(NYdb::NTopic::CreateSyncExecutor()); +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h new file mode 100644 index 00000000000..d32c0ff8817 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h @@ -0,0 +1,47 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/executor.h> +#include <util/system/mutex.h> + +#include <vector> + +namespace NYdb::NTopic::NTests { + +class TManagedExecutor : public IExecutor { +public: + using TExecutorPtr = IExecutor::TPtr; + + explicit TManagedExecutor(TExecutorPtr executor); + + bool IsAsync() const override; + void Post(TFunction&& f) override; + + void StartFuncs(const std::vector<size_t>& indicies); + + size_t GetFuncsCount() const; + + size_t GetPlannedCount() const; + size_t GetRunningCount() const; + size_t GetExecutedCount() const; + + void RunAllTasks(); + +private: + void DoStart() override; + + TFunction MakeTask(TFunction func); + void RunTask(TFunction&& func); + + TExecutorPtr Executor; + TMutex Mutex; + std::vector<TFunction> Funcs; + std::atomic<size_t> Planned = 0; + std::atomic<size_t> Running = 0; + std::atomic<size_t> Executed = 0; +}; + +TIntrusivePtr<TManagedExecutor> CreateThreadPoolManagedExecutor(size_t threads); +TIntrusivePtr<TManagedExecutor> CreateSyncManagedExecutor(); + +} |