aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-01 23:15:48 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-01 23:15:48 +0300
commit2a8720240c26b5040c641c301213406b49320a3b (patch)
tree921ef885a3cc2536223879ec70549c1775937973
parenta500967db9bfdf3fb22cf34326f4e7f2c9e3a701 (diff)
downloadydb-2a8720240c26b5040c641c301213406b49320a3b.tar.gz
[kqp] cleanup executer code: move prune partitions methods to helpers KIKIMR-14818
ref:cde7857493792f7755be86c10b89d05de289bff9
-rw-r--r--ydb/core/kqp/executer/kqp_partition_helper.cpp48
-rw-r--r--ydb/core/kqp/executer/kqp_partition_helper.h11
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp46
3 files changed, 61 insertions, 44 deletions
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.cpp b/ydb/core/kqp/executer/kqp_partition_helper.cpp
index 797047a2af..bebba15fcc 100644
--- a/ydb/core/kqp/executer/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer/kqp_partition_helper.cpp
@@ -557,6 +557,54 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
return shardInfoMap;
}
+
+THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
+{
+ const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
+ YQL_ENSURE(table);
+ YQL_ENSURE(table->TableKind == ETableKind::Olap);
+ YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
+
+ const auto& keyColumnTypes = table->KeyColumnTypes;
+ auto ranges = FillReadRanges(keyColumnTypes, readRanges, stageInfo, holderFactory, typeEnv);
+
+ THashMap<ui64, TShardInfo> shardInfoMap;
+
+ if (ranges.empty())
+ return shardInfoMap;
+
+ for (const auto& partition : stageInfo.Meta.ShardKey->Partitions) {
+ auto& shardInfo = shardInfoMap[partition.ShardId];
+
+ YQL_ENSURE(!shardInfo.KeyReadRanges);
+ shardInfo.KeyReadRanges.ConstructInPlace();
+ shardInfo.KeyReadRanges->CopyFrom(ranges);
+ }
+
+ return shardInfoMap;
+}
+
+THashMap<ui64, TShardInfo> PrunePartitions(TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
+{
+ switch (operation.GetTypeCase()) {
+ case NKqpProto::TKqpPhyTableOperation::kReadRanges:
+ return PrunePartitions(tableKeys, operation.GetReadRanges(), stageInfo, holderFactory, typeEnv);
+ case NKqpProto::TKqpPhyTableOperation::kReadRange:
+ return PrunePartitions(tableKeys, operation.GetReadRange(), stageInfo, holderFactory, typeEnv);
+ case NKqpProto::TKqpPhyTableOperation::kLookup:
+ return PrunePartitions(tableKeys, operation.GetLookup(), stageInfo, holderFactory, typeEnv);
+ case NKqpProto::TKqpPhyTableOperation::kReadOlapRange:
+ return PrunePartitions(tableKeys, operation.GetReadOlapRange(), stageInfo, holderFactory, typeEnv);
+ default:
+ YQL_ENSURE(false, "Unexpected table scan operation: " << static_cast<ui32>(operation.GetTypeCase()));
+ break;
+ }
+}
+
namespace {
using namespace NMiniKQL;
diff --git a/ydb/core/kqp/executer/kqp_partition_helper.h b/ydb/core/kqp/executer/kqp_partition_helper.h
index f01f25ab4a..c0afb56cf9 100644
--- a/ydb/core/kqp/executer/kqp_partition_helper.h
+++ b/ydb/core/kqp/executer/kqp_partition_helper.h
@@ -46,6 +46,17 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
+// Returns the list of ColumnShards that can store rows from the specified range
+// NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range
+// partitioned and multiple ColumnShards store data from the same key range
+THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
+
+THashMap<ui64, TShardInfo> PrunePartitions(TKqpTableKeys& tableKeys,
+ const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
+ const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
+
THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index cf41f87245..bd5499c395 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -431,22 +431,7 @@ private:
Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
auto columns = BuildKqpColumns(op, table);
- THashMap<ui64, TShardInfo> partitions;
-
- switch (op.GetTypeCase()) {
- case NKqpProto::TKqpPhyTableOperation::kReadRanges:
- partitions = PrunePartitions(TableKeys, op.GetReadRanges(), stageInfo, holderFactory, typeEnv);
- break;
- case NKqpProto::TKqpPhyTableOperation::kReadRange:
- partitions = PrunePartitions(TableKeys, op.GetReadRange(), stageInfo, holderFactory, typeEnv);
- break;
- case NKqpProto::TKqpPhyTableOperation::kLookup:
- partitions = PrunePartitions(TableKeys, op.GetLookup(), stageInfo, holderFactory, typeEnv);
- break;
- default:
- YQL_ENSURE(false, "Unexpected table scan operation: " << (ui32) op.GetTypeCase());
- break;
- }
+ THashMap<ui64, TShardInfo> partitions = PrunePartitions(TableKeys, op, stageInfo, holderFactory, typeEnv);
bool reverse = false;
ui64 itemsLimit = 0;
@@ -547,33 +532,6 @@ private:
}
}
- // Returns the list of ColumnShards that can store rows from the specified range
- // NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range
- // partitioned and multiple ColumnShards store data from the same key range
- THashMap<ui64, TShardInfo> ListColumnshadsForRange(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
- const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
- {
- const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
- YQL_ENSURE(table);
- YQL_ENSURE(table->TableKind == ETableKind::Olap);
- YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
-
- const auto& keyColumnTypes = table->KeyColumnTypes;
- auto ranges = FillReadRanges(keyColumnTypes, readRanges, stageInfo, holderFactory, typeEnv);
-
- THashMap<ui64, TShardInfo> shardInfoMap;
- for (const auto& partition : stageInfo.Meta.ShardKey->Partitions) {
- auto& shardInfo = shardInfoMap[partition.ShardId];
-
- YQL_ENSURE(!shardInfo.KeyReadRanges);
- shardInfo.KeyReadRanges.ConstructInPlace();
- shardInfo.KeyReadRanges->CopyFrom(ranges);
- }
-
- return shardInfoMap;
- }
-
// Creates scan tasks for reading OLAP table range
void BuildColumnshardScanTasks(TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory,
const NMiniKQL::TTypeEnvironment& typeEnv)
@@ -603,7 +561,7 @@ private:
const auto& readRange = op.GetReadOlapRange();
- auto allShards = ListColumnshadsForRange(TableKeys, readRange, stageInfo, holderFactory, typeEnv);
+ auto allShards = PrunePartitions(TableKeys, op, stageInfo, holderFactory, typeEnv);
bool reverse = readRange.GetReverse();
ui64 itemsLimit = 0;