aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-02-07 09:27:55 +0300
committerildar-khisam <ikhis@ydb.tech>2023-02-07 09:27:55 +0300
commit67026a90f514712e5c1ff62a8b3dd8631e545a88 (patch)
tree2af31998bfc653f6330e616724cd997f5e4b9b22
parent55249694bb14fef3f2773ef1381c4bb61ddfc1c8 (diff)
downloadydb-67026a90f514712e5c1ff62a8b3dd8631e545a88.tar.gz
fix
fix fix budget restore
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp22
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp111
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp102
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h47
8 files changed, 283 insertions, 10 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index c04199d57ea..34727778925 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -238,6 +238,10 @@ public:
return DoDecompress;
}
+ i64 GetServerBytesSize() const {
+ return ServerBytesSize;
+ }
+
TMaybe<std::pair<size_t, size_t>> GetReadyThreshold() const {
size_t readyCount = 0;
std::pair<size_t, size_t> ret;
@@ -270,7 +274,7 @@ public:
void PutDecompressionError(std::exception_ptr error, size_t batch, size_t message);
std::exception_ptr GetDecompressionError(size_t batch, size_t message);
- void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0);
+ void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount);
void OnUserRetrievedEvent(i64 decompressedDataSize, size_t messagesCount);
private:
@@ -962,7 +966,7 @@ public:
void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset);
void OnCreateNewDecompressionTask();
- void OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount);
+ void OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount, i64 serverBytesSize);
void OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize = 0);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 22f3088a8b9..58dcde911cf 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -20,6 +20,7 @@
#include <util/stream/mem.h>
#include <util/system/env.h>
+#include <utility>
#include <variant>
// Forward delcarations
@@ -152,7 +153,7 @@ void TRawPartitionStreamEventQueue<UseMigrationProtocol>::SignalReadyEvents(TInt
{
auto moveToReadyQueue = [&](TRawPartitionStreamEvent<UseMigrationProtocol> &&event) {
queue.SignalEventImpl(stream, deferred, event.IsDataEvent());
-
+
Ready.push_back(std::move(event));
NotReady.pop_front();
};
@@ -503,7 +504,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImp
}
req.mutable_read_request()->set_bytes_size(ReadSizeBudget);
ReadSizeServerDelta += ReadSizeBudget;
-
ReadSizeBudget = 0;
}
@@ -1340,7 +1340,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnCreateNewDecompressi
}
template<bool UseMigrationProtocol>
-void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount)
+void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount, i64 serverBytesSize)
{
*Settings.Counters_->MessagesInflight -= messagesCount;
@@ -1355,6 +1355,10 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnDecompressionInfoDes
CompressedDataSize -= compressedSize;
DecompressedDataSize -= decompressedSize;
+ if constexpr (!UseMigrationProtocol) {
+ ReadSizeBudget += serverBytesSize;
+ }
+
ContinueReadingDataImpl();
StartDecompressionTasksImpl(deferred);
}
@@ -1820,7 +1824,7 @@ typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent
TReadSessionEventsQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
size_t& maxByteSize,
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) // Assumes that we're under lock.
-{
+{
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage> messages;
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage> compressedMessages;
@@ -2074,7 +2078,7 @@ template<bool UseMigrationProtocol>
TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo()
{
if (auto session = Session.lock()) {
- session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight);
+ session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight, ServerBytesSize);
}
}
@@ -2305,13 +2309,15 @@ bool TDataDecompressionInfo<UseMigrationProtocol>::HasReadyUnreadData() const {
}
template<bool UseMigrationProtocol>
-void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount, i64 serverBytesSize)
+void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 sourceSize, i64 estimatedDecompressedSize, i64 decompressedSize, size_t messagesCount)
{
CompressedDataSize -= sourceSize;
DecompressedDataSize += decompressedSize;
if (auto session = Session.lock()) {
- session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, serverBytesSize);
+ // TODO (ildar-khisam@): distribute total ServerBytesSize in proportion of source size
+ // Use CompressedDataSize, sourceSize, ServerBytesSize
+ session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, std::exchange(ServerBytesSize, 0));
}
}
@@ -2413,7 +2419,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
Y_ASSERT(dataProcessed == SourceDataSize);
std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock();
- Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed, Parent->ServerBytesSize);
+ Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed);
Parent->SourceDataNotProcessed -= dataProcessed;
Ready->Ready = true;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt
index 42f184e205f..b54dd13486a 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.darwin.txt
@@ -41,6 +41,7 @@ target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
)
target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
index 31cd3c9c963..4e17f9a1317 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux-aarch64.txt
@@ -44,6 +44,7 @@ target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
)
target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt
index 1bfcc204736..bb7d888fb27 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/CMakeLists.linux.txt
@@ -46,6 +46,7 @@ target_link_options(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
)
target_sources(ydb-public-sdk-cpp-client-ydb_topic-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
)
set_property(
TARGET
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
index fec5f36088c..68c631a58b4 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
@@ -1,5 +1,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h>
+
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
@@ -92,6 +94,115 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
ReadSession->Close(TDuration::MilliSeconds(10));
AtomicSet(check, 0);
}
+
+
+ Y_UNIT_TEST(ReadWithRestarts) {
+
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
+
+ NPersQueue::TWriteSessionSettings writeSettings;
+ writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id");
+ writeSettings.Codec(NPersQueue::ECodec::RAW);
+ NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor();
+ writeSettings.CompressionExecutor(executor);
+
+ auto& client = setup->GetPersQueueClient();
+ auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+
+ ui32 count = 700;
+ std::string message(2'000, 'x');
+ for (ui32 i = 1; i <= count; ++i) {
+ bool res = session->Write(message);
+ UNIT_ASSERT(res);
+ }
+ bool res = session->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+
+ std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::NTopic::TTopicClient topicClient(setup->GetDriver());
+
+ // Create read session.
+ NYdb::NTopic::TReadSessionSettings readSettings;
+ readSettings
+ .ConsumerName(setup->GetTestClient())
+ .MaxMemoryUsageBytes(1_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ //
+ // controlled decompressor
+ //
+ auto decompressor = CreateThreadPoolManagedExecutor(1);
+ readSettings.DecompressionExecutor(decompressor);
+
+
+ //
+ // auxiliary functions for decompressor and handler control
+ //
+ auto WaitTasks = [&](auto f, size_t c) {
+ while (f() < c) {
+ Sleep(TDuration::MilliSeconds(100));
+ };
+ };
+ auto WaitPlannedTasks = [&](auto e, size_t count) {
+ WaitTasks([&]() { return e->GetPlannedCount(); }, count);
+ };
+ auto WaitExecutedTasks = [&](auto e, size_t count) {
+ WaitTasks([&]() { return e->GetExecutedCount(); }, count);
+ };
+
+ auto RunTasks = [&](auto e, const std::vector<size_t>& tasks) {
+ size_t n = tasks.size();
+ WaitPlannedTasks(e, n);
+ size_t completed = e->GetExecutedCount();
+ e->StartFuncs(tasks);
+ WaitExecutedTasks(e, completed + n);
+ };
+ Y_UNUSED(RunTasks);
+
+ auto PlanTasksAndRestart = [&](auto e, const std::vector<size_t>& tasks) {
+ size_t n = tasks.size();
+ WaitPlannedTasks(e, n);
+ size_t completed = e->GetExecutedCount();
+
+ setup->KillPqrb(setup->GetTestTopic(), setup->GetLocalCluster());
+ Cerr << ">>> TEST: PQRB killed" << Endl;
+ Sleep(TDuration::MilliSeconds(100));
+
+ e->StartFuncs(tasks);
+ WaitExecutedTasks(e, completed + n);
+ };
+ Y_UNUSED(PlanTasksAndRestart);
+
+
+ NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
+ TAtomic lastOffset = 0u;
+
+ auto f = checkedPromise.GetFuture();
+ readSettings.EventHandlers_.SimpleDataHandlers(
+ [&]
+ (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ AtomicSet(lastOffset, ev.GetMessages().back().GetOffset());
+ ev.Commit();
+ Cerr << ">>> TEST: last offset = " << lastOffset << Endl;
+ });
+
+ Cerr << ">>> TEST: Create session" << Endl;
+
+ ReadSession = topicClient.CreateReadSession(readSettings);
+
+ ui32 i = 0;
+ while (AtomicGet(lastOffset) + 1 < count) {
+ Cerr << ">>> TEST: last offset = " << AtomicGet(lastOffset) << Endl;
+ // TODO (ildar-khisam@): restarts with progress and check sdk budget
+ // PlanTasksAndRestart(decompressor, {i++});
+ RunTasks(decompressor, {i++});
+ }
+
+ ReadSession->Close(TDuration::MilliSeconds(10));
+ Cerr << ">>> TEST: Session gracefully closed" << Endl;
+ }
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
new file mode 100644
index 00000000000..d9826e7a08e
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.cpp
@@ -0,0 +1,102 @@
+#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h>
+
+namespace NYdb::NTopic::NTests {
+
+TManagedExecutor::TManagedExecutor(TExecutorPtr executor) :
+ Executor{std::move(executor)}
+{
+}
+
+bool TManagedExecutor::IsAsync() const
+{
+ return Executor->IsAsync();
+}
+
+void TManagedExecutor::Post(TFunction &&f)
+{
+ with_lock (Mutex) {
+ Funcs.push_back(std::move(f));
+ ++Planned;
+ }
+}
+
+void TManagedExecutor::DoStart()
+{
+ Executor->Start();
+}
+
+auto TManagedExecutor::MakeTask(TFunction func) -> TFunction
+{
+ return [this, func = std::move(func)]() {
+ ++Running;
+
+ func();
+
+ --Running;
+ ++Executed;
+ };
+}
+
+void TManagedExecutor::RunTask(TFunction&& func)
+{
+ Y_VERIFY(Planned > 0);
+ --Planned;
+ Executor->Post(MakeTask(std::move(func)));
+}
+
+void TManagedExecutor::StartFuncs(const std::vector<size_t>& indicies)
+{
+ with_lock (Mutex) {
+ for (auto index : indicies) {
+ Y_VERIFY(index < Funcs.size());
+ Y_VERIFY(Funcs[index]);
+
+ RunTask(std::move(Funcs[index]));
+ }
+ }
+}
+
+size_t TManagedExecutor::GetFuncsCount() const
+{
+ with_lock (Mutex) {
+ return Funcs.size();
+ }
+}
+
+size_t TManagedExecutor::GetPlannedCount() const
+{
+ return Planned;
+}
+
+size_t TManagedExecutor::GetRunningCount() const
+{
+ return Running;
+}
+
+size_t TManagedExecutor::GetExecutedCount() const
+{
+ return Executed;
+}
+
+void TManagedExecutor::RunAllTasks()
+{
+ with_lock (Mutex) {
+ for (auto& func : Funcs) {
+ if (func) {
+ RunTask(std::move(func));
+ }
+ }
+ }
+}
+
+TIntrusivePtr<TManagedExecutor> CreateThreadPoolManagedExecutor(size_t threads)
+{
+ return MakeIntrusive<TManagedExecutor>(NYdb::NTopic::CreateThreadPoolExecutor(threads));
+}
+
+TIntrusivePtr<TManagedExecutor> CreateSyncManagedExecutor()
+{
+ return MakeIntrusive<TManagedExecutor>(NYdb::NTopic::CreateSyncExecutor());
+}
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h
new file mode 100644
index 00000000000..d32c0ff8817
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/executor.h>
+#include <util/system/mutex.h>
+
+#include <vector>
+
+namespace NYdb::NTopic::NTests {
+
+class TManagedExecutor : public IExecutor {
+public:
+ using TExecutorPtr = IExecutor::TPtr;
+
+ explicit TManagedExecutor(TExecutorPtr executor);
+
+ bool IsAsync() const override;
+ void Post(TFunction&& f) override;
+
+ void StartFuncs(const std::vector<size_t>& indicies);
+
+ size_t GetFuncsCount() const;
+
+ size_t GetPlannedCount() const;
+ size_t GetRunningCount() const;
+ size_t GetExecutedCount() const;
+
+ void RunAllTasks();
+
+private:
+ void DoStart() override;
+
+ TFunction MakeTask(TFunction func);
+ void RunTask(TFunction&& func);
+
+ TExecutorPtr Executor;
+ TMutex Mutex;
+ std::vector<TFunction> Funcs;
+ std::atomic<size_t> Planned = 0;
+ std::atomic<size_t> Running = 0;
+ std::atomic<size_t> Executed = 0;
+};
+
+TIntrusivePtr<TManagedExecutor> CreateThreadPoolManagedExecutor(size_t threads);
+TIntrusivePtr<TManagedExecutor> CreateSyncManagedExecutor();
+
+}