diff options
author | ssmike <ssmike@ydb.tech> | 2022-12-08 10:57:57 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2022-12-08 10:57:57 +0300 |
commit | f514d7d71c5789e348ba1a693f74c89d28b08fea (patch) | |
tree | 74f84ca7a0b05aa847b8ce8a2eb1ba9b253ecb9c | |
parent | 613dcecbea7f8bd6c041c2caef1c8b85640a0760 (diff) | |
download | ydb-f514d7d71c5789e348ba1a693f74c89d28b08fea.tar.gz |
support sources in data query executor
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 99 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 105 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 51 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 1 |
9 files changed, 174 insertions, 103 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 6297bdeeb21..dc2817a8053 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1449,7 +1449,15 @@ private: ReadOnlyTx &= !stage.GetIsEffectsStage(); - if (stageInfo.Meta.ShardOperations.empty()) { + if (stage.SourcesSize() > 0) { + switch (stage.GetSources(0).GetTypeCase()) { + case NKqpProto::TKqpSource::kReadRangesSource: + BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv); + break; + default: + YQL_ENSURE(false, "unknown source type"); + } + } else if (stageInfo.Meta.ShardOperations.empty()) { BuildComputeTasks(stageInfo); } else if (stageInfo.Meta.IsSysView()) { BuildSysViewScanTasks(stageInfo, holderFactory, typeEnv); @@ -1907,6 +1915,7 @@ private: for (auto& taskDesc : tasks) { remoteComputeTasksCnt += 1; FillInputSettings(taskDesc, lockTxId); + PendingComputeTasks.insert(taskDesc.GetId()); tasksPerNode[it->second].emplace_back(std::move(taskDesc)); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index a09eeda1ae4..d039b27413e 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -680,6 +680,105 @@ protected: } } + void BuildScanTasksFromSource(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, + const NMiniKQL::TTypeEnvironment& typeEnv) + { + THashMap<ui64, std::vector<ui64>> nodeTasks; + THashMap<ui64, ui64> assignedShardsCount; + + auto& stage = GetStage(stageInfo); + + YQL_ENSURE(stage.GetSources(0).HasReadRangesSource()); + YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections"); + + auto& source = stage.GetSources(0).GetReadRangesSource(); + + const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable())); + const auto& keyTypes = table.KeyColumnTypes; + + YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap); + + auto columns = BuildKqpColumns(source, table); + THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, source, stageInfo, holderFactory, typeEnv); + + bool reverse = false; + ui64 itemsLimit = 0; + + TString itemsLimitParamName; + NYql::NDqProto::TData itemsLimitBytes; + NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; + + YQL_ENSURE(!source.GetReverse(), "reverse not supported yet"); + + for (auto& [shardId, shardInfo] : partitions) { + YQL_ENSURE(!shardInfo.KeyWriteRanges); + + auto& task = TasksGraph.AddTask(stageInfo); + if (auto ptr = ShardIdToNodeId.FindPtr(shardId)) { + task.Meta.NodeId = *ptr; + } else { + task.Meta.ShardId = shardId; + } + + for (auto& [name, value] : shardInfo.Params) { + auto ret = task.Meta.Params.emplace(name, std::move(value)); + YQL_ENSURE(ret.second); + auto typeIterator = shardInfo.ParamTypes.find(name); + YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end()); + auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second); + YQL_ENSURE(retType.second); + } + + NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; + FillTableMeta(stageInfo, settings.MutableTable()); + for (auto& key : source.GetSkipNullKeys()) { + settings.AddSkipNullKeys(key); + } + + for (auto& keyColumn : keyTypes) { + settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId())); + } + + for (auto& column : columns) { + auto* protoColumn = settings.AddColumns(); + protoColumn->SetId(column.Id); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(column.Type); + protoColumn->SetType(columnType.TypeId); + if (columnType.TypeInfo) { + *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; + } + protoColumn->SetName(column.Name); + } + + if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) { + settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); + } else { + settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); + } + + settings.MutableSnapshot()->SetStep(Request.Snapshot.Step); + settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId); + + shardInfo.KeyReadRanges->SerializeTo(&settings); + settings.SetReverse(reverse); + settings.SetSorted(source.GetSorted()); + + settings.SetShardIdHint(shardId); + + ExtractItemsLimit(stageInfo, source.GetItemsLimit(), holderFactory, + typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); + settings.SetItemsLimit(itemsLimit); + + const auto& stageSource = stage.GetSources(0); + auto& input = task.Inputs[stageSource.GetInputIndex()]; + auto& taskSourceSettings = input.SourceSettings; + input.ConnectionInfo = NYql::NDq::TSourceInput{}; + taskSourceSettings.ConstructInPlace(); + taskSourceSettings->PackFrom(settings); + input.SourceType = NYql::KqpReadRangesSourceName; + } + } + protected: // in derived classes // void FillEndpointDesc(NYql::NDqProto::TEndpoint& endpoint, const TTask& task); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 63505936e38..a4888b3cc16 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -224,101 +224,6 @@ private: } } - void BuildScanTasksFromSource(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, - const NMiniKQL::TTypeEnvironment& typeEnv) - { - THashMap<ui64, std::vector<ui64>> nodeTasks; - THashMap<ui64, ui64> assignedShardsCount; - - auto& stage = GetStage(stageInfo); - - auto sourceIndex = FindReadRangesSource(stage); - YQL_ENSURE(sourceIndex); - YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections"); - - auto& source = stage.GetSources(*sourceIndex).GetReadRangesSource(); - - const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable())); - const auto& keyTypes = table.KeyColumnTypes; - - YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap); - - auto columns = BuildKqpColumns(source, table); - THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, source, stageInfo, holderFactory, typeEnv); - - bool reverse = false; - ui64 itemsLimit = 0; - bool sorted = true; - - TString itemsLimitParamName; - NDqProto::TData itemsLimitBytes; - NKikimr::NMiniKQL::TType* itemsLimitType = nullptr; - - YQL_ENSURE(!source.GetReverse(), "reverse not supported yet"); - - for (auto& [shardId, shardInfo] : partitions) { - YQL_ENSURE(!shardInfo.KeyWriteRanges); - - auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted, false); - - for (auto& [name, value] : shardInfo.Params) { - auto ret = task.Meta.Params.emplace(name, std::move(value)); - YQL_ENSURE(ret.second); - auto typeIterator = shardInfo.ParamTypes.find(name); - YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end()); - auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second); - YQL_ENSURE(retType.second); - } - - NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; - FillTableMeta(stageInfo, settings.MutableTable()); - for (auto& key : source.GetSkipNullKeys()) { - settings.AddSkipNullKeys(key); - } - - for (auto& keyColumn : keyTypes) { - settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId())); - } - - for (auto& column : columns) { - auto* protoColumn = settings.AddColumns(); - protoColumn->SetId(column.Id); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(column.Type); - protoColumn->SetType(columnType.TypeId); - if (columnType.TypeInfo) { - *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; - } - protoColumn->SetName(column.Name); - } - - if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) { - settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); - } else { - settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); - } - - settings.MutableSnapshot()->SetStep(Request.Snapshot.Step); - settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId); - - shardInfo.KeyReadRanges->SerializeTo(&settings); - settings.SetReverse(reverse); - - settings.SetShardIdHint(shardId); - - ExtractItemsLimit(stageInfo, source.GetItemsLimit(), holderFactory, - typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType); - settings.SetItemsLimit(itemsLimit); - - const auto& stageSource = stage.GetSources(*sourceIndex); - auto& input = task.Inputs[stageSource.GetInputIndex()]; - auto& taskSourceSettings = input.SourceSettings; - input.ConnectionInfo = NYql::NDq::TSourceInput{}; - taskSourceSettings.ConstructInPlace(); - taskSourceSettings->PackFrom(settings); - input.SourceType = NYql::KqpReadRangesSourceName; - } - } - void BuildScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { @@ -526,8 +431,14 @@ private: Y_VERIFY_DEBUG(!stage.GetIsEffectsStage()); - if (FindReadRangesSource(stage)) { - BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv); + if (stage.SourcesSize() > 0) { + switch (stage.GetSources(0).GetTypeCase()) { + case NKqpProto::TKqpSource::kReadRangesSource: + BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv); + break; + default: + YQL_ENSURE(false, "unknown source type"); + } } else if (stageInfo.Meta.ShardOperations.empty()) { BuildComputeTasks(stageInfo); } else if (stageInfo.Meta.IsSysView()) { diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 0df720e5333..62e171299fd 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -257,9 +257,6 @@ private: runtimeSettingsBase.FailOnUndelivery = true; } - // TODO: fix me - YQL_ENSURE(runtimeSettingsBase.ExtraMemoryAllocationPool == NRm::EKqpMemoryPool::ScanQuery); - runtimeSettingsBase.StatsMode = msgRtSettings.GetStatsMode(); runtimeSettingsBase.UseLLVM = msgRtSettings.GetUseLLVM(); runtimeSettingsBase.UseSpilling = msgRtSettings.GetUseSpilling(); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 9d489cda33f..8141a476843 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -60,6 +60,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp } const TKqlReadTable& read = node.Cast<TKqlReadTable>(); bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery(); + useSource = useSource || (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQuerySourceRead() && kqpCtx.IsDataQuery()); TVector<TExprBase> values; TNodeOnNodeOwnedMap replaceMap; @@ -225,6 +226,7 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path()); bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery(); + useSource = useSource || (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQuerySourceRead() && kqpCtx.IsDataQuery()); bool fullScan = TCoVoid::Match(ranges.Raw()); TVector<TExprBase> input; diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 1ab35d1b82d..b9e23c882bd 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -282,7 +282,7 @@ public: } bool StartTableScan() { - const ui32 maxAllowedInFlight = MaxInFlight; + const ui32 maxAllowedInFlight = Settings.GetSorted() ? 1 : MaxInFlight; bool isFirst = true; while (!PendingShards.Empty() && RunningReads() + 1 <= maxAllowedInFlight) { if (isFirst) { @@ -525,7 +525,7 @@ public: record.AddColumns(column.GetId()); } - { + if (record.HasSnapshot()) { record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId()); record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep()); } diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 6bfb075bc69..3f46b7cc342 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3425,6 +3425,57 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } + + Y_UNIT_TEST(DqSource) { + TKikimrSettings settings; + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForDataQueries(true); + flags.SetEnableKqpDataQuerySourceRead(true); + settings.SetFeatureFlags(flags); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto result = session.ExecuteDataQuery(R"( + SELECT Key, Data FROM `/Root/EightShard` WHERE Key = 101 or (Key >= 202 and Key < 200+4) ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]];[[202u];[1]];[[203u];[3]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(DqSourceLiteralRange) { + TKikimrSettings settings; + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForDataQueries(true); + flags.SetEnableKqpDataQuerySourceRead(true); + settings.SetFeatureFlags(flags); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto result = session.ExecuteDataQuery(R"( + SELECT Key, Data FROM `/Root/EightShard` WHERE Key >= 101 and Key < 103 ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]];[[102u];[3]]])", FormatResultSetYson(result.GetResultSet(0))); + } + + { + auto params = TParamsBuilder().AddParam("$param").Uint64(101).Build().Build(); + + auto result = session.ExecuteDataQuery(R"( + DECLARE $param as Uint64; + SELECT Key, Data FROM `/Root/EightShard` WHERE Key >= $param and Key < 103 ORDER BY Key; + )", TTxControl::BeginTx().CommitTx(), params).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[101u];[1]];[[102u];[3]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 0522b526cff..3b2024f607e 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -753,6 +753,7 @@ message TFeatureFlags { // enable alter database operation to create subdomain's system tablets // directly in subdomain's hive optional bool EnableAlterDatabaseCreateHiveFirst = 82 [default = false]; + optional bool EnableKqpDataQuerySourceRead = 83 [default = false]; } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 87b5419a241..8aaa9ec8019 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -257,6 +257,7 @@ message TKqpReadRangesSourceSettings { optional EScanDataFormat DataFormat = 9; optional NKikimrProto.TRowVersion Snapshot = 10; optional uint64 ShardIdHint = 11; + optional bool Sorted = 12; } message TKqpTaskInfo { |