aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormakostrov <makostrov@yandex-team.com>2023-08-07 15:34:13 +0300
committermakostrov <makostrov@yandex-team.com>2023-08-07 16:51:50 +0300
commit67c8b51e98caa2abfc405b42b9497563207dd424 (patch)
treea365a726ed4f71ee34fc89f8488f25175ccc4309
parentf785ad0ebb0c4618aa237124e6ad8f68dd622023 (diff)
downloadydb-67c8b51e98caa2abfc405b42b9497563207dd424.tar.gz
[kqp] delete table resolver actor KIKIMR-18803
KIKIMR-18803
-rw-r--r--ydb/core/kqp/common/kqp_resolve.h147
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp20
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h30
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.cpp142
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.h30
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp10
-rw-r--r--ydb/core/kqp/executer_actor/kqp_table_resolver.cpp178
-rw-r--r--ydb/core/kqp/executer_actor/kqp_table_resolver.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp77
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h10
-rw-r--r--ydb/core/kqp/query_data/kqp_prepared_query.cpp66
-rw-r--r--ydb/core/kqp/query_data/kqp_prepared_query.h31
12 files changed, 404 insertions, 339 deletions
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h
index 2aaef8e640..0fbba98582 100644
--- a/ydb/core/kqp/common/kqp_resolve.h
+++ b/ydb/core/kqp/common/kqp_resolve.h
@@ -26,29 +26,150 @@ class TKqpTableKeys {
public:
using TColumn = NSharding::TShardingBase::TColumn;
- struct TTable {
- public:
+ struct TTableConstInfo : public TAtomicRefCount<TTableConstInfo> {
TString Path;
TMap<TString, TColumn> Columns;
TVector<TString> KeyColumns;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
ETableKind TableKind = ETableKind::Unknown;
THashMap<TString, TString> Sequences;
+
+ TTableConstInfo() {}
+ TTableConstInfo(const TString& path) : Path(path) {}
+
+ void FillColumn(const NKqpProto::TKqpPhyColumn& phyColumn) {
+ if (Columns.FindPtr(phyColumn.GetId().GetName())) {
+ return;
+ }
+
+ TKqpTableKeys::TColumn column;
+ column.Id = phyColumn.GetId().GetId();
+
+ if (phyColumn.GetTypeId() != NScheme::NTypeIds::Pg) {
+ column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId());
+ } else {
+ column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId(),
+ NPg::TypeDescFromPgTypeName(phyColumn.GetPgTypeName()));
+ }
+
+ Columns.emplace(phyColumn.GetId().GetName(), std::move(column));
+ if (!phyColumn.GetDefaultFromSequence().empty()) {
+ TString seq = phyColumn.GetDefaultFromSequence();
+ if (!seq.StartsWith(Path)) {
+ seq = Path + "/" + seq;
+ }
+
+ Sequences.emplace(phyColumn.GetId().GetName(), seq);
+ }
+ }
+
+ void AddColumn(const TString& columnName) {
+ auto& sysColumns = GetSystemColumns();
+ if (Columns.FindPtr(columnName)) {
+ return;
+ }
+
+ auto* systemColumn = sysColumns.FindPtr(columnName);
+ YQL_ENSURE(systemColumn, "Unknown table column"
+ << ", table: " << Path
+ << ", column: " << columnName);
+
+ TKqpTableKeys::TColumn column;
+ column.Id = systemColumn->ColumnId;
+ column.Type = NScheme::TTypeInfo(systemColumn->TypeId);
+ Columns.emplace(columnName, std::move(column));
+ }
+
+
+ void FillTable(const NKqpProto::TKqpPhyTable& phyTable) {
+ switch (phyTable.GetKind()) {
+ case NKqpProto::TABLE_KIND_DS:
+ TableKind = ETableKind::Datashard;
+ break;
+ case NKqpProto::TABLE_KIND_OLAP:
+ TableKind = ETableKind::Olap;
+ break;
+ case NKqpProto::TABLE_KIND_SYS_VIEW:
+ TableKind = ETableKind::SysView;
+ break;
+ default:
+ YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind());
+ }
+
+ for (const auto& [_, phyColumn] : phyTable.GetColumns()) {
+ FillColumn(phyColumn);
+ }
+
+ YQL_ENSURE(KeyColumns.empty());
+ KeyColumns.reserve(phyTable.KeyColumnsSize());
+ YQL_ENSURE(KeyColumnTypes.empty());
+ KeyColumnTypes.reserve(phyTable.KeyColumnsSize());
+ for (const auto& keyColumnId : phyTable.GetKeyColumns()) {
+ const auto& column = Columns.FindPtr(keyColumnId.GetName());
+ YQL_ENSURE(column);
+
+ KeyColumns.push_back(keyColumnId.GetName());
+ KeyColumnTypes.push_back(column->Type);
+ }
+ }
+ };
+
+ struct TTable {
+ private:
+ TIntrusivePtr<TTableConstInfo> TableConstInfo;
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> ColumnTableInfo;
- const TMap<TString, TColumn>& GetColumnsRemap() const {
- return Columns;
+ public:
+ TTable() : TableConstInfo(MakeIntrusive<TTableConstInfo>()) {}
+
+ TTable(TIntrusivePtr<TTableConstInfo> constInfoPtr) : TableConstInfo(constInfoPtr) {}
+
+ const TString& GetPath() const {
+ return TableConstInfo->Path;
+ }
+
+ const TMap<TString, TColumn>& GetColumns() const {
+ return TableConstInfo->Columns;
+ }
+
+ const TVector<TString>& GetKeyColumns() const {
+ return TableConstInfo->KeyColumns;
+ }
+
+ const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const {
+ return TableConstInfo->KeyColumnTypes;
+ }
+
+ const ETableKind& GetTableKind() const {
+ return TableConstInfo->TableKind;
+ }
+
+ const THashMap<TString, TString>& GetSequences() const {
+ return TableConstInfo->Sequences;
+ }
+
+ TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> GetColumnTableInfo() const {
+ return ColumnTableInfo;
+ }
+
+ void SetColumnTableInfo(TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> columnTableInfo) {
+ ColumnTableInfo = columnTableInfo;
}
- std::unique_ptr<NSharding::TShardingBase> BuildSharding() const {
- if (ColumnTableInfo) {
- auto result = NSharding::TShardingBase::BuildShardingOperator(ColumnTableInfo->Description.GetSharding());
+ static std::unique_ptr<NSharding::TShardingBase> BuildSharding(const TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo>& columnTableInfo) {
+ if (columnTableInfo) {
+ auto result = NSharding::TShardingBase::BuildShardingOperator(columnTableInfo->Description.GetSharding());
YQL_ENSURE(result);
return result;
} else {
return nullptr;
}
}
+
+ void SetPath(const TStringBuf& path) {
+ TableConstInfo->Path = path;
+ }
+
};
TTable* FindTablePtr(const TTableId& id) {
@@ -71,13 +192,13 @@ public:
return *table;
}
- TTable& GetOrAddTable(const TTableId& id, const TStringBuf path) {
+ TTable& GetOrAddTable(const TTableId& id, const TStringBuf& path) {
auto& table = TablesById[id];
- if (table.Path.empty()) {
- table.Path = path;
+ if (table.GetPath().empty()) {
+ table.SetPath(path);
} else {
- MKQL_ENSURE_S(table.Path == path);
+ MKQL_ENSURE_S(table.GetPath() == path);
}
return table;
@@ -91,6 +212,10 @@ public:
return TablesById;
}
+ void AddTable(const TTableId& id, TIntrusivePtr<TTableConstInfo> info) {
+ TablesById.insert_or_assign(id, info);
+ }
+
private:
THashMap<TTableId, TTable> TablesById;
};
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 4bc8e8ba9e..fd6cbc846e 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1066,9 +1066,9 @@ private:
if (!res->Record.GetTxLocks().empty()) {
auto& lock = res->Record.GetTxLocks(0);
auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId());
- auto it = FindIf(GetTableKeys().Get(), [tableId](const auto& x){ return x.first.HasSamePath(tableId); });
- if (it != GetTableKeys().Get().end()) {
- tableName = it->second.Path;
+ auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); });
+ if (it != TasksGraph.GetStagesInfo().end()) {
+ tableName = it->second.Meta.TableConstInfo->Path;
}
}
@@ -1356,17 +1356,17 @@ private:
return task;
};
- const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
- const auto& keyTypes = table.KeyColumnTypes;;
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+ const auto& keyTypes = tableInfo->KeyColumnTypes;
for (auto& op : stage.GetTableOps()) {
Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
- auto columns = BuildKqpColumns(op, table);
+ auto columns = BuildKqpColumns(op, tableInfo);
switch (op.GetTypeCase()) {
case NKqpProto::TKqpPhyTableOperation::kReadRanges:
case NKqpProto::TKqpPhyTableOperation::kReadRange:
case NKqpProto::TKqpPhyTableOperation::kLookup: {
- auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv());
+ auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv());
auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
for (auto& [shardId, shardInfo] : partitions) {
@@ -1420,7 +1420,7 @@ private:
ShardsWithEffects.insert(task.Meta.ShardId);
}
} else {
- auto result = PruneEffectPartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv());
+ auto result = PruneEffectPartitions(op, stageInfo, HolderFactory(), TypeEnv());
for (auto& [shardId, shardInfo] : result) {
YQL_ENSURE(!shardInfo.KeyReadRanges);
YQL_ENSURE(shardInfo.KeyWriteRanges);
@@ -1441,7 +1441,7 @@ private:
}
for (const auto& [name, info] : shardInfo.ColumnWrites) {
- auto& column = table.Columns.at(name);
+ auto& column = tableInfo->Columns.at(name);
auto& taskColumnWrite = task.Meta.Writes->ColumnWrites[column.Id];
taskColumnWrite.Column.Id = column.Id;
@@ -1724,7 +1724,7 @@ private:
YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage");
}
- BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, /* enableSpilling */ false);
+ BuildKqpStageChannels(TasksGraph, stageInfo, TxId, /* enableSpilling */ false);
}
ResponseEv->InitTxResult(tx.Body);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 1014b280cc..61f2db18f8 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -344,7 +344,7 @@ protected:
}
auto kqpTableResolver = CreateKqpTableResolver(this->SelfId(), TxId, UserToken, Request.Transactions,
- GetTableKeysRef(), TasksGraph);
+ TasksGraph);
KqpTableResolverId = this->RegisterWithSameMailbox(kqpTableResolver);
LOG_T("Got request, become WaitResolveState");
@@ -694,8 +694,8 @@ protected:
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
- const auto& keyTypes = table.KeyColumnTypes;
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+ const auto& keyTypes = tableInfo->KeyColumnTypes;
for (auto& op : stage.GetTableOps()) {
Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
@@ -724,7 +724,7 @@ protected:
TTaskMeta::TShardReadInfo readInfo = {
.Ranges = std::move(keyRanges),
- .Columns = BuildKqpColumns(op, table),
+ .Columns = BuildKqpColumns(op, tableInfo),
};
task.Meta.Reads.ConstructInPlace();
@@ -797,12 +797,12 @@ protected:
auto& source = stage.GetSources(0).GetReadRangesSource();
- const auto& table = GetTableKeys().GetTable(MakeTableId(source.GetTable()));
- const auto& keyTypes = table.KeyColumnTypes;
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+ const auto& keyTypes = tableInfo->KeyColumnTypes;
- YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap);
+ YQL_ENSURE(tableInfo->TableKind != NKikimr::NKqp::ETableKind::Olap);
- auto columns = BuildKqpColumns(source, table);
+ auto columns = BuildKqpColumns(source, tableInfo);
const auto& snapshot = GetSnapshot();
@@ -896,9 +896,9 @@ protected:
};
if (source.GetSequentialInFlightShards()) {
- auto [startShard, shardInfo] = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv());
if (Stats) {
- THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ THashMap<ui64, TShardInfo> partitions = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv());
for (auto& [shardId, _] : partitions) {
Stats->AffectedShards.insert(shardId);
}
@@ -910,7 +910,7 @@ protected:
return 0;
}
} else {
- THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ THashMap<ui64, TShardInfo> partitions = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv());
for (auto& [shardId, shardInfo] : partitions) {
addPartiton(shardId, shardId, shardInfo, {});
}
@@ -1141,14 +1141,6 @@ protected:
}
}
- const TKqpTableKeys& GetTableKeys() const {
- return TasksGraph.GetMeta().TableKeys;
- }
-
- TKqpTableKeys& GetTableKeysRef() {
- return TasksGraph.GetMeta().TableKeys;
- }
-
std::unordered_map<ui64, IActor*>& GetResultChannelProxies() {
return ResultChannelProxies;
}
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index 1283f323b8..425b73a7c7 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -33,12 +33,12 @@ struct TShardParamValuesAndRanges {
THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type,
const TTableId& tableId,
- const TKqpTableKeys& tableKeys, const TKeyDesc& key, const NMiniKQL::THolderFactory&,
+ const TStageInfo& stageInfo, const TKeyDesc& key, const NMiniKQL::THolderFactory&, // Here is problem in ...
const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
YQL_ENSURE(tableId.HasSamePath(key.TableId));
- auto& table = tableKeys.GetTable(tableId);
+ auto& tableInfo = stageInfo.Meta.TableConstInfo;
THashMap<ui64, TShardParamValuesAndRanges> ret;
@@ -47,19 +47,19 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
YQL_ENSURE(itemType->GetKind() == NMiniKQL::TType::EKind::Struct);
auto* structType = static_cast<NMiniKQL::TStructType*>(itemType);
- const ui64 keyLen = table.KeyColumns.size();
+ const ui64 keyLen = tableInfo->KeyColumns.size();
TVector<ui32> keyColumnIndices;
keyColumnIndices.reserve(keyLen);
- for (auto& keyColumn : table.KeyColumns) {
+ for (auto& keyColumn : tableInfo->KeyColumns) {
keyColumnIndices.push_back(structType->GetMemberIndex(keyColumn));
}
NUdf::TUnboxedValue paramValue;
- std::unique_ptr<NSharding::TShardingBase> sharding = table.BuildSharding();
+ std::unique_ptr<NSharding::TShardingBase> sharding = TKqpTableKeys::TTable::BuildSharding(stageInfo.Meta.ColumnTableInfoPtr);
std::unique_ptr<NSharding::TUnboxedValueReader> unboxedReader;
if (sharding) {
- unboxedReader = std::make_unique<NSharding::TUnboxedValueReader>(structType, table.GetColumnsRemap(), sharding->GetShardingColumns());
+ unboxedReader = std::make_unique<NSharding::TUnboxedValueReader>(structType, tableInfo->Columns, sharding->GetShardingColumns());
}
auto it = value.GetListIterator();
while (it.Next(paramValue)) {
@@ -67,10 +67,10 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
if (sharding) {
shardId = key.GetPartitions()[sharding->CalcShardId(paramValue, *unboxedReader)].ShardId;
} else {
- auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices,
+ auto keyValue = MakeKeyCells(paramValue, tableInfo->KeyColumnTypes, keyColumnIndices,
typeEnv, /* copyValues */ true);
Y_VERIFY_DEBUG(keyValue.size() == keyLen);
- const ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes,
+ const ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), tableInfo->KeyColumnTypes,
[](const auto& partition) { return *partition.Range; });
shardId = key.GetPartitions()[partitionIndex].ShardId;
@@ -96,7 +96,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
ui32 sizeBytes = NDq::TDqDataSerializer::EstimateSize(columnValue, columnType);
// Sanity check, we only expect table columns in param values
- Y_VERIFY_DEBUG(table.Columns.contains(columnName));
+ Y_VERIFY_DEBUG(tableInfo->Columns.contains(columnName));
auto& columnWrite = shardData.ColumnWrites[columnName];
columnWrite.MaxValueSizeBytes = std::max(columnWrite.MaxValueSizeBytes, sizeBytes);
@@ -111,12 +111,13 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey(
THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(
const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type,
- const TTableId& tableId, const TKqpTableKeys& tableKeys, const TKeyDesc& key,
+ const TTableId& tableId, const TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo>& tableInfo, const TKeyDesc& key,
const NMiniKQL::THolderFactory&, const NMiniKQL::TTypeEnvironment& typeEnv)
{
+ YQL_ENSURE(tableInfo);
+
auto guard = typeEnv.BindAllocator();
YQL_ENSURE(tableId.HasSamePath(key.TableId));
- auto& table = tableKeys.GetTable(tableId);
THashMap<ui64, TShardParamValuesAndRanges> ret;
@@ -125,13 +126,13 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(
YQL_ENSURE(itemType->GetKind() == NMiniKQL::TType::EKind::Struct);
auto& structType = static_cast<NMiniKQL::TStructType&>(*itemType);
- const ui64 keyLen = table.KeyColumns.size();
+ const ui64 keyLen = tableInfo->KeyColumns.size();
TVector<NScheme::TTypeInfo> keyFullType{Reserve(keyLen)};
TVector<NScheme::TTypeInfo> keyPrefixType{Reserve(keyLen)};
TVector<ui32> keyPrefixIndices{Reserve(keyLen)};
- for (const auto& keyColumn : table.KeyColumns) {
+ for (const auto& keyColumn : tableInfo->KeyColumns) {
auto columnInfo = NDq::FindColumnInfo(&structType, keyColumn);
if (!columnInfo) {
break;
@@ -146,7 +147,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix(
YQL_ENSURE(!keyPrefixType.empty());
for (ui64 i = keyFullType.size(); i < keyLen; ++i) {
- keyFullType.push_back(table.Columns.at(table.KeyColumns[i]).Type);
+ keyFullType.push_back(tableInfo->Columns.at(tableInfo->KeyColumns[i]).Type);
}
NUdf::TUnboxedValue paramValue;
@@ -485,15 +486,13 @@ TString TShardInfo::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const
return sb;
}
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
- const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
- YQL_ENSURE(table);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
- const auto& keyColumnTypes = table->KeyColumnTypes;
+ const auto& keyColumnTypes = tableInfo->KeyColumnTypes;
YQL_ENSURE(readRange.HasKeyRange());
auto range = MakeKeyRange(keyColumnTypes, readRange.GetKeyRange(), stageInfo, holderFactory, typeEnv);
@@ -517,15 +516,13 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
return shardInfoMap;
}
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
Y_UNUSED(holderFactory);
- const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
- YQL_ENSURE(table);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
- const auto& keyColumnTypes = table->KeyColumnTypes;
+ const auto& keyColumnTypes = tableInfo->KeyColumnTypes;
auto ranges = FillReadRangesInternal(keyColumnTypes, readRanges, stageInfo, typeEnv);
THashMap<ui64, TShardInfo> shardInfoMap;
@@ -558,15 +555,13 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
return shardInfoMap;
}
-TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+TVector<TSerializedPointOrRange> ExtractRanges(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv,
TGuard<NKikimr::NMiniKQL::TScopedAlloc>&)
{
- const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
- YQL_ENSURE(table);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
- const auto& keyColumnTypes = table->KeyColumnTypes;
+ const auto& keyColumnTypes = tableInfo->KeyColumnTypes;
TVector<TSerializedPointOrRange> ranges;
if (source.HasRanges()) {
@@ -588,16 +583,14 @@ TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys,
return ranges;
}
-std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
- const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
- YQL_ENSURE(table);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
- const auto& keyColumnTypes = table->KeyColumnTypes;
- auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard);
+ const auto& keyColumnTypes = tableInfo->KeyColumnTypes;
+ auto ranges = ExtractRanges(source, stageInfo, holderFactory, typeEnv, guard);
ui64 shard = 0;
if (!ranges.empty()) {
@@ -627,16 +620,14 @@ std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& table
}
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
- const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
- YQL_ENSURE(table);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
- const auto& keyColumnTypes = table->KeyColumnTypes;
- auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard);
+ const auto& keyColumnTypes = tableInfo->KeyColumnTypes;
+ auto ranges = ExtractRanges(source, stageInfo, holderFactory, typeEnv, guard);
THashMap<ui64, TShardInfo> shardInfoMap;
@@ -669,18 +660,17 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
}
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
Y_UNUSED(holderFactory);
auto guard = typeEnv.BindAllocator();
- const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId);
- YQL_ENSURE(table);
- YQL_ENSURE(table->TableKind == ETableKind::Olap);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+
+ YQL_ENSURE(tableInfo->TableKind == ETableKind::Olap);
YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap);
- const auto& keyColumnTypes = table->KeyColumnTypes;
+ const auto& keyColumnTypes = tableInfo->KeyColumnTypes;
auto ranges = FillReadRanges(keyColumnTypes, readRanges, stageInfo, typeEnv);
THashMap<ui64, TShardInfo> shardInfoMap;
@@ -699,19 +689,18 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
return shardInfoMap;
}
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
switch (operation.GetTypeCase()) {
case NKqpProto::TKqpPhyTableOperation::kReadRanges:
- return PrunePartitions(tableKeys, operation.GetReadRanges(), stageInfo, holderFactory, typeEnv);
+ return PrunePartitions(operation.GetReadRanges(), stageInfo, holderFactory, typeEnv);
case NKqpProto::TKqpPhyTableOperation::kReadRange:
- return PrunePartitions(tableKeys, operation.GetReadRange(), stageInfo, holderFactory, typeEnv);
+ return PrunePartitions(operation.GetReadRange(), stageInfo, holderFactory, typeEnv);
case NKqpProto::TKqpPhyTableOperation::kLookup:
- return PrunePartitions(tableKeys, operation.GetLookup(), stageInfo, holderFactory, typeEnv);
+ return PrunePartitions(operation.GetLookup(), stageInfo, holderFactory, typeEnv);
case NKqpProto::TKqpPhyTableOperation::kReadOlapRange:
- return PrunePartitions(tableKeys, operation.GetReadOlapRange(), stageInfo, holderFactory, typeEnv);
+ return PrunePartitions(operation.GetReadOlapRange(), stageInfo, holderFactory, typeEnv);
default:
YQL_ENSURE(false, "Unexpected table scan operation: " << static_cast<ui32>(operation.GetTypeCase()));
break;
@@ -724,12 +713,12 @@ namespace {
using namespace NMiniKQL;
THashMap<ui64, TShardInfo> PartitionLookupByParameterValue(const NKqpProto::TKqpPhyParamValue& proto,
- const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, const THolderFactory& holderFactory,
+ const TStageInfo& stageInfo, const THolderFactory& holderFactory,
const TTypeEnvironment& typeEnv)
{
const auto& name = proto.GetParamName();
auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
- auto shardsMap = PartitionParamByKeyPrefix(value, type, stageInfo.Meta.TableId, tableKeys, *stageInfo.Meta.ShardKey,
+ auto shardsMap = PartitionParamByKeyPrefix(value, type, stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, *stageInfo.Meta.ShardKey,
holderFactory, typeEnv);
THashMap<ui64, TShardInfo> shardInfoMap;
@@ -754,19 +743,19 @@ THashMap<ui64, TShardInfo> PartitionLookupByParameterValue(const NKqpProto::TKqp
}
THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRowsList& proto,
- const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, const THolderFactory& holderFactory,
+ const TStageInfo& stageInfo, const THolderFactory& holderFactory,
const TTypeEnvironment& typeEnv)
{
- const auto& table = tableKeys.GetTable(stageInfo.Meta.ShardKey->TableId);
-
std::unordered_map<ui64, THashSet<TString>> shardParams; // shardId -> paramNames
std::unordered_map<ui64, TShardParamValuesAndRanges> ret;
THashMap<ui64, TShardInfo> shardInfoMap;
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+
for (const auto& row : proto.GetRows()) {
TVector<TCell> keyFrom, keyTo;
- keyFrom.resize(table.KeyColumns.size());
+ keyFrom.resize(tableInfo->KeyColumns.size());
keyTo.resize(row.GetColumns().size());
NUdf::TUnboxedValue mkqlValue;
@@ -792,17 +781,17 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow
}
}
- for (ui64 i = 0; i < table.KeyColumns.size(); ++i) {
- if (table.KeyColumns[i] == columnName) {
+ for (ui64 i = 0; i < tableInfo->KeyColumns.size(); ++i) {
+ if (tableInfo->KeyColumns[i] == columnName) {
keyFrom[i] = keyTo[i] = NMiniKQL::MakeCell(
- table.KeyColumnTypes[i], mkqlValue, typeEnv, /* copyValue */ false);
+ tableInfo->KeyColumnTypes[i], mkqlValue, typeEnv, /* copyValue */ false);
break;
}
}
}
auto range = TTableRange(keyFrom, true, keyTo, true, /* point */ false);
- auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->GetPartitions(), table.KeyColumnTypes);
+ auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->GetPartitions(), tableInfo->KeyColumnTypes);
for (auto& partitionWithRange: partitions) {
ui64 shardId = partitionWithRange.PartitionInfo->ShardId;
@@ -844,7 +833,7 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow
} // namespace
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpLookup& lookup,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpLookup& lookup,
const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
@@ -858,12 +847,12 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const
switch (auto kind = lookup.GetKeysValue().GetKindCase()) {
case NKqpProto::TKqpPhyValue::kParamValue: {
- return PartitionLookupByParameterValue(lookup.GetKeysValue().GetParamValue(), tableKeys, stageInfo,
+ return PartitionLookupByParameterValue(lookup.GetKeysValue().GetParamValue(), stageInfo,
holderFactory, typeEnv);
}
case NKqpProto::TKqpPhyValue::kRowsList: {
- return PartitionLookupByRowsList(lookup.GetKeysValue().GetRowsList(), tableKeys, stageInfo,
+ return PartitionLookupByRowsList(lookup.GetKeysValue().GetRowsList(), stageInfo,
holderFactory, typeEnv);
}
@@ -876,7 +865,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const
}
template <typename TEffect>
-THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableKeys, const TEffect& effect,
+THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TEffect& effect,
const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto guard = typeEnv.BindAllocator();
@@ -886,7 +875,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableK
{
const auto& name = effect.GetRowsValue().GetParamValue().GetParamName();
auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
- auto shardsMap = PartitionParamByKey(value, type, stageInfo.Meta.TableId, tableKeys, *stageInfo.Meta.ShardKey,
+ auto shardsMap = PartitionParamByKey(value, type, stageInfo.Meta.TableId, stageInfo, *stageInfo.Meta.ShardKey,
holderFactory, typeEnv);
for (auto& [shardId, shardData] : shardsMap) {
@@ -918,29 +907,26 @@ THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableK
return shardInfoMap;
}
-THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
- return PruneEffectPartitionsImpl(tableKeys, effect, stageInfo, holderFactory, typeEnv);
+ return PruneEffectPartitionsImpl(effect, stageInfo, holderFactory, typeEnv);
}
-THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
- return PruneEffectPartitionsImpl(tableKeys, effect, stageInfo, holderFactory, typeEnv);
+ return PruneEffectPartitionsImpl(effect, stageInfo, holderFactory, typeEnv);
}
-THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv)
{
switch(operation.GetTypeCase()) {
case NKqpProto::TKqpPhyTableOperation::kUpsertRows:
- return PruneEffectPartitions(tableKeys, operation.GetUpsertRows(), stageInfo, holderFactory, typeEnv);
+ return PruneEffectPartitions(operation.GetUpsertRows(), stageInfo, holderFactory, typeEnv);
case NKqpProto::TKqpPhyTableOperation::kDeleteRows:
- return PruneEffectPartitions(tableKeys, operation.GetDeleteRows(), stageInfo, holderFactory, typeEnv);
+ return PruneEffectPartitions(operation.GetDeleteRows(), stageInfo, holderFactory, typeEnv);
default:
YQL_ENSURE(false, "Unexpected table operation: " << static_cast<ui32>(operation.GetTypeCase()));
}
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
index 3f5973d1c8..297e59e4ec 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h
@@ -54,24 +54,19 @@ TVector<TSerializedPointOrRange> FillReadRanges(const TVector<NScheme::TTypeInfo
const NKqpProto::TKqpPhyOpReadRanges& readRange, const TStageInfo& stageInfo,
const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit,
@@ -80,24 +75,19 @@ ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu
// Returns the list of ColumnShards that can store rows from the specified range
// NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range
// partitioned and multiple ColumnShards store data from the same key range
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
-THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys,
- const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
+THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo,
const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv);
TPhysicalShardReadSettings ExtractReadSettings(
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 1a99aff3af..f5f8c1ce9c 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -335,14 +335,14 @@ private:
THashMap<ui64, ui64> assignedShardsCount;
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId);
- const auto& keyTypes = table.KeyColumnTypes;
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+ const auto& keyTypes = tableInfo->KeyColumnTypes;
ui32 metaId = 0;
for (auto& op : stage.GetTableOps()) {
Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath());
- auto columns = BuildKqpColumns(op, table);
- auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv());
+ auto columns = BuildKqpColumns(op, tableInfo);
+ auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv());
const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange);
auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
@@ -562,7 +562,7 @@ private:
YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage");
}
- BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, AppData()->EnableKqpSpilling);
+ BuildKqpStageChannels(TasksGraph, stageInfo, TxId, AppData()->EnableKqpSpilling);
}
ResponseEv->InitTxResult(tx.Body);
diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
index dc4c30d224..dd911d24c1 100644
--- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
@@ -27,151 +27,20 @@ public:
TKqpTableResolver(const TActorId& owner, ui64 txId,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTableKeys& tableKeys,
+ const TVector<IKqpGateway::TPhysicalTxData>& transactions,
TKqpTasksGraph& tasksGraph)
: Owner(owner)
, TxId(txId)
, UserToken(userToken)
, Transactions(transactions)
- , TableKeys(tableKeys)
, TasksGraph(tasksGraph) {}
void Bootstrap() {
- FillTables();
-
ResolveKeys();
Become(&TKqpTableResolver::ResolveKeysState);
}
private:
- static void FillColumn(const NKqpProto::TKqpPhyColumn& phyColumn, TKqpTableKeys::TTable& table) {
- if (table.Columns.FindPtr(phyColumn.GetId().GetName())) {
- return;
- }
-
- TKqpTableKeys::TColumn column;
- column.Id = phyColumn.GetId().GetId();
-
- if (phyColumn.GetTypeId() != NScheme::NTypeIds::Pg) {
- column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId());
- } else {
- column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId(),
- NPg::TypeDescFromPgTypeName(phyColumn.GetPgTypeName()));
- }
-
- table.Columns.emplace(phyColumn.GetId().GetName(), std::move(column));
- if (!phyColumn.GetDefaultFromSequence().empty()) {
- TString seq = phyColumn.GetDefaultFromSequence();
- if (!seq.StartsWith(table.Path)) {
- seq = table.Path + "/" + seq;
- }
-
- table.Sequences.emplace(phyColumn.GetId().GetName(), seq);
- }
- }
-
- void FillTable(const NKqpProto::TKqpPhyTable& phyTable) {
- auto tableId = MakeTableId(phyTable.GetId());
-
- auto table = TableKeys.FindTablePtr(tableId);
- if (!table) {
- table = &TableKeys.GetOrAddTable(tableId, phyTable.GetId().GetPath());
-
- switch (phyTable.GetKind()) {
- case NKqpProto::TABLE_KIND_DS:
- table->TableKind = ETableKind::Datashard;
- break;
- case NKqpProto::TABLE_KIND_OLAP:
- table->TableKind = ETableKind::Olap;
- break;
- case NKqpProto::TABLE_KIND_SYS_VIEW:
- table->TableKind = ETableKind::SysView;
- break;
- default:
- YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind());
- }
-
- for (const auto& [_, phyColumn] : phyTable.GetColumns()) {
- FillColumn(phyColumn, *table);
- }
-
- YQL_ENSURE(table->KeyColumns.empty());
- table->KeyColumns.reserve(phyTable.KeyColumnsSize());
- YQL_ENSURE(table->KeyColumnTypes.empty());
- table->KeyColumnTypes.reserve(phyTable.KeyColumnsSize());
- for (const auto& keyColumnId : phyTable.GetKeyColumns()) {
- const auto& column = table->Columns.FindPtr(keyColumnId.GetName());
- YQL_ENSURE(column);
-
- table->KeyColumns.push_back(keyColumnId.GetName());
- table->KeyColumnTypes.push_back(column->Type);
- }
- } else {
- for (const auto& [_, phyColumn] : phyTable.GetColumns()) {
- FillColumn(phyColumn, *table);
- }
- }
- }
-
- void FillTables() {
- auto addColumn = [](TKqpTableKeys::TTable& table, const TString& columnName) mutable {
- auto& sysColumns = GetSystemColumns();
- if (table.Columns.FindPtr(columnName)) {
- return;
- }
-
- auto* systemColumn = sysColumns.FindPtr(columnName);
- YQL_ENSURE(systemColumn, "Unknown table column"
- << ", table: " << table.Path
- << ", column: " << columnName);
-
- TKqpTableKeys::TColumn column;
- column.Id = systemColumn->ColumnId;
- column.Type = NScheme::TTypeInfo(systemColumn->TypeId);
- table.Columns.emplace(columnName, std::move(column));
- };
-
- for (auto& tx : Transactions) {
- for (const auto& phyTable : tx.Body->GetTables()) {
- FillTable(phyTable);
- }
-
- for (auto& stage : tx.Body->GetStages()) {
- for (auto& op : stage.GetTableOps()) {
- auto& table = TableKeys.GetTable(MakeTableId(op.GetTable()));
- for (auto& column : op.GetColumns()) {
- addColumn(table, column.GetName());
- }
- }
-
- for (auto& source : stage.GetSources()) {
- if (source.HasReadRangesSource()) {
- auto& table = TableKeys.GetTable(MakeTableId(source.GetReadRangesSource().GetTable()));
- for (auto& column : source.GetReadRangesSource().GetColumns()) {
- addColumn(table, column.GetName());
- }
- }
- }
-
- for (const auto& input : stage.GetInputs()) {
- if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
- auto& table = TableKeys.GetTable(MakeTableId(input.GetStreamLookup().GetTable()));
- for (auto& column : input.GetStreamLookup().GetColumns()) {
- addColumn(table, column);
- }
- }
-
- if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) {
- auto& table = TableKeys.GetTable(MakeTableId(input.GetSequencer().GetTable()));
- for(auto& column: input.GetSequencer().GetColumns()) {
- addColumn(table, column);
- }
- }
- }
- }
- }
- }
-
STATEFN(ResolveKeysState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolveKeys);
@@ -196,26 +65,24 @@ private:
}
LOG_D("Navigated key sets: " << results.size());
for (auto& entry : results) {
- if (!TableRequestIds.erase(entry.TableId)) {
+ auto iter = TableRequestIds.find(entry.TableId);
+ if (iter == TableRequestIds.end()) {
ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
<< "Incorrect tableId in reply " << entry.TableId << '.'));
return;
}
+ TVector<TStageId> stageIds(std::move(iter->second));
+ TableRequestIds.erase(entry.TableId);
if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
<< "Failed to resolve table " << entry.TableId << " keys: " << entry.Status << '.'));
return;
}
- auto* table = TableKeys.FindTablePtr(entry.TableId);
- if (!table) {
- ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
- << "Incorrect tableId in table keys " << entry.TableId << '.'));
- return;
+ for (auto stageId : stageIds) {
+ TasksGraph.GetStageInfo(stageId).Meta.ColumnTableInfoPtr = entry.ColumnTableInfo;
}
- table->ColumnTableInfo = entry.ColumnTableInfo;
}
NavigationFinished = true;
TryFinish();
@@ -242,13 +109,8 @@ private:
if (entry.Status != NSchemeCache::TSchemeCacheRequest::EStatus::OkData) {
LOG_E("Error resolving keys for entry: " << entry.ToString(*AppData()->TypeRegistry));
- auto* table = TableKeys.FindTablePtr(entry.KeyDescription->TableId);
TStringBuilder path;
- if (table) {
- path << '`' << table->Path << '`';
- } else {
- path << "unresolved `" << entry.KeyDescription->TableId << '`';
- }
+ path << "unresolved `" << entry.KeyDescription->TableId << '`';
timer.reset();
ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
@@ -295,12 +157,14 @@ private:
YQL_ENSURE(stageInfo.Meta.ShardOperations.size() == 1);
auto operation = *stageInfo.Meta.ShardOperations.begin();
- const TKqpTableKeys::TTable* table = TableKeys.FindTablePtr(stageInfo.Meta.TableId);
- stageInfo.Meta.TableKind = table->TableKind;
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+ Y_ENSURE(tableInfo);
+ stageInfo.Meta.TableKind = tableInfo->TableKind;
- stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, operation);
+ stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation);
- if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.emplace(stageInfo.Meta.TableId).second) {
+ if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) {
+ TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first);
auto& entry = requestNavigate->ResultSet.emplace_back();
entry.TableId = stageInfo.Meta.TableId;
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
@@ -333,11 +197,10 @@ private:
}
private:
- THolder<TKeyDesc> ExtractKey(const TTableId& table, TKeyDesc::ERowOperation operation) {
- const auto& tableKey = TableKeys.GetTable(table);
- auto range = GetFullRange(tableKey.KeyColumnTypes.size());
+ THolder<TKeyDesc> ExtractKey(const TTableId& table, const TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo>& tableInfo, TKeyDesc::ERowOperation operation) {
+ auto range = GetFullRange(tableInfo->KeyColumnTypes.size());
- return MakeHolder<TKeyDesc>(table, range.ToTableRange(), operation, tableKey.KeyColumnTypes,
+ return MakeHolder<TKeyDesc>(table, range.ToTableRange(), operation, tableInfo->KeyColumnTypes,
TVector<TKeyDesc::TColumnOp>{});
}
@@ -388,8 +251,7 @@ private:
const ui64 TxId;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
const TVector<IKqpGateway::TPhysicalTxData>& Transactions;
- TKqpTableKeys& TableKeys;
- THashSet<TTableId> TableRequestIds;
+ THashMap<TTableId, TVector<TStageId>> TableRequestIds;
bool NavigationFinished = false;
bool ResolvingFinished = false;
@@ -405,8 +267,8 @@ private:
NActors::IActor* CreateKqpTableResolver(const TActorId& owner, ui64 txId,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTableKeys& tableKeys, TKqpTasksGraph& tasksGraph) {
- return new TKqpTableResolver(owner, txId, userToken, transactions, tableKeys, tasksGraph);
+ const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTasksGraph& tasksGraph) {
+ return new TKqpTableResolver(owner, txId, userToken, transactions, tasksGraph);
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.h b/ydb/core/kqp/executer_actor/kqp_table_resolver.h
index 0fbb152a5b..f914cf3430 100644
--- a/ydb/core/kqp/executer_actor/kqp_table_resolver.h
+++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.h
@@ -6,6 +6,6 @@ namespace NKikimr::NKqp {
NActors::IActor* CreateKqpTableResolver(const TActorId& owner, ui64 txId,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
- const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTableKeys& tableKeys, TKqpTasksGraph& tasksGraph);
+ const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTasksGraph& tasksGraph);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index c52c248672..d4f22125ad 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -103,6 +103,21 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable());
meta.TablePath = source.GetReadRangesSource().GetTable().GetPath();
meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read);
+ meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
+ }
+ }
+
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ meta.TableId = MakeTableId(input.GetStreamLookup().GetTable());
+ meta.TablePath = input.GetStreamLookup().GetTable().GetPath();
+ meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
+ }
+
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) {
+ meta.TableId = MakeTableId(input.GetSequencer().GetTable());
+ meta.TablePath = input.GetSequencer().GetTable().GetPath();
+ meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
}
}
@@ -120,6 +135,7 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
stageInfo.Meta.TableId = MakeTableId(op.GetTable());
stageInfo.Meta.TablePath = op.GetTable().GetPath();
stageInfo.Meta.TableKind = ETableKind::Unknown;
+ stageInfo.Meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(stageInfo.Meta.TableId);
tables.insert(MakeTableId(op.GetTable()));
} else {
YQL_ENSURE(stageInfo.Meta.TableId == MakeTableId(op.GetTable()));
@@ -216,7 +232,7 @@ void BuildMapShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, u
}
void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex,
- const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys, bool enableSpilling,
+ const TStageInfo& inputStageInfo, ui32 outputIndex, bool enableSpilling,
const TChannelLogFunc& logFunc)
{
YQL_ENSURE(stageInfo.Meta.ShardKey);
@@ -224,14 +240,14 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) {
partitionsMap[partition.ShardId] = &partition;
}
-
- auto table = tableKeys.GetTable(stageInfo.Meta.TableId);
+
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
for (auto& originTaskId : inputStageInfo.Tasks) {
auto& originTask = graph.GetTask(originTaskId);
auto& taskOutput = originTask.Outputs[outputIndex];
taskOutput.Type = TKqpTaskOutputType::ShardRangePartition;
- taskOutput.KeyColumns = table.KeyColumns;
+ taskOutput.KeyColumns = tableInfo->KeyColumns;
for (auto& targetTaskId : stageInfo.Tasks) {
auto& targetTask = graph.GetTask(targetTaskId);
@@ -258,7 +274,7 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
}
void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex,
- const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys,
+ const TStageInfo& inputStageInfo, ui32 outputIndex,
const NKqpProto::TKqpPhyCnSequencer& sequencer, bool enableSpilling, const TChannelLogFunc& logFunc)
{
YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size());
@@ -267,12 +283,12 @@ void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo,
settings->MutableTable()->CopyFrom(sequencer.GetTable());
settings->SetDatabase(graph.GetMeta().Database);
- auto table = tableKeys.GetTable(MakeTableId(sequencer.GetTable()));
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
THashSet<TString> autoIncrementColumns(sequencer.GetAutoIncrementColumns().begin(), sequencer.GetAutoIncrementColumns().end());
for(const auto& column: sequencer.GetColumns()) {
- auto columnIt = table.Columns.find(column);
- YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column);
+ auto columnIt = tableInfo->Columns.find(column);
+ YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << column);
const auto& columnInfo = columnIt->second;
auto* columnProto = settings->AddColumns();
@@ -287,8 +303,8 @@ void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo,
auto aic = autoIncrementColumns.find(column);
if (aic != autoIncrementColumns.end()) {
- auto sequenceIt = table.Sequences.find(column);
- YQL_ENSURE(sequenceIt != table.Sequences.end());
+ auto sequenceIt = tableInfo->Sequences.find(column);
+ YQL_ENSURE(sequenceIt != tableInfo->Sequences.end());
columnProto->SetDefaultFromSequence(sequenceIt->second);
}
}
@@ -323,7 +339,7 @@ void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo,
}
void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex,
- const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys,
+ const TStageInfo& inputStageInfo, ui32 outputIndex,
const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, bool enableSpilling, const TChannelLogFunc& logFunc)
{
YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size());
@@ -332,10 +348,10 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
settings->MutableTable()->CopyFrom(streamLookup.GetTable());
- auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable()));
- for (const auto& keyColumn : table.KeyColumns) {
- auto columnIt = table.Columns.find(keyColumn);
- YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+ for (const auto& keyColumn : tableInfo->KeyColumns) {
+ auto columnIt = tableInfo->Columns.find(keyColumn);
+ YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << keyColumn);
auto* keyColumnProto = settings->AddKeyColumns();
keyColumnProto->SetName(keyColumn);
@@ -344,14 +360,14 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
}
for (const auto& keyColumn : streamLookup.GetKeyColumns()) {
- auto columnIt = table.Columns.find(keyColumn);
- YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn);
+ auto columnIt = tableInfo->Columns.find(keyColumn);
+ YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << keyColumn);
settings->AddLookupKeyColumns(keyColumn);
}
for (const auto& column : streamLookup.GetColumns()) {
- auto columnIt = table.Columns.find(column);
- YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column);
+ auto columnIt = tableInfo->Columns.find(column);
+ YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << column);
auto* columnProto = settings->AddColumns();
columnProto->SetName(column);
@@ -388,7 +404,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
}
}
-void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
+void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageInfo,
ui64 txId, bool enableSpilling)
{
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -433,7 +449,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl
BuildMapShardChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, enableSpilling, log);
break;
case NKqpProto::TKqpPhyConnection::kShuffleShard:
- BuildShuffleShardChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys,
+ BuildShuffleShardChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx,
enableSpilling, log);
break;
case NKqpProto::TKqpPhyConnection::kMerge: {
@@ -450,13 +466,13 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl
break;
}
case NKqpProto::TKqpPhyConnection::kSequencer: {
- BuildSequencerChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys,
+ BuildSequencerChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx,
input.GetSequencer(), enableSpilling, log);
break;
}
case NKqpProto::TKqpPhyConnection::kStreamLookup: {
- BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys,
+ BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx,
input.GetStreamLookup(), enableSpilling, log);
break;
}
@@ -753,7 +769,7 @@ void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransact
meta->SetTableKind((ui32)stageInfo.Meta.TableKind);
}
-void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTableKeys& tableKeys, NYql::NDqProto::TDqTask& taskDesc) {
+void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto::TDqTask& taskDesc) {
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta;
@@ -806,9 +822,10 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl
FillTableMeta(stageInfo, protoTaskMeta.MutableTable());
- const auto& tableInfo = tableKeys.GetTable(stageInfo.Meta.TableId);
- for (const auto& keyColumnName : tableInfo.KeyColumns) {
- const auto& keyColumn = tableInfo.Columns.at(keyColumnName);
+ const auto& tableInfo = stageInfo.Meta.TableConstInfo;
+
+ for (const auto& keyColumnName : tableInfo->KeyColumns) {
+ const auto& keyColumn = tableInfo->Columns.at(keyColumnName);
auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod);
protoTaskMeta.AddKeyColumnTypes(columnType.TypeId);
if (columnType.TypeInfo) {
@@ -820,7 +837,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl
protoTaskMeta.AddSkipNullKeys(skipNullKey);
}
- switch (tableInfo.TableKind) {
+ switch (tableInfo->TableKind) {
case ETableKind::Unknown:
case ETableKind::SysView: {
protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
@@ -860,7 +877,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl
}
}
- if (tableInfo.TableKind == ETableKind::Olap) {
+ if (tableInfo->TableKind == ETableKind::Olap) {
auto* olapProgram = protoTaskMeta.MutableOlapProgram();
olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program);
@@ -1070,7 +1087,7 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N
}
}
- FillTaskMeta(stageInfo, task, tasksGraph.GetMeta().TableKeys, *result);
+ FillTaskMeta(stageInfo, task, *result);
}
NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task) {
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 768a1a7858..f7237c3c79 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -32,6 +32,8 @@ struct TStageInfoMeta {
TTableId TableId;
TString TablePath;
ETableKind TableKind;
+ TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo> TableConstInfo;
+ TIntrusiveConstPtr<NKikimr::NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> ColumnTableInfoPtr;
TVector<bool> SkipNullKeys;
@@ -84,7 +86,6 @@ struct TStageInfoMeta {
// things which are common for all tasks in the graph.
struct TGraphMeta {
- TKqpTableKeys TableKeys;
IKqpGateway::TKqpSnapshot Snapshot;
TMaybe<ui64> LockTxId;
std::unordered_map<ui64, TActorId> ResultChannelProxies;
@@ -243,7 +244,7 @@ using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTas
void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs);
void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const TKqpPhyTxHolder::TConstPtr& tx, ui64 txIdx);
-void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
+void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageInfo,
ui64 txId, bool enableSpilling);
NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task);
@@ -252,13 +253,14 @@ void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransact
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel);
template<typename Proto>
-TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, const TKqpTableKeys::TTable& table) {
+TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo> tableInfo) {
TVector<TTaskMeta::TColumn> columns;
columns.reserve(op.GetColumns().size());
for (const auto& column : op.GetColumns()) {
TTaskMeta::TColumn c;
- const auto& tableColumn = table.Columns.at(column.GetName());
+
+ const auto& tableColumn = tableInfo->Columns.at(column.GetName());
c.Id = column.GetId();
c.Type = tableColumn.Type;
c.TypeMod = tableColumn.TypeMod;
diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
index 93d7141299..f7b7f07fcb 100644
--- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp
+++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
@@ -52,11 +52,12 @@ public:
};
TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPreparedQuery>& pq,
- const NKqpProto::TKqpPhyTx* proto, const std::shared_ptr<TPreparedQueryAllocHolder>& alloc)
+ const NKqpProto::TKqpPhyTx* proto, const std::shared_ptr<TPreparedQueryAllocHolder>& alloc, TIntrusivePtr<TTableConstInfoMap> tableConstInfoById)
: PreparedQuery(pq)
, Proto(proto)
, LiteralTx(CalcIsLiteralTx(proto))
, Alloc(alloc)
+ , TableConstInfoById(tableConstInfoById)
{
TxResultsMeta.resize(Proto->GetResults().size());
for (auto&& i : Proto->GetStages()) {
@@ -104,13 +105,23 @@ TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
: Proto(proto)
, Alloc(std::move(std::make_shared<TPreparedQueryAllocHolder>(functionRegistry)))
+ , TableConstInfoById(MakeIntrusive<TTableConstInfoMap>())
{
THashSet<TString> tablesSet;
const auto& phyQuery = Proto->GetPhysicalQuery();
Transactions.reserve(phyQuery.TransactionsSize());
+
+ // Init TableConstInfoById
+ for (const auto& phyTx: phyQuery.GetTransactions()) {
+ for (const auto& phyTable : phyTx.GetTables()) {
+ FillTable(phyTable);
+ }
+ FillTables(phyTx.GetStages());
+ }
+
for (const auto& phyTx: phyQuery.GetTransactions()) {
TKqpPhyTxHolder::TConstPtr txHolder = std::make_shared<const TKqpPhyTxHolder>(
- Proto, &phyTx, Alloc);
+ Proto, &phyTx, Alloc, TableConstInfoById);
Transactions.emplace_back(std::move(txHolder));
for (const auto& stage: phyTx.GetStages()) {
@@ -139,6 +150,57 @@ TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto,
QueryTables = TVector<TString>(tablesSet.begin(), tablesSet.end());
}
+void TPreparedQueryHolder::FillTable(const NKqpProto::TKqpPhyTable& phyTable) {
+ auto tableId = MakeTableId(phyTable.GetId());
+
+ auto infoPtr = TableConstInfoById->Map.FindPtr(tableId);
+ if (!infoPtr) {
+ auto infoPtr = MakeIntrusive<TTableConstInfo>(phyTable.GetId().GetPath());
+ TableConstInfoById->Map[tableId] = infoPtr;
+ infoPtr->FillTable(phyTable);
+ } else {
+ for (const auto& [_, phyColumn] : phyTable.GetColumns()) {
+ (*infoPtr)->FillColumn(phyColumn);
+ }
+ }
+}
+
+void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages) {
+ for (auto& stage : stages) {
+ for (auto& op : stage.GetTableOps()) {
+ auto& info = GetInfo(MakeTableId(op.GetTable()));
+ for (auto& column : op.GetColumns()) {
+ info->AddColumn(column.GetName());
+ }
+ }
+
+ for (auto& source : stage.GetSources()) {
+ if (source.HasReadRangesSource()) {
+ auto& info = GetInfo(MakeTableId(source.GetReadRangesSource().GetTable()));
+ for (auto& column : source.GetReadRangesSource().GetColumns()) {
+ info->AddColumn(column.GetName());
+ }
+ }
+ }
+
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ auto& info = GetInfo(MakeTableId(input.GetStreamLookup().GetTable()));
+ for (auto& column : input.GetStreamLookup().GetColumns()) {
+ info->AddColumn(column);
+ }
+ }
+
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) {
+ auto& info = GetInfo(MakeTableId(input.GetSequencer().GetTable()));
+ for(auto& column: input.GetSequencer().GetColumns()) {
+ info->AddColumn(column);
+ }
+ }
+ }
+ }
+}
+
const TKqpPhyTxHolder::TConstPtr& TPreparedQueryHolder::GetPhyTx(ui32 txId) const {
YQL_ENSURE(txId < Transactions.size());
return Transactions[txId];
diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h
index d1d190dbac..b1d07486da 100644
--- a/ydb/core/kqp/query_data/kqp_prepared_query.h
+++ b/ydb/core/kqp/query_data/kqp_prepared_query.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/query_data/kqp_predictor.h>
#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
#include <ydb/core/protos/kqp.pb.h>
@@ -31,6 +32,10 @@ struct TPhyTxResultMetadata {
TVector<ui32> ColumnOrder;
};
+struct TTableConstInfoMap : public TAtomicRefCount<TTableConstInfoMap> {
+ THashMap<TTableId, TIntrusivePtr<TKqpTableKeys::TTableConstInfo>> Map;
+};
+
class TKqpPhyTxHolder {
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
const NKqpProto::TKqpPhyTx* Proto;
@@ -38,6 +43,8 @@ class TKqpPhyTxHolder {
TVector<TPhyTxResultMetadata> TxResultsMeta;
std::shared_ptr<TPreparedQueryAllocHolder> Alloc;
std::vector<TStagePredictor> Predictors;
+ TIntrusivePtr<TTableConstInfoMap> TableConstInfoById;
+
public:
using TConstPtr = std::shared_ptr<const TKqpPhyTxHolder>;
@@ -97,8 +104,13 @@ public:
return Proto->ShortDebugString();
}
+ TIntrusiveConstPtr<TTableConstInfoMap> GetTableConstInfoById() const {
+ return TableConstInfoById;
+ }
+
+
TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPreparedQuery>& pq, const NKqpProto::TKqpPhyTx* proto,
- const std::shared_ptr<TPreparedQueryAllocHolder>& alloc);
+ const std::shared_ptr<TPreparedQueryAllocHolder>& alloc, TIntrusivePtr<TTableConstInfoMap> tableConstInfoById);
bool IsLiteralTx() const;
};
@@ -115,11 +127,14 @@ public:
class TPreparedQueryHolder {
private:
+ using TTableConstInfo = TKqpTableKeys::TTableConstInfo;
+
YDB_ACCESSOR_DEF(TLlvmSettings, LlvmSettings);
std::shared_ptr<const NKikimrKqp::TPreparedQuery> Proto;
std::shared_ptr<TPreparedQueryAllocHolder> Alloc;
TVector<TString> QueryTables;
std::vector<TKqpPhyTxHolder::TConstPtr> Transactions;
+ TIntrusivePtr<TTableConstInfoMap> TableConstInfoById;
public:
@@ -164,6 +179,20 @@ public:
const NKqpProto::TKqpPhyQuery& GetPhysicalQuery() const {
return Proto->GetPhysicalQuery();
}
+
+ TIntrusivePtr<TTableConstInfo>& GetInfo(const TTableId& tableId) {
+ auto info = TableConstInfoById->Map.FindPtr(tableId);
+ MKQL_ENSURE_S(info);
+ return *info;
+ }
+
+ const THashMap<TTableId, TIntrusivePtr<TTableConstInfo>>& GetTableConstInfo() const {
+ return TableConstInfoById->Map;
+ }
+
+ void FillTable(const NKqpProto::TKqpPhyTable& phyTable);
+
+ void FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages);
};