aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-08-27 00:57:02 +0300
committerssmike <ssmike@ydb.tech>2023-08-27 01:15:59 +0300
commit53e534d6b6d324487d4958e0789d00ed76438102 (patch)
treedf73823db48c45f2475bed64c2471a961be01574
parentb90673471b7a01d0b54dc8b5350d7341df5764bc (diff)
downloadydb-53e534d6b6d324487d4958e0789d00ed76438102.tar.gz
Don't return excessive rows after undelivery
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp4
-rw-r--r--ydb/core/kqp/ut/scan/kqp_split_ut.cpp155
2 files changed, 144 insertions, 15 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 09a78b55d8b..5e8d9a16d87 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -758,7 +758,7 @@ public:
}
void RetryRead(ui64 id, bool allowInstantRetry = true) {
- if (!Reads[id]) {
+ if (!Reads[id] || Reads[id].Finished) {
return;
}
@@ -787,7 +787,7 @@ public:
}
void DoRetryRead(ui64 id) {
- if (!Reads[id]) {
+ if (!Reads[id] || Reads[id].Finished) {
return;
}
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
index 6b1df6beae9..33f8ab79bb3 100644
--- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -14,6 +14,8 @@
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <library/cpp/threading/future/async.h>
+
namespace NKikimr {
namespace NKqp {
@@ -263,6 +265,47 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
return res;
}
+ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeSQLRequest(const TString &sql, bool dml) {
+ auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ if (dml) {
+ request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+ }
+ request->Record.SetRequestType("_document_api_request");
+ request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->Record.MutableRequest()->SetType(dml
+ ? NKikimrKqp::QUERY_TYPE_SQL_DML
+ : NKikimrKqp::QUERY_TYPE_SQL_DDL);
+ request->Record.MutableRequest()->SetQuery(sql);
+ request->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
+ return request;
+ }
+
+ void SendSQL(Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &sql,
+ bool dml)
+ {
+ auto &runtime = *server->GetRuntime();
+ auto request = MakeSQLRequest(sql, dml);
+ runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release()));
+ }
+
+ void ExecSQL(Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &sql,
+ bool dml = true,
+ Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS)
+ {
+ auto &runtime = *server->GetRuntime();
+ TAutoPtr<IEventHandle> handle;
+
+ auto request = MakeSQLRequest(sql, dml);
+ runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release()));
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
+ }
+
void SendScanQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) {
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
@@ -343,22 +386,25 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
void TTestCase##N<OPT>::Execute_(NUnitTest::TTestContext& ut_context Y_DECLARE_UNUSED)
struct TTestSetup {
- TTestSetup(TString table = "/Root/KeyValueLargePartition")
+ TTestSetup(TString table = "/Root/KeyValueLargePartition", Tests::TServer* providedServer = nullptr)
: Table(table)
{
- TKikimrSettings settings;
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
- appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
- appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true);
- settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- settings.SetAppConfig(appConfig);
-
- Kikimr.ConstructInPlace(settings);
-
- auto db = Kikimr->GetTableClient();
+ InterceptReadActorPipeCache(MakePipePeNodeCacheID(false));
+ if (providedServer) {
+ Server = providedServer;
+ } else {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ settings.SetAppConfig(appConfig);
+
+ Kikimr.ConstructInPlace(settings);
+ Server = &Kikimr->GetTestServer();
+ }
- Server = &Kikimr->GetTestServer();
Runtime = Server->GetRuntime();
KqpProxy = MakeKqpProxyID(Runtime->GetNodeId(0));
@@ -371,6 +417,10 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
Sender = Runtime->AllocateEdgeActor();
+ if (providedServer) {
+ InitRoot(Server, Sender);
+ }
+
CollectKeysTo(&CollectedKeys, Runtime, Sender);
SetSplitMergePartCountLimit(Runtime, -1);
@@ -615,6 +665,85 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",101,202,203,701,702,703");
}
+ Y_UNIT_TEST(UndeliveryOnFinishedRead) {
+ TPortManager pm;
+ Tests::TServerSettings serverSettings(pm.GetPort(2134));
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true);
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetAppConfig(appConfig);
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+
+ server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_DEBUG);
+ server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_DEBUG);
+ TTestSetup s("/Root/Test", server.Get());
+
+ NThreading::TPromise<bool> captured = NThreading::NewPromise<bool>();
+ TVector<THolder<IEventHandle>> evts;
+ std::atomic<bool> captureNotify = true;
+ s.Runtime->SetObserverFunc(
+ [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> TTestActorRuntimeBase::EEventAction {
+ if (!captureNotify.load()) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ }
+ switch (ev->GetTypeRewrite()) {
+ case NYql::NDq::IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType: {
+ Cerr << "captured newasyncdataarrived" << Endl;
+ evts.push_back(THolder<IEventHandle>(ev.Release()));
+ if (!captured.HasValue()) {
+ captured.SetValue(true);
+ }
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ ExecSQL(server, s.Sender, R"(
+ CREATE TABLE `/Root/Test` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )",
+ false);
+
+ ExecSQL(server, s.Sender, R"(
+ REPLACE INTO `/Root/Test` (Key, Value) VALUES
+ (201u, "Value1"),
+ (202u, "Value2"),
+ (203u, "Value3"),
+ (803u, "Value3");
+ )",
+ true);
+
+ auto shards = s.Shards();
+ UNIT_ASSERT_EQUAL(shards.size(), 1);
+
+ s.SendScanQuery("SELECT Key FROM `/Root/Test` where Key = 202");
+
+ s.Runtime->WaitFuture(captured.GetFuture());
+
+ for (auto& ev : evts) {
+ auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);
+
+ s.Runtime->Send(ev->Sender, s.Sender, undelivery.Release());
+ }
+
+ captureNotify.store(false);
+
+ for (auto& ev : evts) {
+ s.Runtime->Send(ev.Release());
+ }
+
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(s.CollectedKeys), ",202");
+ }
+
// TODO: rework test for stream lookups
//Y_UNIT_TEST_SORT(AfterResolvePoints, Order) {
// TTestSetup s;