aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <yulia@ydb.tech>2024-01-09 16:39:06 +0100
committerGitHub <noreply@github.com>2024-01-09 16:39:06 +0100
commit5db97d85ba01b7c8038b37c399980097097fd06e (patch)
tree15f65d61b05977653afa32bde511f02beb12e0d2
parent8c1f7ef7a4091b7bb300ee3030ec339b6cf77a6e (diff)
downloadydb-5db97d85ba01b7c8038b37c399980097097fd06e.tar.gz
feature(kqp): enable stream lookup for data query (#611)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp13
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp2
-rw-r--r--ydb/core/kqp/ut/cost/kqp_cost_ut.cpp5
-rw-r--r--ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp6
-rw-r--r--ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp13
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp9
-rw-r--r--ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/query/kqp_explain_ut.cpp21
-rw-r--r--ydb/core/kqp/ut/query/kqp_query_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/query/kqp_stats_ut.cpp1
-rw-r--r--ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp2
-rw-r--r--ydb/core/protos/table_service_config.proto2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_ut_trace.cpp94
-rw-r--r--ydb/services/ydb/ydb_table_split_ut.cpp1
17 files changed, 128 insertions, 51 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index fba12d565b..0b9674e3df 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1650,11 +1650,18 @@ private:
}
}
- bool HassDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
+ bool HasDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
if (queryType == NKqpProto::TKqpPhyTx::TYPE_DATA) {
return true;
}
- for (const auto &tableOp : stage.GetTableOps()) {
+
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ return true;
+ }
+ }
+
+ for (const auto& tableOp : stage.GetTableOps()) {
if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
return true;
}
@@ -1691,7 +1698,7 @@ private:
}
}
- if (stageInfo.Meta.IsOlap() && HassDmlOperationOnOlap(tx.Body->GetType(), stage)) {
+ if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 0b3790e7b7..0f9ff8525c 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -112,6 +112,8 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
meta.TableId = MakeTableId(input.GetStreamLookup().GetTable());
meta.TablePath = input.GetStreamLookup().GetTable().GetPath();
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
+ YQL_ENSURE(meta.TableConstInfo);
+ meta.TableKind = meta.TableConstInfo->TableKind;
}
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) {
diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
index 6211df3be3..7be32f87aa 100644
--- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
+++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
@@ -11,10 +11,11 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;
-static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead) {
+static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead);
app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
return app;
}
@@ -43,7 +44,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
}
Y_UNIT_TEST_TWIN(PointLookup, SourceRead) {
- TKikimrRunner kikimr(GetAppConfig(SourceRead));
+ TKikimrRunner kikimr(GetAppConfig(SourceRead, false));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
index 1f85761e0c..b8912474b5 100644
--- a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
+++ b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
@@ -401,6 +401,7 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) {
// source read use iterator interface, that doesn't use datashard transactions
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 74214695a1..929ca9d29f 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -4030,9 +4030,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
auto& stats = NYdb::TProtoAccessor::GetProto(*result2.GetStats());
- int readPhase = 1;
+ int readPhase = 0;
if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys");
@@ -4042,6 +4042,8 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
+ readPhase++;
+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys/Index/indexImplTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).reads().rows(), 1);
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 8bcad00b36..3c69410c54 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -90,6 +90,7 @@ Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) {
void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
auto settings = TKikimrSettings().SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
diff --git a/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp b/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp
index a16eb76308..6977d7981c 100644
--- a/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp
@@ -158,8 +158,14 @@ void Test(const TString& query, const TString& answer, THashSet<TString> allowSc
}
}
-void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1) {
- TKikimrRunner kikimr;
+void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1, bool streamLookup = true) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
+
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -204,7 +210,8 @@ Y_UNIT_TEST(OverflowLookup) {
)",
R"([])",
0,
- 2);
+ 2,
+ false);
TestRange(
R"(
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 1c7ca308fc..e9bd4c7eed 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -205,7 +205,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
- UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
+
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
+ } else {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
+ }
auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$group").OptionalUint32(1).Build()
@@ -1224,7 +1229,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
size_t phase = 0;
if (stats.query_phases().size() == 2) {
phase = 1;
- } else if (stats.query_phases().size() == 0) {
+ } else if (stats.query_phases().size() == 1) {
phase = 0;
} else {
UNIT_ASSERT(false);
diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
index 47276d412c..432c81fc99 100644
--- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
+++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
@@ -278,6 +278,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) {
NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
index 33bacf0385..f469f396e4 100644
--- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
@@ -497,7 +497,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
node = FindPlanNodeByKv(plan, "Name", "TableFullScan");
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
- node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
+
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ node = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
+ } else {
+ node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
+ }
+
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
}
@@ -533,7 +539,12 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);
ui32 lookupsCount = 0;
- lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup");
+ } else {
+ lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
+ }
+
UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 1);
/* check tables section */
@@ -899,7 +910,11 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
}
Y_UNIT_TEST(MultiJoinCteLinks) {
- TKikimrRunner kikimr;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+ TKikimrRunner kikimr{settings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index 2c755dd603..f57bf4005a 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -351,6 +351,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
Y_UNIT_TEST(QueryTimeoutImmediate) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
@@ -490,6 +491,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
Y_UNIT_TEST(QueryCancelImmediate) {
NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
index 0ecbfc4153..7c1ed09f61 100644
--- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
@@ -102,6 +102,7 @@ TCollectedStreamResult JoinStatsBasic(
std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
index b59407d502..bf56e63e79 100644
--- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
+++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
if (result.GetStatus() == EStatus::SUCCESS)
continue;
- if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead() && false) {
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
return issue.GetMessage().Contains("has no snapshot at");
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index 5ca397d677..3ef6ae03c9 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -224,7 +224,7 @@ message TTableServiceConfig {
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
optional TAggregationConfig AggregationConfig = 29;
optional bool EnableKqpScanQueryStreamLookup = 30 [default = true];
- optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
+ optional bool EnableKqpDataQueryStreamLookup = 31 [default = true];
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
optional bool EnablePublishKqpProxyByRM = 34 [default = true];
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index a2e73d7480..17c07969f6 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -1526,6 +1526,7 @@ Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetEnableMvccSnapshotReads(false);
serverSettings.SetDomainName("Root")
@@ -1863,9 +1864,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {
Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
TPortManager pm;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetEnableMvccSnapshotReads(false);
serverSettings.SetDomainName("Root")
+ .SetAppConfig(app)
.SetUseRealThreads(false);
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -3507,6 +3511,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp
index 2b377de086..1bd642c9e1 100644
--- a/ydb/core/tx/datashard/datashard_ut_trace.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp
@@ -337,43 +337,69 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
FakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second;
- auto deSpan = trace.Root.BFSFindOne("DataExecuter");
- UNIT_ASSERT(deSpan);
-
- auto dsTxSpans = deSpan->get().FindAll("Datashard.Transaction");
- UNIT_ASSERT_EQUAL(2, dsTxSpans.size()); // Two shards, each executes a user transaction.
-
- for (auto dsTxSpan : dsTxSpans) {
- auto tabletTxs = dsTxSpan.get().FindAll("Tablet.Transaction");
- UNIT_ASSERT_EQUAL(1, tabletTxs.size());
-
- auto propose = tabletTxs[0];
- CheckTxHasWriteLog(propose);
-
- // Blobs are loaded from BS.
- UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Wait").size());
- UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Enqueued").size());
-
- // We execute tx multiple times, because we have to load data for it to execute.
- auto executeSpans = propose.get().FindAll("Tablet.Transaction.Execute");
- UNIT_ASSERT_EQUAL(3, executeSpans.size());
+ std::string canon;
+ if (server->GetSettings().AppConfig->GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor");
+ UNIT_ASSERT(lookupActorSpan);
+
+ auto dsReads = lookupActorSpan->get().FindAll("DataShard.Read"); // Lookup actor sends EvRead to each shard.
+ UNIT_ASSERT_EQUAL(dsReads.size(), 2);
+
+ canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
+ ", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) "
+ ", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (DataShard.Read "
+ "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) "
+ ", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
+ ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
+ ", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog "
+ "-> [(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)]) , (DataShard.Read "
+ "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) "
+ ", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) "
+ ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) "
+ ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) "
+ ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) "
+ ", (ReadIterator.ReadOperation)])])]) , (ComputeActor) , (RunTasks)])])";
+ } else {
+ auto deSpan = trace.Root.BFSFindOne("DataExecuter");
+ UNIT_ASSERT(deSpan);
+
+ auto dsTxSpans = deSpan->get().FindAll("Datashard.Transaction");
+ UNIT_ASSERT_EQUAL(2, dsTxSpans.size()); // Two shards, each executes a user transaction.
+
+ for (auto dsTxSpan : dsTxSpans) {
+ auto tabletTxs = dsTxSpan.get().FindAll("Tablet.Transaction");
+ UNIT_ASSERT_EQUAL(1, tabletTxs.size());
+
+ auto propose = tabletTxs[0];
+ CheckTxHasWriteLog(propose);
+
+ // Blobs are loaded from BS.
+ UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Wait").size());
+ UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Enqueued").size());
+
+ // We execute tx multiple times, because we have to load data for it to execute.
+ auto executeSpans = propose.get().FindAll("Tablet.Transaction.Execute");
+ UNIT_ASSERT_EQUAL(3, executeSpans.size());
+
+ CheckExecuteHasDatashardUnits(executeSpans[0], 3);
+ CheckExecuteHasDatashardUnits(executeSpans[1], 1);
+ CheckExecuteHasDatashardUnits(executeSpans[2], 3);
+ }
- CheckExecuteHasDatashardUnits(executeSpans[0], 3);
- CheckExecuteHasDatashardUnits(executeSpans[1], 1);
- CheckExecuteHasDatashardUnits(executeSpans[2], 3);
+ canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
+ ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
+ "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
+ "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
+ "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
+ "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
+ "[(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
+ "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
+ "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
+ "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
+ "[(Tablet.WriteLog.LogEntry)])])])])])";
}
- std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) "
- ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , "
- "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , "
- "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
- "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
- "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
- "[(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> "
- "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
- "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , "
- "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> "
- "[(Tablet.WriteLog.LogEntry)])])])])])";
+
UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString());
}
diff --git a/ydb/services/ydb/ydb_table_split_ut.cpp b/ydb/services/ydb/ydb_table_split_ut.cpp
index 8e1b82e514..5a498fc289 100644
--- a/ydb/services/ydb/ydb_table_split_ut.cpp
+++ b/ydb/services/ydb/ydb_table_split_ut.cpp
@@ -369,6 +369,7 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TKikimrWithGrpcAndRootSchema server(appConfig);
// Set min uptime before merge by load to 10h