diff options
author | azevaykin <azevaykin@ydb.tech> | 2023-12-07 17:18:49 +0300 |
---|---|---|
committer | azevaykin <azevaykin@ydb.tech> | 2023-12-07 17:57:55 +0300 |
commit | 96f8591f14a18413954744a99956939fc47a6304 (patch) | |
tree | d1c0883f9eba5818b9d1ad69dd16ce2aa3abbb44 | |
parent | 7147d76c9c977a794d379c375c1ac94ca4880be0 (diff) | |
download | ydb-96f8591f14a18413954744a99956939fc47a6304.tar.gz |
AddObserver in ut_upload_rows
AddObserver in ut_upload_rows
Pull Request resolved: https://github.com/ydb-platform/ydb/pull/484
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_upload_rows.cpp | 47 |
1 files changed, 22 insertions, 25 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp index ea2a452e5b..9b71c4c781 100644 --- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp +++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp @@ -177,22 +177,16 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { // Capture all upload rows requests TVector<THolder<IEventHandle>> uploadRequests; - auto observer = [&](TAutoPtr<IEventHandle>& ev) { - switch (ev->GetTypeRewrite()) { - case TEvDataShard::TEvUploadRowsRequest::EventType: { - Cerr << "... captured TEvUploadRowsRequest" << Endl; - uploadRequests.emplace_back(ev.Release()); - return TTestActorRuntime::EEventAction::DROP; - } - } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserver = runtime.SetObserverFunc(observer); + + auto observerHolder = runtime.AddObserver<TEvDataShard::TEvUploadRowsRequest>([&uploadRequests](auto& ev) { + Cerr << "... captured TEvUploadRowsRequest" << Endl; + uploadRequests.emplace_back(ev.Release()); + }); DoStartUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32); waitFor([&]{ return uploadRequests.size() >= 3; }, "TEvUploadRowsRequest"); - runtime.SetObserverFunc(prevObserver); + observerHolder.Remove(); ui64 dropTxId = AsyncAlterDropColumn(server, "/Root", "table-1", "value"); WaitTxNotification(server, dropTxId); @@ -772,19 +766,21 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { TVector<ui32> observedUploadStatus; TVector<THolder<IEventHandle>> blockedEnqueueRecords; + + auto observerRequestHandler = runtime.AddObserver<TEvDataShard::TEvUploadRowsRequest>([&overloadSubscribe](auto& ev) { + if (!overloadSubscribe) { + ev->Get()->Record.ClearOverloadSubscribe(); + } + }); + + auto observerResponseHandler = runtime.AddObserver<TEvDataShard::TEvUploadRowsResponse>([&observedUploadStatus](auto& ev) { + observedUploadStatus.push_back(ev->Get()->Record.GetStatus()); + }); + auto prevObserverFunc = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { - switch (ev->GetTypeRewrite()) { - case NDataShard::TEvChangeExchange::EvEnqueueRecords: - blockedEnqueueRecords.emplace_back(ev.Release()); - return TTestActorRuntime::EEventAction::DROP; - case TEvDataShard::TEvUploadRowsRequest::EventType: - if (!overloadSubscribe) { - ev->Get<TEvDataShard::TEvUploadRowsRequest>()->Record.ClearOverloadSubscribe(); - } - break; - case TEvDataShard::TEvUploadRowsResponse::EventType: - observedUploadStatus.push_back(ev->Get<TEvDataShard::TEvUploadRowsResponse>()->Record.GetStatus()); - break; + if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) { + blockedEnqueueRecords.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; } return TTestActorRuntime::EEventAction::PROCESS; @@ -821,6 +817,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { observedUploadStatus.clear(); UNIT_ASSERT(responses.empty()); + observerRequestHandler.Remove(); + observerResponseHandler.Remove(); runtime.SetObserverFunc(prevObserverFunc); for (auto& ev : blockedEnqueueRecords) { runtime.Send(ev.Release(), 0, true); @@ -851,7 +849,6 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) { Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflowAndRetry) { DoShouldRejectOnChangeQueueOverflow(true); } - } } // namespace NKikimr |