aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-07-06 23:47:40 +0300
committerilnaz <ilnaz@ydb.tech>2023-07-06 23:47:40 +0300
commit0b32efb38bdb9e806107ef3115bc8177a751d9c5 (patch)
tree0e5a2fbc9a676f70b2aaebfa950fbbfabe8bf1ca
parent3edecc646fd526f54d6cca6331130a9aafb52be4 (diff)
downloadydb-0b32efb38bdb9e806107ef3115bc8177a751d9c5.tar.gz
OnReady: request new records KIKIMR-18159
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp83
2 files changed, 85 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index e894c6246f4..92b15db6f2d 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -227,6 +227,8 @@ void TBaseChangeSender::OnReady(ui64 partitionId) {
if (sender.Prepared) {
SendPreparedRecords(partitionId);
}
+
+ RequestRecords();
}
void TBaseChangeSender::OnGone(ui64 partitionId) {
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index ec8ba239abd..192fffbfdc7 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1,4 +1,5 @@
#include "datashard_ut_common.h"
+#include "change_sender_common_ops.h"
#include <library/cpp/digest/md5/md5.h>
#include <library/cpp/json/json_reader.h>
@@ -2566,6 +2567,88 @@ Y_UNIT_TEST_SUITE(Cdc) {
waitProgress(2);
}
+ Y_UNIT_TEST(EnqueueRequestProcessSend) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions()
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Utf8", false, false},
+ })
+ );
+
+ bool ready = false;
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) {
+ ready = true;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));
+
+ if (!ready) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&ready](IEventHandle&) {
+ return ready;
+ });
+ server->GetRuntime()->DispatchEvents(opts);
+ }
+
+ runtime.SetObserverFunc(prevObserver);
+
+ THolder<IEventHandle> delayed;
+ prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) {
+ delayed.Reset(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ const auto value = TString(200_KB, 'A');
+ // make sender busy
+ ExecSQL(server, edgeActor, Sprintf(R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, "%s");
+ )", value.c_str()));
+
+ if (!delayed) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&delayed](IEventHandle&) {
+ return bool(delayed);
+ });
+ server->GetRuntime()->DispatchEvents(opts);
+ }
+
+ ExecSQL(server, edgeActor, Sprintf(R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (2, "%s"),
+ (3, "%s");
+ )", value.c_str(), value.c_str()));
+
+ runtime.SetObserverFunc(prevObserver);
+ runtime.Send(delayed.Release(), 0, true);
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ Sprintf(R"({"update":{"value":"%s"},"key":[1]})", value.c_str()),
+ Sprintf(R"({"update":{"value":"%s"},"key":[2]})", value.c_str()),
+ Sprintf(R"({"update":{"value":"%s"},"key":[3]})", value.c_str()),
+ });
+ }
+
Y_UNIT_TEST(AwsRegion) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())