aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2022-12-08 10:57:57 +0300
committerssmike <ssmike@ydb.tech>2022-12-08 10:57:57 +0300
commitf514d7d71c5789e348ba1a693f74c89d28b08fea (patch)
tree74f84ca7a0b05aa847b8ce8a2eb1ba9b253ecb9c
parent613dcecbea7f8bd6c041c2caef1c8b85640a0760 (diff)
downloadydb-f514d7d71c5789e348ba1a693f74c89d28b08fea.tar.gz
support sources in data query executor
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h99
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp105
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp3
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp4
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp51
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/tx_datashard.proto1
9 files changed, 174 insertions, 103 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 6297bdeeb21..dc2817a8053 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1449,7 +1449,15 @@ private:
ReadOnlyTx &= !stage.GetIsEffectsStage();
- if (stageInfo.Meta.ShardOperations.empty()) {
+ if (stage.SourcesSize() > 0) {
+ switch (stage.GetSources(0).GetTypeCase()) {
+ case NKqpProto::TKqpSource::kReadRangesSource:
+ BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv);
+ break;
+ default:
+ YQL_ENSURE(false, "unknown source type");
+ }
+ } else if (stageInfo.Meta.ShardOperations.empty()) {
BuildComputeTasks(stageInfo);
} else if (stageInfo.Meta.IsSysView()) {
BuildSysViewScanTasks(stageInfo, holderFactory, typeEnv);
@@ -1907,6 +1915,7 @@ private:
for (auto& taskDesc : tasks) {
remoteComputeTasksCnt += 1;
FillInputSettings(taskDesc, lockTxId);
+ PendingComputeTasks.insert(taskDesc.GetId());
tasksPerNode[it->second].emplace_back(std::move(taskDesc));
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index a09eeda1ae4..d039b27413e 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -680,6 +680,105 @@ protected:
}
}
+ void BuildScanTasksFromSource(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
+ const NMiniKQL::TTypeEnvironment& typeEnv)
+ {
+ THashMap<ui64, std::vector<ui64>> nodeTasks;
+ THashMap<ui64, ui64> assignedShardsCount;
+
+ auto& stage = GetStage(stageInfo);
+
+ YQL_ENSURE(stage.GetSources(0).HasReadRangesSource());
+ YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections");
+
+ auto& source = stage.GetSources(0).GetReadRangesSource();
+
+ const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable()));
+ const auto& keyTypes = table.KeyColumnTypes;
+
+ YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap);
+
+ auto columns = BuildKqpColumns(source, table);
+ THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, source, stageInfo, holderFactory, typeEnv);
+
+ bool reverse = false;
+ ui64 itemsLimit = 0;
+
+ TString itemsLimitParamName;
+ NYql::NDqProto::TData itemsLimitBytes;
+ NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
+
+ YQL_ENSURE(!source.GetReverse(), "reverse not supported yet");
+
+ for (auto& [shardId, shardInfo] : partitions) {
+ YQL_ENSURE(!shardInfo.KeyWriteRanges);
+
+ auto& task = TasksGraph.AddTask(stageInfo);
+ if (auto ptr = ShardIdToNodeId.FindPtr(shardId)) {
+ task.Meta.NodeId = *ptr;
+ } else {
+ task.Meta.ShardId = shardId;
+ }
+
+ for (auto& [name, value] : shardInfo.Params) {
+ auto ret = task.Meta.Params.emplace(name, std::move(value));
+ YQL_ENSURE(ret.second);
+ auto typeIterator = shardInfo.ParamTypes.find(name);
+ YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end());
+ auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second);
+ YQL_ENSURE(retType.second);
+ }
+
+ NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
+ FillTableMeta(stageInfo, settings.MutableTable());
+ for (auto& key : source.GetSkipNullKeys()) {
+ settings.AddSkipNullKeys(key);
+ }
+
+ for (auto& keyColumn : keyTypes) {
+ settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId()));
+ }
+
+ for (auto& column : columns) {
+ auto* protoColumn = settings.AddColumns();
+ protoColumn->SetId(column.Id);
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(column.Type);
+ protoColumn->SetType(columnType.TypeId);
+ if (columnType.TypeInfo) {
+ *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
+ }
+ protoColumn->SetName(column.Name);
+ }
+
+ if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) {
+ settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
+ } else {
+ settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+ }
+
+ settings.MutableSnapshot()->SetStep(Request.Snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId);
+
+ shardInfo.KeyReadRanges->SerializeTo(&settings);
+ settings.SetReverse(reverse);
+ settings.SetSorted(source.GetSorted());
+
+ settings.SetShardIdHint(shardId);
+
+ ExtractItemsLimit(stageInfo, source.GetItemsLimit(), holderFactory,
+ typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
+ settings.SetItemsLimit(itemsLimit);
+
+ const auto& stageSource = stage.GetSources(0);
+ auto& input = task.Inputs[stageSource.GetInputIndex()];
+ auto& taskSourceSettings = input.SourceSettings;
+ input.ConnectionInfo = NYql::NDq::TSourceInput{};
+ taskSourceSettings.ConstructInPlace();
+ taskSourceSettings->PackFrom(settings);
+ input.SourceType = NYql::KqpReadRangesSourceName;
+ }
+ }
+
protected:
// in derived classes
// void FillEndpointDesc(NYql::NDqProto::TEndpoint& endpoint, const TTask& task);
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 63505936e38..a4888b3cc16 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -224,101 +224,6 @@ private:
}
}
- void BuildScanTasksFromSource(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
- const NMiniKQL::TTypeEnvironment& typeEnv)
- {
- THashMap<ui64, std::vector<ui64>> nodeTasks;
- THashMap<ui64, ui64> assignedShardsCount;
-
- auto& stage = GetStage(stageInfo);
-
- auto sourceIndex = FindReadRangesSource(stage);
- YQL_ENSURE(sourceIndex);
- YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections");
-
- auto& source = stage.GetSources(*sourceIndex).GetReadRangesSource();
-
- const auto& table = TableKeys.GetTable(MakeTableId(source.GetTable()));
- const auto& keyTypes = table.KeyColumnTypes;
-
- YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap);
-
- auto columns = BuildKqpColumns(source, table);
- THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, source, stageInfo, holderFactory, typeEnv);
-
- bool reverse = false;
- ui64 itemsLimit = 0;
- bool sorted = true;
-
- TString itemsLimitParamName;
- NDqProto::TData itemsLimitBytes;
- NKikimr::NMiniKQL::TType* itemsLimitType = nullptr;
-
- YQL_ENSURE(!source.GetReverse(), "reverse not supported yet");
-
- for (auto& [shardId, shardInfo] : partitions) {
- YQL_ENSURE(!shardInfo.KeyWriteRanges);
-
- auto& task = AssignTaskToShard(stageInfo, shardId, nodeTasks, assignedShardsCount, sorted, false);
-
- for (auto& [name, value] : shardInfo.Params) {
- auto ret = task.Meta.Params.emplace(name, std::move(value));
- YQL_ENSURE(ret.second);
- auto typeIterator = shardInfo.ParamTypes.find(name);
- YQL_ENSURE(typeIterator != shardInfo.ParamTypes.end());
- auto retType = task.Meta.ParamTypes.emplace(name, typeIterator->second);
- YQL_ENSURE(retType.second);
- }
-
- NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
- FillTableMeta(stageInfo, settings.MutableTable());
- for (auto& key : source.GetSkipNullKeys()) {
- settings.AddSkipNullKeys(key);
- }
-
- for (auto& keyColumn : keyTypes) {
- settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId()));
- }
-
- for (auto& column : columns) {
- auto* protoColumn = settings.AddColumns();
- protoColumn->SetId(column.Id);
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(column.Type);
- protoColumn->SetType(columnType.TypeId);
- if (columnType.TypeInfo) {
- *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
- }
- protoColumn->SetName(column.Name);
- }
-
- if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) {
- settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
- } else {
- settings.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
- }
-
- settings.MutableSnapshot()->SetStep(Request.Snapshot.Step);
- settings.MutableSnapshot()->SetTxId(Request.Snapshot.TxId);
-
- shardInfo.KeyReadRanges->SerializeTo(&settings);
- settings.SetReverse(reverse);
-
- settings.SetShardIdHint(shardId);
-
- ExtractItemsLimit(stageInfo, source.GetItemsLimit(), holderFactory,
- typeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
- settings.SetItemsLimit(itemsLimit);
-
- const auto& stageSource = stage.GetSources(*sourceIndex);
- auto& input = task.Inputs[stageSource.GetInputIndex()];
- auto& taskSourceSettings = input.SourceSettings;
- input.ConnectionInfo = NYql::NDq::TSourceInput{};
- taskSourceSettings.ConstructInPlace();
- taskSourceSettings->PackFrom(settings);
- input.SourceType = NYql::KqpReadRangesSourceName;
- }
- }
-
void BuildScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
const NMiniKQL::TTypeEnvironment& typeEnv)
{
@@ -526,8 +431,14 @@ private:
Y_VERIFY_DEBUG(!stage.GetIsEffectsStage());
- if (FindReadRangesSource(stage)) {
- BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv);
+ if (stage.SourcesSize() > 0) {
+ switch (stage.GetSources(0).GetTypeCase()) {
+ case NKqpProto::TKqpSource::kReadRangesSource:
+ BuildScanTasksFromSource(stageInfo, holderFactory, typeEnv);
+ break;
+ default:
+ YQL_ENSURE(false, "unknown source type");
+ }
} else if (stageInfo.Meta.ShardOperations.empty()) {
BuildComputeTasks(stageInfo);
} else if (stageInfo.Meta.IsSysView()) {
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 0df720e5333..62e171299fd 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -257,9 +257,6 @@ private:
runtimeSettingsBase.FailOnUndelivery = true;
}
- // TODO: fix me
- YQL_ENSURE(runtimeSettingsBase.ExtraMemoryAllocationPool == NRm::EKqpMemoryPool::ScanQuery);
-
runtimeSettingsBase.StatsMode = msgRtSettings.GetStatsMode();
runtimeSettingsBase.UseLLVM = msgRtSettings.GetUseLLVM();
runtimeSettingsBase.UseSpilling = msgRtSettings.GetUseSpilling();
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 9d489cda33f..8141a476843 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -60,6 +60,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
}
const TKqlReadTable& read = node.Cast<TKqlReadTable>();
bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery();
+ useSource = useSource || (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQuerySourceRead() && kqpCtx.IsDataQuery());
TVector<TExprBase> values;
TNodeOnNodeOwnedMap replaceMap;
@@ -225,6 +226,7 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path());
bool useSource = kqpCtx.Config->FeatureFlags.GetEnableKqpScanQuerySourceRead() && kqpCtx.IsScanQuery();
+ useSource = useSource || (kqpCtx.Config->FeatureFlags.GetEnableKqpDataQuerySourceRead() && kqpCtx.IsDataQuery());
bool fullScan = TCoVoid::Match(ranges.Raw());
TVector<TExprBase> input;
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 1ab35d1b82d..b9e23c882bd 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -282,7 +282,7 @@ public:
}
bool StartTableScan() {
- const ui32 maxAllowedInFlight = MaxInFlight;
+ const ui32 maxAllowedInFlight = Settings.GetSorted() ? 1 : MaxInFlight;
bool isFirst = true;
while (!PendingShards.Empty() && RunningReads() + 1 <= maxAllowedInFlight) {
if (isFirst) {
@@ -525,7 +525,7 @@ public:
record.AddColumns(column.GetId());
}
- {
+ if (record.HasSnapshot()) {
record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId());
record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep());
}
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index 6bfb075bc69..3f46b7cc342 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3425,6 +3425,57 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
+
+ Y_UNIT_TEST(DqSource) {
+ TKikimrSettings settings;
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForDataQueries(true);
+ flags.SetEnableKqpDataQuerySourceRead(true);
+ settings.SetFeatureFlags(flags);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ SELECT Key, Data FROM `/Root/EightShard` WHERE Key = 101 or (Key >= 202 and Key < 200+4) ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]];[[202u];[1]];[[203u];[3]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(DqSourceLiteralRange) {
+ TKikimrSettings settings;
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForDataQueries(true);
+ flags.SetEnableKqpDataQuerySourceRead(true);
+ settings.SetFeatureFlags(flags);
+ TKikimrRunner kikimr(settings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ SELECT Key, Data FROM `/Root/EightShard` WHERE Key >= 101 and Key < 103 ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]];[[102u];[3]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ {
+ auto params = TParamsBuilder().AddParam("$param").Uint64(101).Build().Build();
+
+ auto result = session.ExecuteDataQuery(R"(
+ DECLARE $param as Uint64;
+ SELECT Key, Data FROM `/Root/EightShard` WHERE Key >= $param and Key < 103 ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx(), params).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[101u];[1]];[[102u];[3]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 0522b526cff..3b2024f607e 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -753,6 +753,7 @@ message TFeatureFlags {
// enable alter database operation to create subdomain's system tablets
// directly in subdomain's hive
optional bool EnableAlterDatabaseCreateHiveFirst = 82 [default = false];
+ optional bool EnableKqpDataQuerySourceRead = 83 [default = false];
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 87b5419a241..8aaa9ec8019 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -257,6 +257,7 @@ message TKqpReadRangesSourceSettings {
optional EScanDataFormat DataFormat = 9;
optional NKikimrProto.TRowVersion Snapshot = 10;
optional uint64 ShardIdHint = 11;
+ optional bool Sorted = 12;
}
message TKqpTaskInfo {