diff options
author | Iuliia Sidorina <yulia@ydb.tech> | 2024-01-09 16:39:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-09 16:39:06 +0100 |
commit | 5db97d85ba01b7c8038b37c399980097097fd06e (patch) | |
tree | 15f65d61b05977653afa32bde511f02beb12e0d2 | |
parent | 8c1f7ef7a4091b7bb300ee3030ec339b6cf77a6e (diff) | |
download | ydb-5db97d85ba01b7c8038b37c399980097097fd06e.tar.gz |
feature(kqp): enable stream lookup for data query (#611)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_explain_ut.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_query_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_stats_ut.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/table_service_config.proto | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_trace.cpp | 94 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table_split_ut.cpp | 1 |
17 files changed, 128 insertions, 51 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index fba12d565b..0b9674e3df 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1650,11 +1650,18 @@ private: } } - bool HassDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) { + bool HasDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) { if (queryType == NKqpProto::TKqpPhyTx::TYPE_DATA) { return true; } - for (const auto &tableOp : stage.GetTableOps()) { + + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + return true; + } + } + + for (const auto& tableOp : stage.GetTableOps()) { if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { return true; } @@ -1691,7 +1698,7 @@ private: } } - if (stageInfo.Meta.IsOlap() && HassDmlOperationOnOlap(tx.Body->GetType(), stage)) { + if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 0b3790e7b7..0f9ff8525c 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -112,6 +112,8 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew meta.TableId = MakeTableId(input.GetStreamLookup().GetTable()); meta.TablePath = input.GetStreamLookup().GetTable().GetPath(); meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId); + YQL_ENSURE(meta.TableConstInfo); + meta.TableKind = meta.TableConstInfo->TableKind; } if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) { diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 6211df3be3..7be32f87aa 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -11,10 +11,11 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; -static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead) { +static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) { auto app = NKikimrConfig::TAppConfig(); app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead); app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup); return app; } @@ -43,7 +44,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); } Y_UNIT_TEST_TWIN(PointLookup, SourceRead) { - TKikimrRunner kikimr(GetAppConfig(SourceRead)); + TKikimrRunner kikimr(GetAppConfig(SourceRead, false)); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp index 1f85761e0c..b8912474b5 100644 --- a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp @@ -401,6 +401,7 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) { // source read use iterator interface, that doesn't use datashard transactions NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 74214695a1..929ca9d29f 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -4030,9 +4030,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda auto& stats = NYdb::TProtoAccessor::GetProto(*result2.GetStats()); - int readPhase = 1; + int readPhase = 0; if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 2); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys"); @@ -4042,6 +4042,8 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda } else { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); + readPhase++; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys/Index/indexImplTable"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).reads().rows(), 1); diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 8bcad00b36..3c69410c54 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -90,6 +90,7 @@ Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) { void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); diff --git a/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp b/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp index a16eb76308..6977d7981c 100644 --- a/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp @@ -158,8 +158,14 @@ void Test(const TString& query, const TString& answer, THashSet<TString> allowSc } } -void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1) { - TKikimrRunner kikimr; +void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1, bool streamLookup = true) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup); + + auto settings = TKikimrSettings() + .SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -204,7 +210,8 @@ Y_UNIT_TEST(OverflowLookup) { )", R"([])", 0, - 2); + 2, + false); TestRange( R"( diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 1c7ca308fc..e9bd4c7eed 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -205,7 +205,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto explainResult = session.ExplainDataQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString()); - UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst()); + } else { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + } auto params = kikimr.GetTableClient().GetParamsBuilder() .AddParam("$group").OptionalUint32(1).Build() @@ -1224,7 +1229,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { size_t phase = 0; if (stats.query_phases().size() == 2) { phase = 1; - } else if (stats.query_phases().size() == 0) { + } else if (stats.query_phases().size() == 1) { phase = 0; } else { UNIT_ASSERT(false); diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index 47276d412c..432c81fc99 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -278,6 +278,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) { NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup); auto settings = TKikimrSettings() .SetAppConfig(appConfig); diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp index 33bacf0385..f469f396e4 100644 --- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp @@ -497,7 +497,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); node = FindPlanNodeByKv(plan, "Name", "TableFullScan"); UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); - node = FindPlanNodeByKv(plan, "Name", "TablePointLookup"); + + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + node = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + } else { + node = FindPlanNodeByKv(plan, "Name", "TablePointLookup"); + } + UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue"); } @@ -533,7 +539,12 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1); ui32 lookupsCount = 0; - lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr"); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup"); + } else { + lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr"); + } + UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 1); /* check tables section */ @@ -899,7 +910,11 @@ Y_UNIT_TEST_SUITE(KqpExplain) { } Y_UNIT_TEST(MultiJoinCteLinks) { - TKikimrRunner kikimr; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig); + TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 2c755dd603..f57bf4005a 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -351,6 +351,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(QueryTimeoutImmediate) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig); TKikimrRunner kikimr{settings}; @@ -490,6 +491,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(QueryCancelImmediate) { NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig); diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index 0ecbfc4153..7c1ed09f61 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -102,6 +102,7 @@ TCollectedStreamResult JoinStatsBasic( std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); auto settings = TKikimrSettings() .SetAppConfig(appConfig); diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index b59407d502..bf56e63e79 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { if (result.GetStatus() == EStatus::SUCCESS) continue; - if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead() && false) { + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue){ return issue.GetMessage().Contains("has no snapshot at"); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 5ca397d677..3ef6ae03c9 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -224,7 +224,7 @@ message TTableServiceConfig { optional uint64 SessionIdleDurationSeconds = 28 [default = 600]; optional TAggregationConfig AggregationConfig = 29; optional bool EnableKqpScanQueryStreamLookup = 30 [default = true]; - optional bool EnableKqpDataQueryStreamLookup = 31 [default = false]; + optional bool EnableKqpDataQueryStreamLookup = 31 [default = true]; optional TExecuterRetriesConfig ExecuterRetriesConfig = 32; reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false]; optional bool EnablePublishKqpProxyByRM = 34 [default = true]; diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index a2e73d7480..17c07969f6 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -1526,6 +1526,7 @@ Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetEnableMvccSnapshotReads(false); serverSettings.SetDomainName("Root") @@ -1863,9 +1864,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) { Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) { TPortManager pm; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetEnableMvccSnapshotReads(false); serverSettings.SetDomainName("Root") + .SetAppConfig(app) .SetUseRealThreads(false); Tests::TServer::TPtr server = new TServer(serverSettings); @@ -3507,6 +3511,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) { TPortManager pm; NKikimrConfig::TAppConfig app; app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetAppConfig(app) diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 2b377de086..1bd642c9e1 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -337,43 +337,69 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { FakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; - auto deSpan = trace.Root.BFSFindOne("DataExecuter"); - UNIT_ASSERT(deSpan); - - auto dsTxSpans = deSpan->get().FindAll("Datashard.Transaction"); - UNIT_ASSERT_EQUAL(2, dsTxSpans.size()); // Two shards, each executes a user transaction. - - for (auto dsTxSpan : dsTxSpans) { - auto tabletTxs = dsTxSpan.get().FindAll("Tablet.Transaction"); - UNIT_ASSERT_EQUAL(1, tabletTxs.size()); - - auto propose = tabletTxs[0]; - CheckTxHasWriteLog(propose); - - // Blobs are loaded from BS. - UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Wait").size()); - UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Enqueued").size()); - - // We execute tx multiple times, because we have to load data for it to execute. - auto executeSpans = propose.get().FindAll("Tablet.Transaction.Execute"); - UNIT_ASSERT_EQUAL(3, executeSpans.size()); + std::string canon; + if (server->GetSettings().AppConfig->GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + auto lookupActorSpan = trace.Root.BFSFindOne("LookupActor"); + UNIT_ASSERT(lookupActorSpan); + + auto dsReads = lookupActorSpan->get().FindAll("DataShard.Read"); // Lookup actor sends EvRead to each shard. + UNIT_ASSERT_EQUAL(dsReads.size(), 2); + + canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " + ", (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) " + ", (ComputeActor -> [(LookupActor -> [(WaitForShardsResolve) , (DataShard.Read " + "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) " + ", (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " + ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " + ", (Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog " + "-> [(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)]) , (DataShard.Read " + "-> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) " + ", (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) " + ", (Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) " + ", (Tablet.Transaction.Enqueued) , (Tablet.Transaction.Execute -> [(Datashard.Unit) " + ", (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) " + ", (ReadIterator.ReadOperation)])])]) , (ComputeActor) , (RunTasks)])])"; + } else { + auto deSpan = trace.Root.BFSFindOne("DataExecuter"); + UNIT_ASSERT(deSpan); + + auto dsTxSpans = deSpan->get().FindAll("Datashard.Transaction"); + UNIT_ASSERT_EQUAL(2, dsTxSpans.size()); // Two shards, each executes a user transaction. + + for (auto dsTxSpan : dsTxSpans) { + auto tabletTxs = dsTxSpan.get().FindAll("Tablet.Transaction"); + UNIT_ASSERT_EQUAL(1, tabletTxs.size()); + + auto propose = tabletTxs[0]; + CheckTxHasWriteLog(propose); + + // Blobs are loaded from BS. + UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Wait").size()); + UNIT_ASSERT_EQUAL(2, propose.get().FindAll("Tablet.Transaction.Enqueued").size()); + + // We execute tx multiple times, because we have to load data for it to execute. + auto executeSpans = propose.get().FindAll("Tablet.Transaction.Execute"); + UNIT_ASSERT_EQUAL(3, executeSpans.size()); + + CheckExecuteHasDatashardUnits(executeSpans[0], 3); + CheckExecuteHasDatashardUnits(executeSpans[1], 1); + CheckExecuteHasDatashardUnits(executeSpans[2], 3); + } - CheckExecuteHasDatashardUnits(executeSpans[0], 3); - CheckExecuteHasDatashardUnits(executeSpans[1], 1); - CheckExecuteHasDatashardUnits(executeSpans[2], 3); + canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " + ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , " + "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " + "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " + "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " + "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " + "[(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " + "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " + "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " + "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " + "[(Tablet.WriteLog.LogEntry)])])])])])"; } - std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " - ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , " - "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " - "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " - "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " - "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])])]) , (Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " - "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " - "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " - "(Tablet.Transaction.Execute -> [(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " - "[(Tablet.WriteLog.LogEntry)])])])])])"; + UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } diff --git a/ydb/services/ydb/ydb_table_split_ut.cpp b/ydb/services/ydb/ydb_table_split_ut.cpp index 8e1b82e514..5a498fc289 100644 --- a/ydb/services/ydb/ydb_table_split_ut.cpp +++ b/ydb/services/ydb/ydb_table_split_ut.cpp @@ -369,6 +369,7 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); TKikimrWithGrpcAndRootSchema server(appConfig); // Set min uptime before merge by load to 10h |