aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@ydb.tech>2023-12-07 17:18:49 +0300
committerazevaykin <azevaykin@ydb.tech>2023-12-07 17:57:55 +0300
commit96f8591f14a18413954744a99956939fc47a6304 (patch)
treed1c0883f9eba5818b9d1ad69dd16ce2aa3abbb44
parent7147d76c9c977a794d379c375c1ac94ca4880be0 (diff)
downloadydb-96f8591f14a18413954744a99956939fc47a6304.tar.gz
AddObserver in ut_upload_rows
AddObserver in ut_upload_rows Pull Request resolved: https://github.com/ydb-platform/ydb/pull/484
-rw-r--r--ydb/core/tx/datashard/datashard_ut_upload_rows.cpp47
1 files changed, 22 insertions, 25 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
index ea2a452e5b..9b71c4c781 100644
--- a/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_upload_rows.cpp
@@ -177,22 +177,16 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
// Capture all upload rows requests
TVector<THolder<IEventHandle>> uploadRequests;
- auto observer = [&](TAutoPtr<IEventHandle>& ev) {
- switch (ev->GetTypeRewrite()) {
- case TEvDataShard::TEvUploadRowsRequest::EventType: {
- Cerr << "... captured TEvUploadRowsRequest" << Endl;
- uploadRequests.emplace_back(ev.Release());
- return TTestActorRuntime::EEventAction::DROP;
- }
- }
- return TTestActorRuntime::EEventAction::PROCESS;
- };
- auto prevObserver = runtime.SetObserverFunc(observer);
+
+ auto observerHolder = runtime.AddObserver<TEvDataShard::TEvUploadRowsRequest>([&uploadRequests](auto& ev) {
+ Cerr << "... captured TEvUploadRowsRequest" << Endl;
+ uploadRequests.emplace_back(ev.Release());
+ });
DoStartUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32);
waitFor([&]{ return uploadRequests.size() >= 3; }, "TEvUploadRowsRequest");
- runtime.SetObserverFunc(prevObserver);
+ observerHolder.Remove();
ui64 dropTxId = AsyncAlterDropColumn(server, "/Root", "table-1", "value");
WaitTxNotification(server, dropTxId);
@@ -772,19 +766,21 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
TVector<ui32> observedUploadStatus;
TVector<THolder<IEventHandle>> blockedEnqueueRecords;
+
+ auto observerRequestHandler = runtime.AddObserver<TEvDataShard::TEvUploadRowsRequest>([&overloadSubscribe](auto& ev) {
+ if (!overloadSubscribe) {
+ ev->Get()->Record.ClearOverloadSubscribe();
+ }
+ });
+
+ auto observerResponseHandler = runtime.AddObserver<TEvDataShard::TEvUploadRowsResponse>([&observedUploadStatus](auto& ev) {
+ observedUploadStatus.push_back(ev->Get()->Record.GetStatus());
+ });
+
auto prevObserverFunc = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
- switch (ev->GetTypeRewrite()) {
- case NDataShard::TEvChangeExchange::EvEnqueueRecords:
- blockedEnqueueRecords.emplace_back(ev.Release());
- return TTestActorRuntime::EEventAction::DROP;
- case TEvDataShard::TEvUploadRowsRequest::EventType:
- if (!overloadSubscribe) {
- ev->Get<TEvDataShard::TEvUploadRowsRequest>()->Record.ClearOverloadSubscribe();
- }
- break;
- case TEvDataShard::TEvUploadRowsResponse::EventType:
- observedUploadStatus.push_back(ev->Get<TEvDataShard::TEvUploadRowsResponse>()->Record.GetStatus());
- break;
+ if (ev->GetTypeRewrite() == NDataShard::TEvChangeExchange::EvEnqueueRecords) {
+ blockedEnqueueRecords.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
}
return TTestActorRuntime::EEventAction::PROCESS;
@@ -821,6 +817,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
observedUploadStatus.clear();
UNIT_ASSERT(responses.empty());
+ observerRequestHandler.Remove();
+ observerResponseHandler.Remove();
runtime.SetObserverFunc(prevObserverFunc);
for (auto& ev : blockedEnqueueRecords) {
runtime.Send(ev.Release(), 0, true);
@@ -851,7 +849,6 @@ Y_UNIT_TEST_SUITE(TTxDataShardUploadRows) {
Y_UNIT_TEST(ShouldRejectOnChangeQueueOverflowAndRetry) {
DoShouldRejectOnChangeQueueOverflow(true);
}
-
}
} // namespace NKikimr