diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-07-06 23:47:40 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-07-06 23:47:40 +0300 |
commit | 0b32efb38bdb9e806107ef3115bc8177a751d9c5 (patch) | |
tree | 0e5a2fbc9a676f70b2aaebfa950fbbfabe8bf1ca | |
parent | 3edecc646fd526f54d6cca6331130a9aafb52be4 (diff) | |
download | ydb-0b32efb38bdb9e806107ef3115bc8177a751d9c5.tar.gz |
OnReady: request new records KIKIMR-18159
-rw-r--r-- | ydb/core/tx/datashard/change_sender_common_ops.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 83 |
2 files changed, 85 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index e894c6246f4..92b15db6f2d 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -227,6 +227,8 @@ void TBaseChangeSender::OnReady(ui64 partitionId) { if (sender.Prepared) { SendPreparedRecords(partitionId); } + + RequestRecords(); } void TBaseChangeSender::OnGone(ui64 partitionId) { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index ec8ba239abd..192fffbfdc7 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1,4 +1,5 @@ #include "datashard_ut_common.h" +#include "change_sender_common_ops.h" #include <library/cpp/digest/md5/md5.h> #include <library/cpp/json/json_reader.h> @@ -2566,6 +2567,88 @@ Y_UNIT_TEST_SUITE(Cdc) { waitProgress(2); } + Y_UNIT_TEST(EnqueueRequestProcessSend) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Utf8", false, false}, + }) + ); + + bool ready = false; + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) { + ready = true; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + Updates(NKikimrSchemeOp::ECdcStreamFormatJson))); + + if (!ready) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&ready](IEventHandle&) { + return ready; + }); + server->GetRuntime()->DispatchEvents(opts); + } + + runtime.SetObserverFunc(prevObserver); + + THolder<IEventHandle> delayed; + prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) { + delayed.Reset(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + const auto value = TString(200_KB, 'A'); + // make sender busy + ExecSQL(server, edgeActor, Sprintf(R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, "%s"); + )", value.c_str())); + + if (!delayed) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) { + return bool(delayed); + }); + server->GetRuntime()->DispatchEvents(opts); + } + + ExecSQL(server, edgeActor, Sprintf(R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (2, "%s"), + (3, "%s"); + )", value.c_str(), value.c_str())); + + runtime.SetObserverFunc(prevObserver); + runtime.Send(delayed.Release(), 0, true); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + Sprintf(R"({"update":{"value":"%s"},"key":[1]})", value.c_str()), + Sprintf(R"({"update":{"value":"%s"},"key":[2]})", value.c_str()), + Sprintf(R"({"update":{"value":"%s"},"key":[3]})", value.c_str()), + }); + } + Y_UNIT_TEST(AwsRegion) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) |