diff options
author | makostrov <makostrov@yandex-team.com> | 2023-08-07 15:34:13 +0300 |
---|---|---|
committer | makostrov <makostrov@yandex-team.com> | 2023-08-07 16:51:50 +0300 |
commit | 67c8b51e98caa2abfc405b42b9497563207dd424 (patch) | |
tree | a365a726ed4f71ee34fc89f8488f25175ccc4309 | |
parent | f785ad0ebb0c4618aa237124e6ad8f68dd622023 (diff) | |
download | ydb-67c8b51e98caa2abfc405b42b9497563207dd424.tar.gz |
[kqp] delete table resolver actor KIKIMR-18803
KIKIMR-18803
-rw-r--r-- | ydb/core/kqp/common/kqp_resolve.h | 147 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 20 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 30 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 142 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.h | 30 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_table_resolver.cpp | 178 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_table_resolver.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 77 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 10 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_prepared_query.cpp | 66 | ||||
-rw-r--r-- | ydb/core/kqp/query_data/kqp_prepared_query.h | 31 |
12 files changed, 404 insertions, 339 deletions
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h index 2aaef8e640..0fbba98582 100644 --- a/ydb/core/kqp/common/kqp_resolve.h +++ b/ydb/core/kqp/common/kqp_resolve.h @@ -26,29 +26,150 @@ class TKqpTableKeys { public: using TColumn = NSharding::TShardingBase::TColumn; - struct TTable { - public: + struct TTableConstInfo : public TAtomicRefCount<TTableConstInfo> { TString Path; TMap<TString, TColumn> Columns; TVector<TString> KeyColumns; TVector<NScheme::TTypeInfo> KeyColumnTypes; ETableKind TableKind = ETableKind::Unknown; THashMap<TString, TString> Sequences; + + TTableConstInfo() {} + TTableConstInfo(const TString& path) : Path(path) {} + + void FillColumn(const NKqpProto::TKqpPhyColumn& phyColumn) { + if (Columns.FindPtr(phyColumn.GetId().GetName())) { + return; + } + + TKqpTableKeys::TColumn column; + column.Id = phyColumn.GetId().GetId(); + + if (phyColumn.GetTypeId() != NScheme::NTypeIds::Pg) { + column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId()); + } else { + column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId(), + NPg::TypeDescFromPgTypeName(phyColumn.GetPgTypeName())); + } + + Columns.emplace(phyColumn.GetId().GetName(), std::move(column)); + if (!phyColumn.GetDefaultFromSequence().empty()) { + TString seq = phyColumn.GetDefaultFromSequence(); + if (!seq.StartsWith(Path)) { + seq = Path + "/" + seq; + } + + Sequences.emplace(phyColumn.GetId().GetName(), seq); + } + } + + void AddColumn(const TString& columnName) { + auto& sysColumns = GetSystemColumns(); + if (Columns.FindPtr(columnName)) { + return; + } + + auto* systemColumn = sysColumns.FindPtr(columnName); + YQL_ENSURE(systemColumn, "Unknown table column" + << ", table: " << Path + << ", column: " << columnName); + + TKqpTableKeys::TColumn column; + column.Id = systemColumn->ColumnId; + column.Type = NScheme::TTypeInfo(systemColumn->TypeId); + Columns.emplace(columnName, std::move(column)); + } + + + void FillTable(const NKqpProto::TKqpPhyTable& phyTable) { + switch (phyTable.GetKind()) { + case NKqpProto::TABLE_KIND_DS: + TableKind = ETableKind::Datashard; + break; + case NKqpProto::TABLE_KIND_OLAP: + TableKind = ETableKind::Olap; + break; + case NKqpProto::TABLE_KIND_SYS_VIEW: + TableKind = ETableKind::SysView; + break; + default: + YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind()); + } + + for (const auto& [_, phyColumn] : phyTable.GetColumns()) { + FillColumn(phyColumn); + } + + YQL_ENSURE(KeyColumns.empty()); + KeyColumns.reserve(phyTable.KeyColumnsSize()); + YQL_ENSURE(KeyColumnTypes.empty()); + KeyColumnTypes.reserve(phyTable.KeyColumnsSize()); + for (const auto& keyColumnId : phyTable.GetKeyColumns()) { + const auto& column = Columns.FindPtr(keyColumnId.GetName()); + YQL_ENSURE(column); + + KeyColumns.push_back(keyColumnId.GetName()); + KeyColumnTypes.push_back(column->Type); + } + } + }; + + struct TTable { + private: + TIntrusivePtr<TTableConstInfo> TableConstInfo; TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> ColumnTableInfo; - const TMap<TString, TColumn>& GetColumnsRemap() const { - return Columns; + public: + TTable() : TableConstInfo(MakeIntrusive<TTableConstInfo>()) {} + + TTable(TIntrusivePtr<TTableConstInfo> constInfoPtr) : TableConstInfo(constInfoPtr) {} + + const TString& GetPath() const { + return TableConstInfo->Path; + } + + const TMap<TString, TColumn>& GetColumns() const { + return TableConstInfo->Columns; + } + + const TVector<TString>& GetKeyColumns() const { + return TableConstInfo->KeyColumns; + } + + const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const { + return TableConstInfo->KeyColumnTypes; + } + + const ETableKind& GetTableKind() const { + return TableConstInfo->TableKind; + } + + const THashMap<TString, TString>& GetSequences() const { + return TableConstInfo->Sequences; + } + + TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> GetColumnTableInfo() const { + return ColumnTableInfo; + } + + void SetColumnTableInfo(TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> columnTableInfo) { + ColumnTableInfo = columnTableInfo; } - std::unique_ptr<NSharding::TShardingBase> BuildSharding() const { - if (ColumnTableInfo) { - auto result = NSharding::TShardingBase::BuildShardingOperator(ColumnTableInfo->Description.GetSharding()); + static std::unique_ptr<NSharding::TShardingBase> BuildSharding(const TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo>& columnTableInfo) { + if (columnTableInfo) { + auto result = NSharding::TShardingBase::BuildShardingOperator(columnTableInfo->Description.GetSharding()); YQL_ENSURE(result); return result; } else { return nullptr; } } + + void SetPath(const TStringBuf& path) { + TableConstInfo->Path = path; + } + }; TTable* FindTablePtr(const TTableId& id) { @@ -71,13 +192,13 @@ public: return *table; } - TTable& GetOrAddTable(const TTableId& id, const TStringBuf path) { + TTable& GetOrAddTable(const TTableId& id, const TStringBuf& path) { auto& table = TablesById[id]; - if (table.Path.empty()) { - table.Path = path; + if (table.GetPath().empty()) { + table.SetPath(path); } else { - MKQL_ENSURE_S(table.Path == path); + MKQL_ENSURE_S(table.GetPath() == path); } return table; @@ -91,6 +212,10 @@ public: return TablesById; } + void AddTable(const TTableId& id, TIntrusivePtr<TTableConstInfo> info) { + TablesById.insert_or_assign(id, info); + } + private: THashMap<TTableId, TTable> TablesById; }; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 4bc8e8ba9e..fd6cbc846e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1066,9 +1066,9 @@ private: if (!res->Record.GetTxLocks().empty()) { auto& lock = res->Record.GetTxLocks(0); auto tableId = TTableId(lock.GetSchemeShard(), lock.GetPathId()); - auto it = FindIf(GetTableKeys().Get(), [tableId](const auto& x){ return x.first.HasSamePath(tableId); }); - if (it != GetTableKeys().Get().end()) { - tableName = it->second.Path; + auto it = FindIf(TasksGraph.GetStagesInfo(), [tableId](const auto& x){ return x.second.Meta.TableId.HasSamePath(tableId); }); + if (it != TasksGraph.GetStagesInfo().end()) { + tableName = it->second.Meta.TableConstInfo->Path; } } @@ -1356,17 +1356,17 @@ private: return task; }; - const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); - const auto& keyTypes = table.KeyColumnTypes;; + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + const auto& keyTypes = tableInfo->KeyColumnTypes; for (auto& op : stage.GetTableOps()) { Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); - auto columns = BuildKqpColumns(op, table); + auto columns = BuildKqpColumns(op, tableInfo); switch (op.GetTypeCase()) { case NKqpProto::TKqpPhyTableOperation::kReadRanges: case NKqpProto::TKqpPhyTableOperation::kReadRange: case NKqpProto::TKqpPhyTableOperation::kLookup: { - auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv()); + auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv()); auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv()); for (auto& [shardId, shardInfo] : partitions) { @@ -1420,7 +1420,7 @@ private: ShardsWithEffects.insert(task.Meta.ShardId); } } else { - auto result = PruneEffectPartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv()); + auto result = PruneEffectPartitions(op, stageInfo, HolderFactory(), TypeEnv()); for (auto& [shardId, shardInfo] : result) { YQL_ENSURE(!shardInfo.KeyReadRanges); YQL_ENSURE(shardInfo.KeyWriteRanges); @@ -1441,7 +1441,7 @@ private: } for (const auto& [name, info] : shardInfo.ColumnWrites) { - auto& column = table.Columns.at(name); + auto& column = tableInfo->Columns.at(name); auto& taskColumnWrite = task.Meta.Writes->ColumnWrites[column.Id]; taskColumnWrite.Column.Id = column.Id; @@ -1724,7 +1724,7 @@ private: YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage"); } - BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, /* enableSpilling */ false); + BuildKqpStageChannels(TasksGraph, stageInfo, TxId, /* enableSpilling */ false); } ResponseEv->InitTxResult(tx.Body); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 1014b280cc..61f2db18f8 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -344,7 +344,7 @@ protected: } auto kqpTableResolver = CreateKqpTableResolver(this->SelfId(), TxId, UserToken, Request.Transactions, - GetTableKeysRef(), TasksGraph); + TasksGraph); KqpTableResolverId = this->RegisterWithSameMailbox(kqpTableResolver); LOG_T("Got request, become WaitResolveState"); @@ -694,8 +694,8 @@ protected: auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); - const auto& keyTypes = table.KeyColumnTypes; + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + const auto& keyTypes = tableInfo->KeyColumnTypes; for (auto& op : stage.GetTableOps()) { Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); @@ -724,7 +724,7 @@ protected: TTaskMeta::TShardReadInfo readInfo = { .Ranges = std::move(keyRanges), - .Columns = BuildKqpColumns(op, table), + .Columns = BuildKqpColumns(op, tableInfo), }; task.Meta.Reads.ConstructInPlace(); @@ -797,12 +797,12 @@ protected: auto& source = stage.GetSources(0).GetReadRangesSource(); - const auto& table = GetTableKeys().GetTable(MakeTableId(source.GetTable())); - const auto& keyTypes = table.KeyColumnTypes; + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + const auto& keyTypes = tableInfo->KeyColumnTypes; - YQL_ENSURE(table.TableKind != NKikimr::NKqp::ETableKind::Olap); + YQL_ENSURE(tableInfo->TableKind != NKikimr::NKqp::ETableKind::Olap); - auto columns = BuildKqpColumns(source, table); + auto columns = BuildKqpColumns(source, tableInfo); const auto& snapshot = GetSnapshot(); @@ -896,9 +896,9 @@ protected: }; if (source.GetSequentialInFlightShards()) { - auto [startShard, shardInfo] = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv()); if (Stats) { - THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + THashMap<ui64, TShardInfo> partitions = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv()); for (auto& [shardId, _] : partitions) { Stats->AffectedShards.insert(shardId); } @@ -910,7 +910,7 @@ protected: return 0; } } else { - THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + THashMap<ui64, TShardInfo> partitions = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv()); for (auto& [shardId, shardInfo] : partitions) { addPartiton(shardId, shardId, shardInfo, {}); } @@ -1141,14 +1141,6 @@ protected: } } - const TKqpTableKeys& GetTableKeys() const { - return TasksGraph.GetMeta().TableKeys; - } - - TKqpTableKeys& GetTableKeysRef() { - return TasksGraph.GetMeta().TableKeys; - } - std::unordered_map<ui64, IActor*>& GetResultChannelProxies() { return ResultChannelProxies; } diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index 1283f323b8..425b73a7c7 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -33,12 +33,12 @@ struct TShardParamValuesAndRanges { THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type, const TTableId& tableId, - const TKqpTableKeys& tableKeys, const TKeyDesc& key, const NMiniKQL::THolderFactory&, + const TStageInfo& stageInfo, const TKeyDesc& key, const NMiniKQL::THolderFactory&, // Here is problem in ... const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); YQL_ENSURE(tableId.HasSamePath(key.TableId)); - auto& table = tableKeys.GetTable(tableId); + auto& tableInfo = stageInfo.Meta.TableConstInfo; THashMap<ui64, TShardParamValuesAndRanges> ret; @@ -47,19 +47,19 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( YQL_ENSURE(itemType->GetKind() == NMiniKQL::TType::EKind::Struct); auto* structType = static_cast<NMiniKQL::TStructType*>(itemType); - const ui64 keyLen = table.KeyColumns.size(); + const ui64 keyLen = tableInfo->KeyColumns.size(); TVector<ui32> keyColumnIndices; keyColumnIndices.reserve(keyLen); - for (auto& keyColumn : table.KeyColumns) { + for (auto& keyColumn : tableInfo->KeyColumns) { keyColumnIndices.push_back(structType->GetMemberIndex(keyColumn)); } NUdf::TUnboxedValue paramValue; - std::unique_ptr<NSharding::TShardingBase> sharding = table.BuildSharding(); + std::unique_ptr<NSharding::TShardingBase> sharding = TKqpTableKeys::TTable::BuildSharding(stageInfo.Meta.ColumnTableInfoPtr); std::unique_ptr<NSharding::TUnboxedValueReader> unboxedReader; if (sharding) { - unboxedReader = std::make_unique<NSharding::TUnboxedValueReader>(structType, table.GetColumnsRemap(), sharding->GetShardingColumns()); + unboxedReader = std::make_unique<NSharding::TUnboxedValueReader>(structType, tableInfo->Columns, sharding->GetShardingColumns()); } auto it = value.GetListIterator(); while (it.Next(paramValue)) { @@ -67,10 +67,10 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( if (sharding) { shardId = key.GetPartitions()[sharding->CalcShardId(paramValue, *unboxedReader)].ShardId; } else { - auto keyValue = MakeKeyCells(paramValue, table.KeyColumnTypes, keyColumnIndices, + auto keyValue = MakeKeyCells(paramValue, tableInfo->KeyColumnTypes, keyColumnIndices, typeEnv, /* copyValues */ true); Y_VERIFY_DEBUG(keyValue.size() == keyLen); - const ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), table.KeyColumnTypes, + const ui32 partitionIndex = FindKeyPartitionIndex(keyValue, key.GetPartitions(), tableInfo->KeyColumnTypes, [](const auto& partition) { return *partition.Range; }); shardId = key.GetPartitions()[partitionIndex].ShardId; @@ -96,7 +96,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( ui32 sizeBytes = NDq::TDqDataSerializer::EstimateSize(columnValue, columnType); // Sanity check, we only expect table columns in param values - Y_VERIFY_DEBUG(table.Columns.contains(columnName)); + Y_VERIFY_DEBUG(tableInfo->Columns.contains(columnName)); auto& columnWrite = shardData.ColumnWrites[columnName]; columnWrite.MaxValueSizeBytes = std::max(columnWrite.MaxValueSizeBytes, sizeBytes); @@ -111,12 +111,13 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKey( THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix( const NUdf::TUnboxedValue& value, NKikimr::NMiniKQL::TType* type, - const TTableId& tableId, const TKqpTableKeys& tableKeys, const TKeyDesc& key, + const TTableId& tableId, const TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo>& tableInfo, const TKeyDesc& key, const NMiniKQL::THolderFactory&, const NMiniKQL::TTypeEnvironment& typeEnv) { + YQL_ENSURE(tableInfo); + auto guard = typeEnv.BindAllocator(); YQL_ENSURE(tableId.HasSamePath(key.TableId)); - auto& table = tableKeys.GetTable(tableId); THashMap<ui64, TShardParamValuesAndRanges> ret; @@ -125,13 +126,13 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix( YQL_ENSURE(itemType->GetKind() == NMiniKQL::TType::EKind::Struct); auto& structType = static_cast<NMiniKQL::TStructType&>(*itemType); - const ui64 keyLen = table.KeyColumns.size(); + const ui64 keyLen = tableInfo->KeyColumns.size(); TVector<NScheme::TTypeInfo> keyFullType{Reserve(keyLen)}; TVector<NScheme::TTypeInfo> keyPrefixType{Reserve(keyLen)}; TVector<ui32> keyPrefixIndices{Reserve(keyLen)}; - for (const auto& keyColumn : table.KeyColumns) { + for (const auto& keyColumn : tableInfo->KeyColumns) { auto columnInfo = NDq::FindColumnInfo(&structType, keyColumn); if (!columnInfo) { break; @@ -146,7 +147,7 @@ THashMap<ui64, TShardParamValuesAndRanges> PartitionParamByKeyPrefix( YQL_ENSURE(!keyPrefixType.empty()); for (ui64 i = keyFullType.size(); i < keyLen; ++i) { - keyFullType.push_back(table.Columns.at(table.KeyColumns[i]).Type); + keyFullType.push_back(tableInfo->Columns.at(tableInfo->KeyColumns[i]).Type); } NUdf::TUnboxedValue paramValue; @@ -485,15 +486,13 @@ TString TShardInfo::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const return sb; } -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); - const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); - YQL_ENSURE(table); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; - const auto& keyColumnTypes = table->KeyColumnTypes; + const auto& keyColumnTypes = tableInfo->KeyColumnTypes; YQL_ENSURE(readRange.HasKeyRange()); auto range = MakeKeyRange(keyColumnTypes, readRange.GetKeyRange(), stageInfo, holderFactory, typeEnv); @@ -517,15 +516,13 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, return shardInfoMap; } -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { Y_UNUSED(holderFactory); - const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); - YQL_ENSURE(table); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; - const auto& keyColumnTypes = table->KeyColumnTypes; + const auto& keyColumnTypes = tableInfo->KeyColumnTypes; auto ranges = FillReadRangesInternal(keyColumnTypes, readRanges, stageInfo, typeEnv); THashMap<ui64, TShardInfo> shardInfoMap; @@ -558,15 +555,13 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, return shardInfoMap; } -TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, +TVector<TSerializedPointOrRange> ExtractRanges(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv, TGuard<NKikimr::NMiniKQL::TScopedAlloc>&) { - const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); - YQL_ENSURE(table); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; - const auto& keyColumnTypes = table->KeyColumnTypes; + const auto& keyColumnTypes = tableInfo->KeyColumnTypes; TVector<TSerializedPointOrRange> ranges; if (source.HasRanges()) { @@ -588,16 +583,14 @@ TVector<TSerializedPointOrRange> ExtractRanges(const TKqpTableKeys& tableKeys, return ranges; } -std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, +std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); - const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); - YQL_ENSURE(table); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; - const auto& keyColumnTypes = table->KeyColumnTypes; - auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard); + const auto& keyColumnTypes = tableInfo->KeyColumnTypes; + auto ranges = ExtractRanges(source, stageInfo, holderFactory, typeEnv, guard); ui64 shard = 0; if (!ranges.empty()) { @@ -627,16 +620,14 @@ std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& table } -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); - const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); - YQL_ENSURE(table); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; - const auto& keyColumnTypes = table->KeyColumnTypes; - auto ranges = ExtractRanges(tableKeys, source, stageInfo, holderFactory, typeEnv, guard); + const auto& keyColumnTypes = tableInfo->KeyColumnTypes; + auto ranges = ExtractRanges(source, stageInfo, holderFactory, typeEnv, guard); THashMap<ui64, TShardInfo> shardInfoMap; @@ -669,18 +660,17 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, } -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { Y_UNUSED(holderFactory); auto guard = typeEnv.BindAllocator(); - const auto* table = tableKeys.FindTablePtr(stageInfo.Meta.TableId); - YQL_ENSURE(table); - YQL_ENSURE(table->TableKind == ETableKind::Olap); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + + YQL_ENSURE(tableInfo->TableKind == ETableKind::Olap); YQL_ENSURE(stageInfo.Meta.TableKind == ETableKind::Olap); - const auto& keyColumnTypes = table->KeyColumnTypes; + const auto& keyColumnTypes = tableInfo->KeyColumnTypes; auto ranges = FillReadRanges(keyColumnTypes, readRanges, stageInfo, typeEnv); THashMap<ui64, TShardInfo> shardInfoMap; @@ -699,19 +689,18 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, return shardInfoMap; } -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { switch (operation.GetTypeCase()) { case NKqpProto::TKqpPhyTableOperation::kReadRanges: - return PrunePartitions(tableKeys, operation.GetReadRanges(), stageInfo, holderFactory, typeEnv); + return PrunePartitions(operation.GetReadRanges(), stageInfo, holderFactory, typeEnv); case NKqpProto::TKqpPhyTableOperation::kReadRange: - return PrunePartitions(tableKeys, operation.GetReadRange(), stageInfo, holderFactory, typeEnv); + return PrunePartitions(operation.GetReadRange(), stageInfo, holderFactory, typeEnv); case NKqpProto::TKqpPhyTableOperation::kLookup: - return PrunePartitions(tableKeys, operation.GetLookup(), stageInfo, holderFactory, typeEnv); + return PrunePartitions(operation.GetLookup(), stageInfo, holderFactory, typeEnv); case NKqpProto::TKqpPhyTableOperation::kReadOlapRange: - return PrunePartitions(tableKeys, operation.GetReadOlapRange(), stageInfo, holderFactory, typeEnv); + return PrunePartitions(operation.GetReadOlapRange(), stageInfo, holderFactory, typeEnv); default: YQL_ENSURE(false, "Unexpected table scan operation: " << static_cast<ui32>(operation.GetTypeCase())); break; @@ -724,12 +713,12 @@ namespace { using namespace NMiniKQL; THashMap<ui64, TShardInfo> PartitionLookupByParameterValue(const NKqpProto::TKqpPhyParamValue& proto, - const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, const THolderFactory& holderFactory, + const TStageInfo& stageInfo, const THolderFactory& holderFactory, const TTypeEnvironment& typeEnv) { const auto& name = proto.GetParamName(); auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name); - auto shardsMap = PartitionParamByKeyPrefix(value, type, stageInfo.Meta.TableId, tableKeys, *stageInfo.Meta.ShardKey, + auto shardsMap = PartitionParamByKeyPrefix(value, type, stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, *stageInfo.Meta.ShardKey, holderFactory, typeEnv); THashMap<ui64, TShardInfo> shardInfoMap; @@ -754,19 +743,19 @@ THashMap<ui64, TShardInfo> PartitionLookupByParameterValue(const NKqpProto::TKqp } THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRowsList& proto, - const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, const THolderFactory& holderFactory, + const TStageInfo& stageInfo, const THolderFactory& holderFactory, const TTypeEnvironment& typeEnv) { - const auto& table = tableKeys.GetTable(stageInfo.Meta.ShardKey->TableId); - std::unordered_map<ui64, THashSet<TString>> shardParams; // shardId -> paramNames std::unordered_map<ui64, TShardParamValuesAndRanges> ret; THashMap<ui64, TShardInfo> shardInfoMap; + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + for (const auto& row : proto.GetRows()) { TVector<TCell> keyFrom, keyTo; - keyFrom.resize(table.KeyColumns.size()); + keyFrom.resize(tableInfo->KeyColumns.size()); keyTo.resize(row.GetColumns().size()); NUdf::TUnboxedValue mkqlValue; @@ -792,17 +781,17 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow } } - for (ui64 i = 0; i < table.KeyColumns.size(); ++i) { - if (table.KeyColumns[i] == columnName) { + for (ui64 i = 0; i < tableInfo->KeyColumns.size(); ++i) { + if (tableInfo->KeyColumns[i] == columnName) { keyFrom[i] = keyTo[i] = NMiniKQL::MakeCell( - table.KeyColumnTypes[i], mkqlValue, typeEnv, /* copyValue */ false); + tableInfo->KeyColumnTypes[i], mkqlValue, typeEnv, /* copyValue */ false); break; } } } auto range = TTableRange(keyFrom, true, keyTo, true, /* point */ false); - auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->GetPartitions(), table.KeyColumnTypes); + auto partitions = GetKeyRangePartitions(range, stageInfo.Meta.ShardKey->GetPartitions(), tableInfo->KeyColumnTypes); for (auto& partitionWithRange: partitions) { ui64 shardId = partitionWithRange.PartitionInfo->ShardId; @@ -844,7 +833,7 @@ THashMap<ui64, TShardInfo> PartitionLookupByRowsList(const NKqpProto::TKqpPhyRow } // namespace -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const NKqpProto::TKqpPhyOpLookup& lookup, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); @@ -858,12 +847,12 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const switch (auto kind = lookup.GetKeysValue().GetKindCase()) { case NKqpProto::TKqpPhyValue::kParamValue: { - return PartitionLookupByParameterValue(lookup.GetKeysValue().GetParamValue(), tableKeys, stageInfo, + return PartitionLookupByParameterValue(lookup.GetKeysValue().GetParamValue(), stageInfo, holderFactory, typeEnv); } case NKqpProto::TKqpPhyValue::kRowsList: { - return PartitionLookupByRowsList(lookup.GetKeysValue().GetRowsList(), tableKeys, stageInfo, + return PartitionLookupByRowsList(lookup.GetKeysValue().GetRowsList(), stageInfo, holderFactory, typeEnv); } @@ -876,7 +865,7 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, const } template <typename TEffect> -THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableKeys, const TEffect& effect, +THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TEffect& effect, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { auto guard = typeEnv.BindAllocator(); @@ -886,7 +875,7 @@ THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableK { const auto& name = effect.GetRowsValue().GetParamValue().GetParamName(); auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name); - auto shardsMap = PartitionParamByKey(value, type, stageInfo.Meta.TableId, tableKeys, *stageInfo.Meta.ShardKey, + auto shardsMap = PartitionParamByKey(value, type, stageInfo.Meta.TableId, stageInfo, *stageInfo.Meta.ShardKey, holderFactory, typeEnv); for (auto& [shardId, shardData] : shardsMap) { @@ -918,29 +907,26 @@ THashMap<ui64, TShardInfo> PruneEffectPartitionsImpl(const TKqpTableKeys& tableK return shardInfoMap; } -THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { - return PruneEffectPartitionsImpl(tableKeys, effect, stageInfo, holderFactory, typeEnv); + return PruneEffectPartitionsImpl(effect, stageInfo, holderFactory, typeEnv); } -THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { - return PruneEffectPartitionsImpl(tableKeys, effect, stageInfo, holderFactory, typeEnv); + return PruneEffectPartitionsImpl(effect, stageInfo, holderFactory, typeEnv); } -THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv) { switch(operation.GetTypeCase()) { case NKqpProto::TKqpPhyTableOperation::kUpsertRows: - return PruneEffectPartitions(tableKeys, operation.GetUpsertRows(), stageInfo, holderFactory, typeEnv); + return PruneEffectPartitions(operation.GetUpsertRows(), stageInfo, holderFactory, typeEnv); case NKqpProto::TKqpPhyTableOperation::kDeleteRows: - return PruneEffectPartitions(tableKeys, operation.GetDeleteRows(), stageInfo, holderFactory, typeEnv); + return PruneEffectPartitions(operation.GetDeleteRows(), stageInfo, holderFactory, typeEnv); default: YQL_ENSURE(false, "Unexpected table operation: " << static_cast<ui32>(operation.GetTypeCase())); } diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.h b/ydb/core/kqp/executer_actor/kqp_partition_helper.h index 3f5973d1c8..297e59e4ec 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.h +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.h @@ -54,24 +54,19 @@ TVector<TSerializedPointOrRange> FillReadRanges(const TVector<NScheme::TTypeInfo const NKqpProto::TKqpPhyOpReadRanges& readRange, const TStageInfo& stageInfo, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRange& readRange, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpLookup& lookup, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, +std::pair<ui64, TShardInfo> MakeVirtualTablePartition(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpReadRangesSource& source, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValue& protoItemsLimit, @@ -80,24 +75,19 @@ ui64 ExtractItemsLimit(const TStageInfo& stageInfo, const NKqpProto::TKqpPhyValu // Returns the list of ColumnShards that can store rows from the specified range // NOTE: Unlike OLTP tables that store data in DataShards, data in OLAP tables is not range // partitioned and multiple ColumnShards store data from the same key range -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyOpReadOlapRanges& readRanges, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PrunePartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpUpsertRows& effect, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyOpDeleteRows& effect, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); -THashMap<ui64, TShardInfo> PruneEffectPartitions(const TKqpTableKeys& tableKeys, - const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, +THashMap<ui64, TShardInfo> PruneEffectPartitions(const NKqpProto::TKqpPhyTableOperation& operation, const TStageInfo& stageInfo, const NMiniKQL::THolderFactory& holderFactory, const NMiniKQL::TTypeEnvironment& typeEnv); TPhysicalShardReadSettings ExtractReadSettings( diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 1a99aff3af..f5f8c1ce9c 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -335,14 +335,14 @@ private: THashMap<ui64, ui64> assignedShardsCount; auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); - const auto& table = GetTableKeys().GetTable(stageInfo.Meta.TableId); - const auto& keyTypes = table.KeyColumnTypes; + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + const auto& keyTypes = tableInfo->KeyColumnTypes; ui32 metaId = 0; for (auto& op : stage.GetTableOps()) { Y_VERIFY_DEBUG(stageInfo.Meta.TablePath == op.GetTable().GetPath()); - auto columns = BuildKqpColumns(op, table); - auto partitions = PrunePartitions(GetTableKeys(), op, stageInfo, HolderFactory(), TypeEnv()); + auto columns = BuildKqpColumns(op, tableInfo); + auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv()); const bool isOlapScan = (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange); auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv()); @@ -562,7 +562,7 @@ private: YQL_ENSURE(stageInfo.Tasks.size() == 1, "Unexpected multiple tasks in single-partition stage"); } - BuildKqpStageChannels(TasksGraph, GetTableKeys(), stageInfo, TxId, AppData()->EnableKqpSpilling); + BuildKqpStageChannels(TasksGraph, stageInfo, TxId, AppData()->EnableKqpSpilling); } ResponseEv->InitTxResult(tx.Body); diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp index dc4c30d224..dd911d24c1 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp @@ -27,151 +27,20 @@ public: TKqpTableResolver(const TActorId& owner, ui64 txId, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTableKeys& tableKeys, + const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTasksGraph& tasksGraph) : Owner(owner) , TxId(txId) , UserToken(userToken) , Transactions(transactions) - , TableKeys(tableKeys) , TasksGraph(tasksGraph) {} void Bootstrap() { - FillTables(); - ResolveKeys(); Become(&TKqpTableResolver::ResolveKeysState); } private: - static void FillColumn(const NKqpProto::TKqpPhyColumn& phyColumn, TKqpTableKeys::TTable& table) { - if (table.Columns.FindPtr(phyColumn.GetId().GetName())) { - return; - } - - TKqpTableKeys::TColumn column; - column.Id = phyColumn.GetId().GetId(); - - if (phyColumn.GetTypeId() != NScheme::NTypeIds::Pg) { - column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId()); - } else { - column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId(), - NPg::TypeDescFromPgTypeName(phyColumn.GetPgTypeName())); - } - - table.Columns.emplace(phyColumn.GetId().GetName(), std::move(column)); - if (!phyColumn.GetDefaultFromSequence().empty()) { - TString seq = phyColumn.GetDefaultFromSequence(); - if (!seq.StartsWith(table.Path)) { - seq = table.Path + "/" + seq; - } - - table.Sequences.emplace(phyColumn.GetId().GetName(), seq); - } - } - - void FillTable(const NKqpProto::TKqpPhyTable& phyTable) { - auto tableId = MakeTableId(phyTable.GetId()); - - auto table = TableKeys.FindTablePtr(tableId); - if (!table) { - table = &TableKeys.GetOrAddTable(tableId, phyTable.GetId().GetPath()); - - switch (phyTable.GetKind()) { - case NKqpProto::TABLE_KIND_DS: - table->TableKind = ETableKind::Datashard; - break; - case NKqpProto::TABLE_KIND_OLAP: - table->TableKind = ETableKind::Olap; - break; - case NKqpProto::TABLE_KIND_SYS_VIEW: - table->TableKind = ETableKind::SysView; - break; - default: - YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind()); - } - - for (const auto& [_, phyColumn] : phyTable.GetColumns()) { - FillColumn(phyColumn, *table); - } - - YQL_ENSURE(table->KeyColumns.empty()); - table->KeyColumns.reserve(phyTable.KeyColumnsSize()); - YQL_ENSURE(table->KeyColumnTypes.empty()); - table->KeyColumnTypes.reserve(phyTable.KeyColumnsSize()); - for (const auto& keyColumnId : phyTable.GetKeyColumns()) { - const auto& column = table->Columns.FindPtr(keyColumnId.GetName()); - YQL_ENSURE(column); - - table->KeyColumns.push_back(keyColumnId.GetName()); - table->KeyColumnTypes.push_back(column->Type); - } - } else { - for (const auto& [_, phyColumn] : phyTable.GetColumns()) { - FillColumn(phyColumn, *table); - } - } - } - - void FillTables() { - auto addColumn = [](TKqpTableKeys::TTable& table, const TString& columnName) mutable { - auto& sysColumns = GetSystemColumns(); - if (table.Columns.FindPtr(columnName)) { - return; - } - - auto* systemColumn = sysColumns.FindPtr(columnName); - YQL_ENSURE(systemColumn, "Unknown table column" - << ", table: " << table.Path - << ", column: " << columnName); - - TKqpTableKeys::TColumn column; - column.Id = systemColumn->ColumnId; - column.Type = NScheme::TTypeInfo(systemColumn->TypeId); - table.Columns.emplace(columnName, std::move(column)); - }; - - for (auto& tx : Transactions) { - for (const auto& phyTable : tx.Body->GetTables()) { - FillTable(phyTable); - } - - for (auto& stage : tx.Body->GetStages()) { - for (auto& op : stage.GetTableOps()) { - auto& table = TableKeys.GetTable(MakeTableId(op.GetTable())); - for (auto& column : op.GetColumns()) { - addColumn(table, column.GetName()); - } - } - - for (auto& source : stage.GetSources()) { - if (source.HasReadRangesSource()) { - auto& table = TableKeys.GetTable(MakeTableId(source.GetReadRangesSource().GetTable())); - for (auto& column : source.GetReadRangesSource().GetColumns()) { - addColumn(table, column.GetName()); - } - } - } - - for (const auto& input : stage.GetInputs()) { - if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { - auto& table = TableKeys.GetTable(MakeTableId(input.GetStreamLookup().GetTable())); - for (auto& column : input.GetStreamLookup().GetColumns()) { - addColumn(table, column); - } - } - - if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) { - auto& table = TableKeys.GetTable(MakeTableId(input.GetSequencer().GetTable())); - for(auto& column: input.GetSequencer().GetColumns()) { - addColumn(table, column); - } - } - } - } - } - } - STATEFN(ResolveKeysState) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolveKeys); @@ -196,26 +65,24 @@ private: } LOG_D("Navigated key sets: " << results.size()); for (auto& entry : results) { - if (!TableRequestIds.erase(entry.TableId)) { + auto iter = TableRequestIds.find(entry.TableId); + if (iter == TableRequestIds.end()) { ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() << "Incorrect tableId in reply " << entry.TableId << '.')); return; } + TVector<TStageId> stageIds(std::move(iter->second)); + TableRequestIds.erase(entry.TableId); if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() << "Failed to resolve table " << entry.TableId << " keys: " << entry.Status << '.')); return; } - auto* table = TableKeys.FindTablePtr(entry.TableId); - if (!table) { - ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() - << "Incorrect tableId in table keys " << entry.TableId << '.')); - return; + for (auto stageId : stageIds) { + TasksGraph.GetStageInfo(stageId).Meta.ColumnTableInfoPtr = entry.ColumnTableInfo; } - table->ColumnTableInfo = entry.ColumnTableInfo; } NavigationFinished = true; TryFinish(); @@ -242,13 +109,8 @@ private: if (entry.Status != NSchemeCache::TSchemeCacheRequest::EStatus::OkData) { LOG_E("Error resolving keys for entry: " << entry.ToString(*AppData()->TypeRegistry)); - auto* table = TableKeys.FindTablePtr(entry.KeyDescription->TableId); TStringBuilder path; - if (table) { - path << '`' << table->Path << '`'; - } else { - path << "unresolved `" << entry.KeyDescription->TableId << '`'; - } + path << "unresolved `" << entry.KeyDescription->TableId << '`'; timer.reset(); ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, @@ -295,12 +157,14 @@ private: YQL_ENSURE(stageInfo.Meta.ShardOperations.size() == 1); auto operation = *stageInfo.Meta.ShardOperations.begin(); - const TKqpTableKeys::TTable* table = TableKeys.FindTablePtr(stageInfo.Meta.TableId); - stageInfo.Meta.TableKind = table->TableKind; + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + Y_ENSURE(tableInfo); + stageInfo.Meta.TableKind = tableInfo->TableKind; - stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, operation); + stageInfo.Meta.ShardKey = ExtractKey(stageInfo.Meta.TableId, stageInfo.Meta.TableConstInfo, operation); - if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.emplace(stageInfo.Meta.TableId).second) { + if (stageInfo.Meta.TableKind == ETableKind::Olap && TableRequestIds.find(stageInfo.Meta.TableId) == TableRequestIds.end()) { + TableRequestIds[stageInfo.Meta.TableId].emplace_back(pair.first); auto& entry = requestNavigate->ResultSet.emplace_back(); entry.TableId = stageInfo.Meta.TableId; entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; @@ -333,11 +197,10 @@ private: } private: - THolder<TKeyDesc> ExtractKey(const TTableId& table, TKeyDesc::ERowOperation operation) { - const auto& tableKey = TableKeys.GetTable(table); - auto range = GetFullRange(tableKey.KeyColumnTypes.size()); + THolder<TKeyDesc> ExtractKey(const TTableId& table, const TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo>& tableInfo, TKeyDesc::ERowOperation operation) { + auto range = GetFullRange(tableInfo->KeyColumnTypes.size()); - return MakeHolder<TKeyDesc>(table, range.ToTableRange(), operation, tableKey.KeyColumnTypes, + return MakeHolder<TKeyDesc>(table, range.ToTableRange(), operation, tableInfo->KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{}); } @@ -388,8 +251,7 @@ private: const ui64 TxId; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; const TVector<IKqpGateway::TPhysicalTxData>& Transactions; - TKqpTableKeys& TableKeys; - THashSet<TTableId> TableRequestIds; + THashMap<TTableId, TVector<TStageId>> TableRequestIds; bool NavigationFinished = false; bool ResolvingFinished = false; @@ -405,8 +267,8 @@ private: NActors::IActor* CreateKqpTableResolver(const TActorId& owner, ui64 txId, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTableKeys& tableKeys, TKqpTasksGraph& tasksGraph) { - return new TKqpTableResolver(owner, txId, userToken, transactions, tableKeys, tasksGraph); + const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTasksGraph& tasksGraph) { + return new TKqpTableResolver(owner, txId, userToken, transactions, tasksGraph); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.h b/ydb/core/kqp/executer_actor/kqp_table_resolver.h index 0fbb152a5b..f914cf3430 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.h +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.h @@ -6,6 +6,6 @@ namespace NKikimr::NKqp { NActors::IActor* CreateKqpTableResolver(const TActorId& owner, ui64 txId, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, - const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTableKeys& tableKeys, TKqpTasksGraph& tasksGraph); + const TVector<IKqpGateway::TPhysicalTxData>& transactions, TKqpTasksGraph& tasksGraph); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index c52c248672..d4f22125ad 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -103,6 +103,21 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew meta.TableId = MakeTableId(source.GetReadRangesSource().GetTable()); meta.TablePath = source.GetReadRangesSource().GetTable().GetPath(); meta.ShardOperations.insert(TKeyDesc::ERowOperation::Read); + meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId); + } + } + + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + meta.TableId = MakeTableId(input.GetStreamLookup().GetTable()); + meta.TablePath = input.GetStreamLookup().GetTable().GetPath(); + meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId); + } + + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) { + meta.TableId = MakeTableId(input.GetSequencer().GetTable()); + meta.TablePath = input.GetSequencer().GetTable().GetPath(); + meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId); } } @@ -120,6 +135,7 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew stageInfo.Meta.TableId = MakeTableId(op.GetTable()); stageInfo.Meta.TablePath = op.GetTable().GetPath(); stageInfo.Meta.TableKind = ETableKind::Unknown; + stageInfo.Meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(stageInfo.Meta.TableId); tables.insert(MakeTableId(op.GetTable())); } else { YQL_ENSURE(stageInfo.Meta.TableId == MakeTableId(op.GetTable())); @@ -216,7 +232,7 @@ void BuildMapShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, u } void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex, - const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys, bool enableSpilling, + const TStageInfo& inputStageInfo, ui32 outputIndex, bool enableSpilling, const TChannelLogFunc& logFunc) { YQL_ENSURE(stageInfo.Meta.ShardKey); @@ -224,14 +240,14 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf for (auto& partition : stageInfo.Meta.ShardKey->GetPartitions()) { partitionsMap[partition.ShardId] = &partition; } - - auto table = tableKeys.GetTable(stageInfo.Meta.TableId); + + const auto& tableInfo = stageInfo.Meta.TableConstInfo; for (auto& originTaskId : inputStageInfo.Tasks) { auto& originTask = graph.GetTask(originTaskId); auto& taskOutput = originTask.Outputs[outputIndex]; taskOutput.Type = TKqpTaskOutputType::ShardRangePartition; - taskOutput.KeyColumns = table.KeyColumns; + taskOutput.KeyColumns = tableInfo->KeyColumns; for (auto& targetTaskId : stageInfo.Tasks) { auto& targetTask = graph.GetTask(targetTaskId); @@ -258,7 +274,7 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf } void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex, - const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys, + const TStageInfo& inputStageInfo, ui32 outputIndex, const NKqpProto::TKqpPhyCnSequencer& sequencer, bool enableSpilling, const TChannelLogFunc& logFunc) { YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size()); @@ -267,12 +283,12 @@ void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, settings->MutableTable()->CopyFrom(sequencer.GetTable()); settings->SetDatabase(graph.GetMeta().Database); - auto table = tableKeys.GetTable(MakeTableId(sequencer.GetTable())); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; THashSet<TString> autoIncrementColumns(sequencer.GetAutoIncrementColumns().begin(), sequencer.GetAutoIncrementColumns().end()); for(const auto& column: sequencer.GetColumns()) { - auto columnIt = table.Columns.find(column); - YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column); + auto columnIt = tableInfo->Columns.find(column); + YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << column); const auto& columnInfo = columnIt->second; auto* columnProto = settings->AddColumns(); @@ -287,8 +303,8 @@ void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, auto aic = autoIncrementColumns.find(column); if (aic != autoIncrementColumns.end()) { - auto sequenceIt = table.Sequences.find(column); - YQL_ENSURE(sequenceIt != table.Sequences.end()); + auto sequenceIt = tableInfo->Sequences.find(column); + YQL_ENSURE(sequenceIt != tableInfo->Sequences.end()); columnProto->SetDefaultFromSequence(sequenceIt->second); } } @@ -323,7 +339,7 @@ void BuildSequencerChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, } void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex, - const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys, + const TStageInfo& inputStageInfo, ui32 outputIndex, const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, bool enableSpilling, const TChannelLogFunc& logFunc) { YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size()); @@ -332,10 +348,10 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf settings->MutableTable()->CopyFrom(streamLookup.GetTable()); - auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable())); - for (const auto& keyColumn : table.KeyColumns) { - auto columnIt = table.Columns.find(keyColumn); - YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + for (const auto& keyColumn : tableInfo->KeyColumns) { + auto columnIt = tableInfo->Columns.find(keyColumn); + YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << keyColumn); auto* keyColumnProto = settings->AddKeyColumns(); keyColumnProto->SetName(keyColumn); @@ -344,14 +360,14 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf } for (const auto& keyColumn : streamLookup.GetKeyColumns()) { - auto columnIt = table.Columns.find(keyColumn); - YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << keyColumn); + auto columnIt = tableInfo->Columns.find(keyColumn); + YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << keyColumn); settings->AddLookupKeyColumns(keyColumn); } for (const auto& column : streamLookup.GetColumns()) { - auto columnIt = table.Columns.find(column); - YQL_ENSURE(columnIt != table.Columns.end(), "Unknown column: " << column); + auto columnIt = tableInfo->Columns.find(column); + YQL_ENSURE(columnIt != tableInfo->Columns.end(), "Unknown column: " << column); auto* columnProto = settings->AddColumns(); columnProto->SetName(column); @@ -388,7 +404,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf } } -void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, +void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageInfo, ui64 txId, bool enableSpilling) { auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -433,7 +449,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl BuildMapShardChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, enableSpilling, log); break; case NKqpProto::TKqpPhyConnection::kShuffleShard: - BuildShuffleShardChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys, + BuildShuffleShardChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, enableSpilling, log); break; case NKqpProto::TKqpPhyConnection::kMerge: { @@ -450,13 +466,13 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl break; } case NKqpProto::TKqpPhyConnection::kSequencer: { - BuildSequencerChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys, + BuildSequencerChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, input.GetSequencer(), enableSpilling, log); break; } case NKqpProto::TKqpPhyConnection::kStreamLookup: { - BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys, + BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, input.GetStreamLookup(), enableSpilling, log); break; } @@ -753,7 +769,7 @@ void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransact meta->SetTableKind((ui32)stageInfo.Meta.TableKind); } -void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTableKeys& tableKeys, NYql::NDqProto::TDqTask& taskDesc) { +void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto::TDqTask& taskDesc) { if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) { NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta; @@ -806,9 +822,10 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl FillTableMeta(stageInfo, protoTaskMeta.MutableTable()); - const auto& tableInfo = tableKeys.GetTable(stageInfo.Meta.TableId); - for (const auto& keyColumnName : tableInfo.KeyColumns) { - const auto& keyColumn = tableInfo.Columns.at(keyColumnName); + const auto& tableInfo = stageInfo.Meta.TableConstInfo; + + for (const auto& keyColumnName : tableInfo->KeyColumns) { + const auto& keyColumn = tableInfo->Columns.at(keyColumnName); auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod); protoTaskMeta.AddKeyColumnTypes(columnType.TypeId); if (columnType.TypeInfo) { @@ -820,7 +837,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl protoTaskMeta.AddSkipNullKeys(skipNullKey); } - switch (tableInfo.TableKind) { + switch (tableInfo->TableKind) { case ETableKind::Unknown: case ETableKind::SysView: { protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); @@ -860,7 +877,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTabl } } - if (tableInfo.TableKind == ETableKind::Olap) { + if (tableInfo->TableKind == ETableKind::Olap) { auto* olapProgram = protoTaskMeta.MutableOlapProgram(); olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program); @@ -1070,7 +1087,7 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N } } - FillTaskMeta(stageInfo, task, tasksGraph.GetMeta().TableKeys, *result); + FillTaskMeta(stageInfo, task, *result); } NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task) { diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 768a1a7858..f7237c3c79 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -32,6 +32,8 @@ struct TStageInfoMeta { TTableId TableId; TString TablePath; ETableKind TableKind; + TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo> TableConstInfo; + TIntrusiveConstPtr<NKikimr::NSchemeCache::TSchemeCacheNavigate::TColumnTableInfo> ColumnTableInfoPtr; TVector<bool> SkipNullKeys; @@ -84,7 +86,6 @@ struct TStageInfoMeta { // things which are common for all tasks in the graph. struct TGraphMeta { - TKqpTableKeys TableKeys; IKqpGateway::TKqpSnapshot Snapshot; TMaybe<ui64> LockTxId; std::unordered_map<ui64, TActorId> ResultChannelProxies; @@ -243,7 +244,7 @@ using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMeta, TTas void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs); void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const TKqpPhyTxHolder::TConstPtr& tx, ui64 txIdx); -void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, +void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageInfo, ui64 txId, bool enableSpilling); NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task); @@ -252,13 +253,14 @@ void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransact void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel); template<typename Proto> -TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, const TKqpTableKeys::TTable& table) { +TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, TIntrusiveConstPtr<TKqpTableKeys::TTableConstInfo> tableInfo) { TVector<TTaskMeta::TColumn> columns; columns.reserve(op.GetColumns().size()); for (const auto& column : op.GetColumns()) { TTaskMeta::TColumn c; - const auto& tableColumn = table.Columns.at(column.GetName()); + + const auto& tableColumn = tableInfo->Columns.at(column.GetName()); c.Id = column.GetId(); c.Type = tableColumn.Type; c.TypeMod = tableColumn.TypeMod; diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 93d7141299..f7b7f07fcb 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -52,11 +52,12 @@ public: }; TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPreparedQuery>& pq, - const NKqpProto::TKqpPhyTx* proto, const std::shared_ptr<TPreparedQueryAllocHolder>& alloc) + const NKqpProto::TKqpPhyTx* proto, const std::shared_ptr<TPreparedQueryAllocHolder>& alloc, TIntrusivePtr<TTableConstInfoMap> tableConstInfoById) : PreparedQuery(pq) , Proto(proto) , LiteralTx(CalcIsLiteralTx(proto)) , Alloc(alloc) + , TableConstInfoById(tableConstInfoById) { TxResultsMeta.resize(Proto->GetResults().size()); for (auto&& i : Proto->GetStages()) { @@ -104,13 +105,23 @@ TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) : Proto(proto) , Alloc(std::move(std::make_shared<TPreparedQueryAllocHolder>(functionRegistry))) + , TableConstInfoById(MakeIntrusive<TTableConstInfoMap>()) { THashSet<TString> tablesSet; const auto& phyQuery = Proto->GetPhysicalQuery(); Transactions.reserve(phyQuery.TransactionsSize()); + + // Init TableConstInfoById + for (const auto& phyTx: phyQuery.GetTransactions()) { + for (const auto& phyTable : phyTx.GetTables()) { + FillTable(phyTable); + } + FillTables(phyTx.GetStages()); + } + for (const auto& phyTx: phyQuery.GetTransactions()) { TKqpPhyTxHolder::TConstPtr txHolder = std::make_shared<const TKqpPhyTxHolder>( - Proto, &phyTx, Alloc); + Proto, &phyTx, Alloc, TableConstInfoById); Transactions.emplace_back(std::move(txHolder)); for (const auto& stage: phyTx.GetStages()) { @@ -139,6 +150,57 @@ TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto, QueryTables = TVector<TString>(tablesSet.begin(), tablesSet.end()); } +void TPreparedQueryHolder::FillTable(const NKqpProto::TKqpPhyTable& phyTable) { + auto tableId = MakeTableId(phyTable.GetId()); + + auto infoPtr = TableConstInfoById->Map.FindPtr(tableId); + if (!infoPtr) { + auto infoPtr = MakeIntrusive<TTableConstInfo>(phyTable.GetId().GetPath()); + TableConstInfoById->Map[tableId] = infoPtr; + infoPtr->FillTable(phyTable); + } else { + for (const auto& [_, phyColumn] : phyTable.GetColumns()) { + (*infoPtr)->FillColumn(phyColumn); + } + } +} + +void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages) { + for (auto& stage : stages) { + for (auto& op : stage.GetTableOps()) { + auto& info = GetInfo(MakeTableId(op.GetTable())); + for (auto& column : op.GetColumns()) { + info->AddColumn(column.GetName()); + } + } + + for (auto& source : stage.GetSources()) { + if (source.HasReadRangesSource()) { + auto& info = GetInfo(MakeTableId(source.GetReadRangesSource().GetTable())); + for (auto& column : source.GetReadRangesSource().GetColumns()) { + info->AddColumn(column.GetName()); + } + } + } + + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + auto& info = GetInfo(MakeTableId(input.GetStreamLookup().GetTable())); + for (auto& column : input.GetStreamLookup().GetColumns()) { + info->AddColumn(column); + } + } + + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) { + auto& info = GetInfo(MakeTableId(input.GetSequencer().GetTable())); + for(auto& column: input.GetSequencer().GetColumns()) { + info->AddColumn(column); + } + } + } + } +} + const TKqpPhyTxHolder::TConstPtr& TPreparedQueryHolder::GetPhyTx(ui32 txId) const { YQL_ENSURE(txId < Transactions.size()); return Transactions[txId]; diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index d1d190dbac..b1d07486da 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/kqp/common/kqp_resolve.h> #include <ydb/core/kqp/query_data/kqp_predictor.h> #include <ydb/core/kqp/provider/yql_kikimr_settings.h> #include <ydb/core/protos/kqp.pb.h> @@ -31,6 +32,10 @@ struct TPhyTxResultMetadata { TVector<ui32> ColumnOrder; }; +struct TTableConstInfoMap : public TAtomicRefCount<TTableConstInfoMap> { + THashMap<TTableId, TIntrusivePtr<TKqpTableKeys::TTableConstInfo>> Map; +}; + class TKqpPhyTxHolder { std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery; const NKqpProto::TKqpPhyTx* Proto; @@ -38,6 +43,8 @@ class TKqpPhyTxHolder { TVector<TPhyTxResultMetadata> TxResultsMeta; std::shared_ptr<TPreparedQueryAllocHolder> Alloc; std::vector<TStagePredictor> Predictors; + TIntrusivePtr<TTableConstInfoMap> TableConstInfoById; + public: using TConstPtr = std::shared_ptr<const TKqpPhyTxHolder>; @@ -97,8 +104,13 @@ public: return Proto->ShortDebugString(); } + TIntrusiveConstPtr<TTableConstInfoMap> GetTableConstInfoById() const { + return TableConstInfoById; + } + + TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPreparedQuery>& pq, const NKqpProto::TKqpPhyTx* proto, - const std::shared_ptr<TPreparedQueryAllocHolder>& alloc); + const std::shared_ptr<TPreparedQueryAllocHolder>& alloc, TIntrusivePtr<TTableConstInfoMap> tableConstInfoById); bool IsLiteralTx() const; }; @@ -115,11 +127,14 @@ public: class TPreparedQueryHolder { private: + using TTableConstInfo = TKqpTableKeys::TTableConstInfo; + YDB_ACCESSOR_DEF(TLlvmSettings, LlvmSettings); std::shared_ptr<const NKikimrKqp::TPreparedQuery> Proto; std::shared_ptr<TPreparedQueryAllocHolder> Alloc; TVector<TString> QueryTables; std::vector<TKqpPhyTxHolder::TConstPtr> Transactions; + TIntrusivePtr<TTableConstInfoMap> TableConstInfoById; public: @@ -164,6 +179,20 @@ public: const NKqpProto::TKqpPhyQuery& GetPhysicalQuery() const { return Proto->GetPhysicalQuery(); } + + TIntrusivePtr<TTableConstInfo>& GetInfo(const TTableId& tableId) { + auto info = TableConstInfoById->Map.FindPtr(tableId); + MKQL_ENSURE_S(info); + return *info; + } + + const THashMap<TTableId, TIntrusivePtr<TTableConstInfo>>& GetTableConstInfo() const { + return TableConstInfoById->Map; + } + + void FillTable(const NKqpProto::TKqpPhyTable& phyTable); + + void FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages); }; |