diff options
author | ssmike <ssmike@ydb.tech> | 2023-08-27 00:57:02 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-08-27 01:15:59 +0300 |
commit | 53e534d6b6d324487d4958e0789d00ed76438102 (patch) | |
tree | df73823db48c45f2475bed64c2471a961be01574 | |
parent | b90673471b7a01d0b54dc8b5350d7341df5764bc (diff) | |
download | ydb-53e534d6b6d324487d4958e0789d00ed76438102.tar.gz |
Don't return excessive rows after undelivery
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_split_ut.cpp | 155 |
2 files changed, 144 insertions, 15 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 09a78b55d8b..5e8d9a16d87 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -758,7 +758,7 @@ public: } void RetryRead(ui64 id, bool allowInstantRetry = true) { - if (!Reads[id]) { + if (!Reads[id] || Reads[id].Finished) { return; } @@ -787,7 +787,7 @@ public: } void DoRetryRead(ui64 id) { - if (!Reads[id]) { + if (!Reads[id] || Reads[id].Finished) { return; } diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index 6b1df6beae9..33f8ab79bb3 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -14,6 +14,8 @@ #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/schemeshard/schemeshard.h> +#include <library/cpp/threading/future/async.h> + namespace NKikimr { namespace NKqp { @@ -263,6 +265,47 @@ Y_UNIT_TEST_SUITE(KqpSplit) { return res; } + THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSQLRequest(const TString &sql, bool dml) { + auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + if (dml) { + request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + } + request->Record.SetRequestType("_document_api_request"); + request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->Record.MutableRequest()->SetType(dml + ? NKikimrKqp::QUERY_TYPE_SQL_DML + : NKikimrKqp::QUERY_TYPE_SQL_DDL); + request->Record.MutableRequest()->SetQuery(sql); + request->Record.MutableRequest()->SetUsePublicResponseDataFormat(true); + return request; + } + + void SendSQL(Tests::TServer::TPtr server, + TActorId sender, + const TString &sql, + bool dml) + { + auto &runtime = *server->GetRuntime(); + auto request = MakeSQLRequest(sql, dml); + runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release())); + } + + void ExecSQL(Tests::TServer::TPtr server, + TActorId sender, + const TString &sql, + bool dml = true, + Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS) + { + auto &runtime = *server->GetRuntime(); + TAutoPtr<IEventHandle> handle; + + auto request = MakeSQLRequest(sql, dml); + runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release())); + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code); + } + void SendScanQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) { auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); @@ -343,22 +386,25 @@ Y_UNIT_TEST_SUITE(KqpSplit) { void TTestCase##N<OPT>::Execute_(NUnitTest::TTestContext& ut_context Y_DECLARE_UNUSED) struct TTestSetup { - TTestSetup(TString table = "/Root/KeyValueLargePartition") + TTestSetup(TString table = "/Root/KeyValueLargePartition", Tests::TServer* providedServer = nullptr) : Table(table) { - TKikimrSettings settings; - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); - appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); - appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true); - settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - settings.SetAppConfig(appConfig); - - Kikimr.ConstructInPlace(settings); - - auto db = Kikimr->GetTableClient(); + InterceptReadActorPipeCache(MakePipePeNodeCacheID(false)); + if (providedServer) { + Server = providedServer; + } else { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + settings.SetAppConfig(appConfig); + + Kikimr.ConstructInPlace(settings); + Server = &Kikimr->GetTestServer(); + } - Server = &Kikimr->GetTestServer(); Runtime = Server->GetRuntime(); KqpProxy = MakeKqpProxyID(Runtime->GetNodeId(0)); @@ -371,6 +417,10 @@ Y_UNIT_TEST_SUITE(KqpSplit) { Sender = Runtime->AllocateEdgeActor(); + if (providedServer) { + InitRoot(Server, Sender); + } + CollectKeysTo(&CollectedKeys, Runtime, Sender); SetSplitMergePartCountLimit(Runtime, -1); @@ -615,6 +665,85 @@ Y_UNIT_TEST_SUITE(KqpSplit) { UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",101,202,203,701,702,703"); } + Y_UNIT_TEST(UndeliveryOnFinishedRead) { + TPortManager pm; + Tests::TServerSettings serverSettings(pm.GetPort(2134)); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetAppConfig(appConfig); + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + + server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_DEBUG); + server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG); + TTestSetup s("/Root/Test", server.Get()); + + NThreading::TPromise<bool> captured = NThreading::NewPromise<bool>(); + TVector<THolder<IEventHandle>> evts; + std::atomic<bool> captureNotify = true; + s.Runtime->SetObserverFunc( + [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> TTestActorRuntimeBase::EEventAction { + if (!captureNotify.load()) { + return TTestActorRuntime::EEventAction::PROCESS; + } + switch (ev->GetTypeRewrite()) { + case NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType: { + Cerr << "captured newasyncdataarrived" << Endl; + evts.push_back(THolder<IEventHandle>(ev.Release())); + if (!captured.HasValue()) { + captured.SetValue(true); + } + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + ExecSQL(server, s.Sender, R"( + CREATE TABLE `/Root/Test` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )", + false); + + ExecSQL(server, s.Sender, R"( + REPLACE INTO `/Root/Test` (Key, Value) VALUES + (201u, "Value1"), + (202u, "Value2"), + (203u, "Value3"), + (803u, "Value3"); + )", + true); + + auto shards = s.Shards(); + UNIT_ASSERT_EQUAL(shards.size(), 1); + + s.SendScanQuery("SELECT Key FROM `/Root/Test` where Key = 202"); + + s.Runtime->WaitFuture(captured.GetFuture()); + + for (auto& ev : evts) { + auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true); + + s.Runtime->Send(ev->Sender, s.Sender, undelivery.Release()); + } + + captureNotify.store(false); + + for (auto& ev : evts) { + s.Runtime->Send(ev.Release()); + } + + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(s.CollectedKeys), ",202"); + } + // TODO: rework test for stream lookups //Y_UNIT_TEST_SORT(AfterResolvePoints, Order) { // TTestSetup s; |