diff options
author | spuchin <spuchin@ydb.tech> | 2023-01-15 18:44:49 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-01-15 18:44:49 +0300 |
commit | 8a749596d40e91c896a1907afcd108d9221fbde1 (patch) | |
tree | 1bbbbe2eb13a8fc5924b785e957c605e67251e53 | |
parent | 306ddfa76ad06b2689bf0c8e7cf9de22a21c5594 (diff) | |
download | ydb-8a749596d40e91c896a1907afcd108d9221fbde1.tar.gz |
Remove table resolve step via scheme board cache in KQP executer. ()
-rw-r--r-- | ydb/core/kqp/common/kqp_prepared_query.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_resolve.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_resolve.h | 20 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_table_resolver.cpp | 229 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 123 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 2 | ||||
-rw-r--r-- | ydb/core/protos/kqp_physical.proto | 35 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp | 12 |
8 files changed, 248 insertions, 179 deletions
diff --git a/ydb/core/kqp/common/kqp_prepared_query.h b/ydb/core/kqp/common/kqp_prepared_query.h index a3004073f9..9de89807e7 100644 --- a/ydb/core/kqp/common/kqp_prepared_query.h +++ b/ydb/core/kqp/common/kqp_prepared_query.h @@ -78,6 +78,10 @@ public: return Proto->GetParamBindings(); } + const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& GetTables() const { + return Proto->GetTables(); + } + TProtoStringType DebugString() const { return Proto->ShortDebugString(); } diff --git a/ydb/core/kqp/common/kqp_resolve.cpp b/ydb/core/kqp/common/kqp_resolve.cpp index 64dac31121..482a0e419c 100644 --- a/ydb/core/kqp/common/kqp_resolve.cpp +++ b/ydb/core/kqp/common/kqp_resolve.cpp @@ -38,7 +38,7 @@ TTableId MakeTableId(const TKqpTable& node) { return tableId; } -TTableId MakeTableId(const NKqpProto::TKqpPhyTable& table) { +TTableId MakeTableId(const NKqpProto::TKqpPhyTableId& table) { TTableId tableId; tableId.PathId = TPathId(table.GetOwnerId(), table.GetTableId()); tableId.SysViewInfo = table.GetSysView(); diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h index f2f34e2f8a..7d9c3e3b2d 100644 --- a/ydb/core/kqp/common/kqp_resolve.h +++ b/ydb/core/kqp/common/kqp_resolve.h @@ -42,6 +42,18 @@ public: return TablesById.FindPtr(id); } + TTable& GetTable(const TTableId& id) { + auto table = TablesById.FindPtr(id); + MKQL_ENSURE_S(table); + return *table; + } + + const TTable& GetTable(const TTableId& id) const { + auto table = TablesById.FindPtr(id); + MKQL_ENSURE_S(table); + return *table; + } + TTable& GetOrAddTable(const TTableId& id, const TStringBuf path) { auto& table = TablesById[id]; @@ -54,12 +66,6 @@ public: return table; } - const TTable& GetTable(const TTableId& id) const { - auto table = TablesById.FindPtr(id); - MKQL_ENSURE_S(table); - return *table; - } - size_t Size() const { return TablesById.size(); } @@ -133,7 +139,7 @@ void SortPartitions(TList& partitions, const TVector<NScheme::TTypeInfo>& keyCol } TTableId MakeTableId(const NYql::NNodes::TKqpTable& node); -TTableId MakeTableId(const NKqpProto::TKqpPhyTable& table); +TTableId MakeTableId(const NKqpProto::TKqpPhyTableId& table); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp index 513582a6d6..6cdcf4bd38 100644 --- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp @@ -36,125 +36,126 @@ public: , TasksGraph(tasksGraph) {} void Bootstrap() { - ResolveTables(); - Become(&TKqpTableResolver::ResolveTablesState); + FillTables(); + + ResolveKeys(); + Become(&TKqpTableResolver::ResolveKeysState); } private: - STATEFN(ResolveTablesState) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveTables); - hFunc(TEvents::TEvPoison, HandleResolveTables); - default: - UnexpectedEvent("ResolveTablesState", ev->GetTypeRewrite()); + static void FillColumn(const NKqpProto::TKqpPhyColumn& phyColumn, TKqpTableKeys::TTable& table) { + if (table.Columns.FindPtr(phyColumn.GetId().GetName())) { + return; } + + TKqpTableKeys::TColumn column; + column.Id = phyColumn.GetId().GetId(); + + if (phyColumn.GetTypeId() != NScheme::NTypeIds::Pg) { + column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId()); + } else { + column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId(), + NPg::TypeDescFromPgTypeName(phyColumn.GetPgTypeName())); + } + + table.Columns.emplace(phyColumn.GetId().GetName(), std::move(column)); } - void HandleResolveTables(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - auto timer = std::make_unique<NCpuTime::TCpuTimer>(CpuTime); + void FillTable(const NKqpProto::TKqpPhyTable& phyTable) { + auto tableId = MakeTableId(phyTable.GetId()); - const auto& entries = ev->Get()->Request->ResultSet; - LOG_D("Resolved tables: " << entries.size()); - YQL_ENSURE(entries.size() == TableKeys.Size()); + auto table = TableKeys.FindTablePtr(tableId); + if (!table) { + table = &TableKeys.GetOrAddTable(tableId, phyTable.GetId().GetPath()); - for (auto& entry : entries) { - switch (entry.Status) { - case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: + switch (phyTable.GetKind()) { + case NKqpProto::TABLE_KIND_DS: + table->TableKind = ETableKind::Datashard; + break; + case NKqpProto::TABLE_KIND_OLAP: + table->TableKind = ETableKind::Olap; break; + case NKqpProto::TABLE_KIND_SYS_VIEW: + table->TableKind = ETableKind::SysView; + break; + default: + YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind()); + } - case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: - case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable: - case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: - ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() - << "Table scheme error `" << JoinPath(entry.Path) << "`: " << entry.Status << '.')); - return; + for (const auto& [_, phyColumn] : phyTable.GetColumns()) { + FillColumn(phyColumn, *table); + } - default: - ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() - << "Failed to resolve table `" << JoinPath(entry.Path) << "`: " << entry.Status << '.')); - return; + YQL_ENSURE(table->KeyColumns.empty()); + table->KeyColumns.reserve(phyTable.KeyColumnsSize()); + YQL_ENSURE(table->KeyColumnTypes.empty()); + table->KeyColumnTypes.reserve(phyTable.KeyColumnsSize()); + for (const auto& keyColumnId : phyTable.GetKeyColumns()) { + const auto& column = table->Columns.FindPtr(keyColumnId.GetName()); + YQL_ENSURE(column); + table->KeyColumns.push_back(keyColumnId.GetName()); + table->KeyColumnTypes.push_back(column->Type); + } + } else { + for (const auto& [_, phyColumn] : phyTable.GetColumns()) { + FillColumn(phyColumn, *table); } + } + } - auto* table = TableKeys.FindTablePtr(entry.TableId); - if (!table) { - timer.reset(); - ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() - << "Unresolved table `" << JoinPath(entry.Path) << "` with tableId: " << entry.TableId << ".")); + void FillTables() { + auto addColumn = [](TKqpTableKeys::TTable& table, const TString& columnName) mutable { + auto& sysColumns = GetSystemColumns(); + if (table.Columns.FindPtr(columnName)) { return; } - if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { - YQL_ENSURE(entry.ColumnTableInfo || entry.OlapStoreInfo); - // NOTE: entry.SysViewInfo might not be empty for OLAP stats virtual tables - table->TableKind = ETableKind::Olap; - } else if (entry.TableId.IsSystemView()) { - table->TableKind = ETableKind::SysView; - } else { - table->TableKind = ETableKind::Datashard; - } + auto* systemColumn = sysColumns.FindPtr(columnName); + YQL_ENSURE(systemColumn, "Unknown table column" + << ", table: " << table.Path + << ", column: " << columnName); - // TODO: Resolve columns by id - TMap<TStringBuf, ui32> columnsMap; - for (auto& [columnId, column] : entry.Columns) { - auto ret = columnsMap.emplace(column.Name, columnId); - YQL_ENSURE(ret.second, "" << column.Name); + TKqpTableKeys::TColumn column; + column.Id = systemColumn->ColumnId; + column.Type = NScheme::TTypeInfo(systemColumn->TypeId); + table.Columns.emplace(columnName, std::move(column)); + }; - if (column.KeyOrder >= 0) { - table->Columns.emplace(column.Name, TKqpTableKeys::TColumn()); + for (auto& tx : Transactions) { + for (const auto& phyTable : tx.Body->GetTables()) { + FillTable(phyTable); + } - if (table->KeyColumns.size() <= (ui32) column.KeyOrder) { - table->KeyColumns.resize(column.KeyOrder + 1); - table->KeyColumnTypes.resize(column.KeyOrder + 1); + for (auto& stage : tx.Body->GetStages()) { + for (auto& op : stage.GetTableOps()) { + auto& table = TableKeys.GetTable(MakeTableId(op.GetTable())); + for (auto& column : op.GetColumns()) { + addColumn(table, column.GetName()); } - table->KeyColumns[column.KeyOrder] = column.Name; - table->KeyColumnTypes[column.KeyOrder] = column.PType; } - } - - for (auto& keyColumn : table->KeyColumns) { - YQL_ENSURE(!keyColumn.empty()); - } - auto& sysColumns = GetSystemColumns(); - for (auto& [columnName, columnKey] : table->Columns) { - if (auto* systemColumn = sysColumns.FindPtr(columnName)) { - columnKey.Id = systemColumn->ColumnId; - columnKey.Type = NScheme::TTypeInfo(systemColumn->TypeId); - continue; + for (auto& source : stage.GetSources()) { + if (source.HasReadRangesSource()) { + auto& table = TableKeys.GetTable(MakeTableId(source.GetReadRangesSource().GetTable())); + for (auto& column : source.GetReadRangesSource().GetColumns()) { + addColumn(table, column.GetName()); + } + } } - auto* columnId = columnsMap.FindPtr(columnName); - if (!columnId) { - timer.reset(); - ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, - YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder() - << "Unknown column `" << columnName << "` at table `" << JoinPath(entry.Path) << "`.")); - return; + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + auto& table = TableKeys.GetTable(MakeTableId(input.GetStreamLookup().GetTable())); + for (auto& column : input.GetStreamLookup().GetColumns()) { + addColumn(table, column); + } + } } - - auto* column = entry.Columns.FindPtr(*columnId); - YQL_ENSURE(column); - - columnKey.Id = column->Id; - columnKey.Type = column->PType; } } - - ResolveKeys(); - timer.reset(); - - Become(&TKqpTableResolver::ResolveKeysState); } - void HandleResolveTables(TEvents::TEvPoison::TPtr&) { - PassAway(); - } - -private: STATEFN(ResolveKeysState) { switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolveKeys); @@ -227,56 +228,6 @@ private: } private: - // TODO: Get rid of ResolveTables & TableKeys, get table information from phy tx proto. - void ResolveTables() { - for (auto& tx : Transactions) { - for (auto& stage : tx.Body->GetStages()) { - for (auto& op : stage.GetTableOps()) { - auto& table = TableKeys.GetOrAddTable(MakeTableId(op.GetTable()), op.GetTable().GetPath()); - for (auto& column : op.GetColumns()) { - table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn()); - } - } - - for (auto& source : stage.GetSources()) { - if (source.HasReadRangesSource()) { - auto& table = TableKeys.GetOrAddTable( - MakeTableId(source.GetReadRangesSource().GetTable()), - source.GetReadRangesSource().GetTable().GetPath()); - for (auto& column : source.GetReadRangesSource().GetColumns()) { - table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn()); - } - } - } - - for (const auto& input : stage.GetInputs()) { - if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { - const auto& streamLookup = input.GetStreamLookup(); - auto& table = TableKeys.GetOrAddTable(MakeTableId(streamLookup.GetTable()), streamLookup.GetTable().GetPath()); - for (auto& column : input.GetStreamLookup().GetColumns()) { - table.Columns.emplace(column, TKqpTableKeys::TColumn()); - } - } - } - } - } - - auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - request->ResultSet.reserve(TableKeys.Size()); - for (auto& [tableId, table] : TableKeys.Get()) { - NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; - entry.TableId = tableId; - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; - entry.ShowPrivatePath = true; - - request->ResultSet.emplace_back(std::move(entry)); - } - - auto ev = MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.Release()); - Send(MakeSchemeCacheID(), ev.Release()); - } - void ResolveKeys() { FillKqpTasksGraphStages(TasksGraph, Transactions); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index cdcd87ea57..ab44faba02 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -75,7 +75,7 @@ NKqpProto::TKqpPhyInternalBinding::EType GetPhyInternalBindingType(const std::st return bindingType; } -void FillTable(const TKqpTable& table, NKqpProto::TKqpPhyTable& tableProto) { +void FillTableId(const TKqpTable& table, NKqpProto::TKqpPhyTableId& tableProto) { auto pathId = TKikimrPathId::Parse(table.PathId()); tableProto.SetPath(TString(table.Path())); @@ -85,6 +85,77 @@ void FillTable(const TKqpTable& table, NKqpProto::TKqpPhyTable& tableProto) { tableProto.SetVersion(FromString<ui64>(table.Version())); } +void FillTableId(const TKikimrTableMetadata& tableMeta, NKqpProto::TKqpPhyTableId& tableProto) { + tableProto.SetPath(tableMeta.Name); + tableProto.SetOwnerId(tableMeta.PathId.OwnerId()); + tableProto.SetTableId(tableMeta.PathId.TableId()); + tableProto.SetSysView(tableMeta.SysView); + tableProto.SetVersion(tableMeta.SchemaVersion); +} + +NKqpProto::EKqpPhyTableKind GetPhyTableKind(EKikimrTableKind kind) { + switch (kind) { + case EKikimrTableKind::Datashard: + return NKqpProto::TABLE_KIND_DS; + case EKikimrTableKind::Olap: + return NKqpProto::TABLE_KIND_OLAP; + case EKikimrTableKind::SysView: + return NKqpProto::TABLE_KIND_SYS_VIEW; + default: + return NKqpProto::TABLE_KIND_UNSPECIFIED; + } +} + +void FillTablesMap(const TKqpTable& table, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) { + tablesMap.emplace(table.Path().Value(), THashSet<TStringBuf>{}); +} + +void FillTablesMap(const TKqpTable& table, const TCoAtomList& columns, + THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) +{ + FillTablesMap(table, tablesMap); + + for (const auto& column : columns) { + tablesMap[table.Path()].emplace(column); + } +} + +void FillTable(const TKikimrTableMetadata& tableMeta, THashSet<TStringBuf>&& columns, + NKqpProto::TKqpPhyTable& tableProto) +{ + FillTableId(tableMeta, *tableProto.MutableId()); + tableProto.SetKind(GetPhyTableKind(tableMeta.Kind)); + + for (const auto& keyColumnName : tableMeta.KeyColumnNames) { + auto keyColumn = tableMeta.Columns.FindPtr(keyColumnName); + YQL_ENSURE(keyColumn); + + auto& phyKeyColumn = *tableProto.MutableKeyColumns()->Add(); + phyKeyColumn.SetId(keyColumn->Id); + phyKeyColumn.SetName(keyColumn->Name); + + columns.emplace(keyColumnName); + } + + auto& phyColumns = *tableProto.MutableColumns(); + for (const auto& columnName : columns) { + auto column = tableMeta.Columns.FindPtr(columnName); + if (!column) { + YQL_ENSURE(GetSystemColumns().find(columnName) != GetSystemColumns().end()); + continue; + } + + auto& phyColumn = phyColumns[column->Id]; + phyColumn.MutableId()->SetId(column->Id); + phyColumn.MutableId()->SetName(column->Name); + phyColumn.SetTypeId(column->TypeInfo.GetTypeId()); + + if (column->TypeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { + phyColumn.SetPgTypeName(NPg::PgTypeNameFromTypeDesc(column->TypeInfo.GetTypeDesc())); + } + } +} + template <typename TProto> void FillColumns(const TCoAtomList& columns, const TKikimrTableMetadata& tableMeta, TProto& opProto, bool allowSystemColumns) @@ -461,7 +532,7 @@ private: } void CompileStage(const TDqPhyStage& stage, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx, - const TMap<ui64, ui32>& stagesMap) + const TMap<ui64, ui32>& stagesMap, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) { stageProto.SetIsEffectsStage(NOpt::IsKqpEffectsStage(stage)); @@ -470,14 +541,14 @@ private: if (input.Maybe<TDqSource>()) { auto* protoSource = stageProto.AddSources(); - FillSource(input.Cast<TDqSource>(), protoSource, true); + FillSource(input.Cast<TDqSource>(), protoSource, true, tablesMap); protoSource->SetInputIndex(inputIndex); } else { YQL_ENSURE(input.Maybe<TDqConnection>()); auto connection = input.Cast<TDqConnection>(); auto& protoInput = *stageProto.AddInputs(); - FillConnection(connection, stagesMap, protoInput, ctx); + FillConnection(connection, stagesMap, protoInput, ctx, tablesMap); protoInput.SetInputIndex(inputIndex); } } @@ -493,7 +564,8 @@ private: YQL_ENSURE(tableMeta); auto& tableOp = *stageProto.AddTableOps(); - FillTable(readTable.Table(), *tableOp.MutableTable()); + FillTablesMap(readTable.Table(), readTable.Columns(), tablesMap); + FillTableId(readTable.Table(), *tableOp.MutableTable()); FillColumns(readTable.Columns(), *tableMeta, tableOp, true); FillReadRange(readTable, *tableMeta, *tableOp.MutableReadRange()); } else if (auto maybeLookupTable = node.Maybe<TKqpLookupTable>()) { @@ -502,7 +574,8 @@ private: YQL_ENSURE(tableMeta); auto& tableOp = *stageProto.AddTableOps(); - FillTable(lookupTable.Table(), *tableOp.MutableTable()); + FillTablesMap(lookupTable.Table(), lookupTable.Columns(), tablesMap); + FillTableId(lookupTable.Table(), *tableOp.MutableTable()); FillColumns(lookupTable.Columns(), *tableMeta, tableOp, true); FillLookup(lookupTable, *tableOp.MutableLookup()); } else if (auto maybeUpsertRows = node.Maybe<TKqpUpsertRows>()) { @@ -514,7 +587,8 @@ private: auto settings = TKqpUpsertRowsSettings::Parse(upsertRows); auto& tableOp = *stageProto.AddTableOps(); - FillTable(upsertRows.Table(), *tableOp.MutableTable()); + FillTablesMap(upsertRows.Table(), upsertRows.Columns(), tablesMap); + FillTableId(upsertRows.Table(), *tableOp.MutableTable()); FillColumns(upsertRows.Columns(), *tableMeta, tableOp, false); FillEffectRows(upsertRows, *tableOp.MutableUpsertRows(), settings.Inplace); } else if (auto maybeDeleteRows = node.Maybe<TKqpDeleteRows>()) { @@ -525,7 +599,8 @@ private: YQL_ENSURE(stageProto.GetIsEffectsStage()); auto& tableOp = *stageProto.AddTableOps(); - FillTable(deleteRows.Table(), *tableOp.MutableTable()); + FillTablesMap(deleteRows.Table(), tablesMap); + FillTableId(deleteRows.Table(), *tableOp.MutableTable()); FillEffectRows(deleteRows, *tableOp.MutableDeleteRows(), false); } else if (auto maybeWideReadTableRanges = node.Maybe<TKqpWideReadTableRanges>()) { auto readTableRanges = maybeWideReadTableRanges.Cast(); @@ -533,7 +608,8 @@ private: YQL_ENSURE(tableMeta); auto& tableOp = *stageProto.AddTableOps(); - FillTable(readTableRanges.Table(), *tableOp.MutableTable()); + FillTablesMap(readTableRanges.Table(), readTableRanges.Columns(), tablesMap); + FillTableId(readTableRanges.Table(), *tableOp.MutableTable()); FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true); FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadRanges()); } else if (auto maybeReadWideTableRanges = node.Maybe<TKqpWideReadOlapTableRanges>()) { @@ -542,7 +618,8 @@ private: YQL_ENSURE(tableMeta); auto& tableOp = *stageProto.AddTableOps(); - FillTable(readTableRanges.Table(), *tableOp.MutableTable()); + FillTablesMap(readTableRanges.Table(), readTableRanges.Columns(), tablesMap); + FillTableId(readTableRanges.Table(), *tableOp.MutableTable()); FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true); FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadOlapRange()); auto miniKqlResultType = GetMKqlResultType(readTableRanges.Process().Ref().GetTypeAnn()); @@ -608,9 +685,11 @@ private: bool hasEffectStage = false; TMap<ui64, ui32> stagesMap; + THashMap<TStringBuf, THashSet<TStringBuf>> tablesMap; + for (const auto& stage : tx.Stages()) { auto* protoStage = txProto.AddStages(); - CompileStage(stage, *protoStage, ctx, stagesMap); + CompileStage(stage, *protoStage, ctx, stagesMap, tablesMap); hasEffectStage |= protoStage->GetIsEffectsStage(); stagesMap[stage.Ref().UniqueId()] = txProto.StagesSize() - 1; } @@ -652,7 +731,7 @@ private: auto& resultProto = *txProto.AddResults(); auto& connectionProto = *resultProto.MutableConnection(); - FillConnection(connection, stagesMap, connectionProto, ctx); + FillConnection(connection, stagesMap, connectionProto, ctx, tablesMap); const TTypeAnnotationNode* itemType = nullptr; switch (connectionProto.GetTypeCase()) { @@ -691,12 +770,22 @@ private: } } } + + for (auto& [tablePath, tableColumns] : tablesMap) { + auto tableMeta = TablesData->ExistingTable(Cluster, tablePath).Metadata; + YQL_ENSURE(tableMeta); + + FillTable(*tableMeta, std::move(tableColumns), *txProto.AddTables()); + } } - void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns) { + void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns, + THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) + { if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>()) { NKqpProto::TKqpReadRangesSource& readProto = *protoSource->MutableReadRangesSource(); - FillTable(settings.Table().Cast(), *readProto.MutableTable()); + FillTablesMap(settings.Table().Cast(), settings.Columns().Cast(), tablesMap); + FillTableId(settings.Table().Cast(), *readProto.MutableTable()); auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata; YQL_ENSURE(tableMeta); @@ -744,7 +833,8 @@ private: } void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap, - NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx) + NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx, + THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) { auto inputStageIndex = stagesMap.FindPtr(connection.Output().Stage().Ref().UniqueId()); YQL_ENSURE(inputStageIndex, "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map: " @@ -815,7 +905,8 @@ private: auto tableMeta = TablesData->ExistingTable(Cluster, streamLookup.Table().Path()).Metadata; YQL_ENSURE(tableMeta); - FillTable(streamLookup.Table(), *streamLookupProto.MutableTable()); + FillTablesMap(streamLookup.Table(), streamLookup.Columns(), tablesMap); + FillTableId(streamLookup.Table(), *streamLookupProto.MutableTable()); const auto lookupKeysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); YQL_ENSURE(lookupKeysType, "Empty stream lookup keys type"); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 622a4ab2bc..3d7b438be8 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -584,7 +584,7 @@ message TEvKillScanTablet { } message TKqpStreamLookupSettings { - optional NKqpProto.TKqpPhyTable Table = 1; + optional NKqpProto.TKqpPhyTableId Table = 1; repeated string KeyColumns = 2; repeated string Columns = 3; optional TKqpSnapshot Snapshot = 4; diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 854dafe475..a31e49c817 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -47,12 +47,25 @@ message TKqpPhyResultBinding { } } -message TKqpPhyColumn { +message TKqpPhyColumnId { uint32 Id = 1; string Name = 2; } -message TKqpPhyTable { +message TKqpPhyColumn { + TKqpPhyColumnId Id = 1; + uint32 TypeId = 2; + string PgTypeName = 3; +} + +enum EKqpPhyTableKind { + TABLE_KIND_UNSPECIFIED = 0; + TABLE_KIND_DS = 1; + TABLE_KIND_OLAP = 2; + TABLE_KIND_SYS_VIEW = 3; +} + +message TKqpPhyTableId { string Path = 1; uint64 OwnerId = 2; uint64 TableId = 3; @@ -60,6 +73,13 @@ message TKqpPhyTable { uint64 Version = 5; } +message TKqpPhyTable { + TKqpPhyTableId Id = 1; + EKqpPhyTableKind Kind = 2; + map<uint32, TKqpPhyColumn> Columns = 3; + repeated TKqpPhyColumnId KeyColumns = 4; +} + message TKqpPhyParamValue { string ParamName = 1; } @@ -165,8 +185,8 @@ message TKqpPhyOpReadRanges { } message TKqpPhyTableOperation { - TKqpPhyTable Table = 1; - repeated TKqpPhyColumn Columns = 2; + TKqpPhyTableId Table = 1; + repeated TKqpPhyColumnId Columns = 2; oneof Type { TKqpPhyOpReadRange ReadRange = 3; @@ -213,7 +233,7 @@ message TKqpPhyCnMerge { } message TKqpPhyCnStreamLookup { - TKqpPhyTable Table = 1; + TKqpPhyTableId Table = 1; repeated string KeyColumns = 2; repeated string Columns = 3; bytes LookupKeysType = 4; @@ -240,8 +260,8 @@ message TKqpPhyConnection { } message TKqpReadRangesSource { - TKqpPhyTable Table = 1; - repeated TKqpPhyColumn Columns = 2; + TKqpPhyTableId Table = 1; + repeated TKqpPhyColumnId Columns = 2; TKqpPhyValue ItemsLimit = 3; bool Reverse = 4; bool Sorted = 5; @@ -296,6 +316,7 @@ message TKqpPhyTx { repeated TKqpPhyParamBinding ParamBindings = 4; string Plan = 5; bool HasEffects = 6; // at least one stage has flag TKqpPhyStage::IsEffectStage set + repeated TKqpPhyTable Tables = 7; } message TKqpTableInfo { diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp index 432b1cfe5f..2642b82513 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp @@ -81,16 +81,12 @@ Y_UNIT_TEST_SUITE(KqpErrors) { Y_UNIT_TEST(ResolveTableError) { TLocalFixture fixture; - int skip = 1; // compile auto mitm = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) { if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) { - if (skip-- == 0) { - // fail on execution phase (kqp_table_resolver.cpp) - auto event = ev.Get()->Get<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(); - event->Request->ErrorCount = 1; - auto& entries = event->Request->ResultSet; - entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError; - } + auto event = ev.Get()->Get<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(); + event->Request->ErrorCount = 1; + auto& entries = event->Request->ResultSet; + entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError; } return TTestActorRuntime::EEventAction::PROCESS; }; |