aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-01-15 18:44:49 +0300
committerspuchin <spuchin@ydb.tech>2023-01-15 18:44:49 +0300
commit8a749596d40e91c896a1907afcd108d9221fbde1 (patch)
tree1bbbbe2eb13a8fc5924b785e957c605e67251e53
parent306ddfa76ad06b2689bf0c8e7cf9de22a21c5594 (diff)
downloadydb-8a749596d40e91c896a1907afcd108d9221fbde1.tar.gz
Remove table resolve step via scheme board cache in KQP executer. ()
-rw-r--r--ydb/core/kqp/common/kqp_prepared_query.h4
-rw-r--r--ydb/core/kqp/common/kqp_resolve.cpp2
-rw-r--r--ydb/core/kqp/common/kqp_resolve.h20
-rw-r--r--ydb/core/kqp/executer_actor/kqp_table_resolver.cpp229
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp123
-rw-r--r--ydb/core/protos/kqp.proto2
-rw-r--r--ydb/core/protos/kqp_physical.proto35
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp12
8 files changed, 248 insertions, 179 deletions
diff --git a/ydb/core/kqp/common/kqp_prepared_query.h b/ydb/core/kqp/common/kqp_prepared_query.h
index a3004073f9..9de89807e7 100644
--- a/ydb/core/kqp/common/kqp_prepared_query.h
+++ b/ydb/core/kqp/common/kqp_prepared_query.h
@@ -78,6 +78,10 @@ public:
return Proto->GetParamBindings();
}
+ const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyTable>& GetTables() const {
+ return Proto->GetTables();
+ }
+
TProtoStringType DebugString() const {
return Proto->ShortDebugString();
}
diff --git a/ydb/core/kqp/common/kqp_resolve.cpp b/ydb/core/kqp/common/kqp_resolve.cpp
index 64dac31121..482a0e419c 100644
--- a/ydb/core/kqp/common/kqp_resolve.cpp
+++ b/ydb/core/kqp/common/kqp_resolve.cpp
@@ -38,7 +38,7 @@ TTableId MakeTableId(const TKqpTable& node) {
return tableId;
}
-TTableId MakeTableId(const NKqpProto::TKqpPhyTable& table) {
+TTableId MakeTableId(const NKqpProto::TKqpPhyTableId& table) {
TTableId tableId;
tableId.PathId = TPathId(table.GetOwnerId(), table.GetTableId());
tableId.SysViewInfo = table.GetSysView();
diff --git a/ydb/core/kqp/common/kqp_resolve.h b/ydb/core/kqp/common/kqp_resolve.h
index f2f34e2f8a..7d9c3e3b2d 100644
--- a/ydb/core/kqp/common/kqp_resolve.h
+++ b/ydb/core/kqp/common/kqp_resolve.h
@@ -42,6 +42,18 @@ public:
return TablesById.FindPtr(id);
}
+ TTable& GetTable(const TTableId& id) {
+ auto table = TablesById.FindPtr(id);
+ MKQL_ENSURE_S(table);
+ return *table;
+ }
+
+ const TTable& GetTable(const TTableId& id) const {
+ auto table = TablesById.FindPtr(id);
+ MKQL_ENSURE_S(table);
+ return *table;
+ }
+
TTable& GetOrAddTable(const TTableId& id, const TStringBuf path) {
auto& table = TablesById[id];
@@ -54,12 +66,6 @@ public:
return table;
}
- const TTable& GetTable(const TTableId& id) const {
- auto table = TablesById.FindPtr(id);
- MKQL_ENSURE_S(table);
- return *table;
- }
-
size_t Size() const {
return TablesById.size();
}
@@ -133,7 +139,7 @@ void SortPartitions(TList& partitions, const TVector<NScheme::TTypeInfo>& keyCol
}
TTableId MakeTableId(const NYql::NNodes::TKqpTable& node);
-TTableId MakeTableId(const NKqpProto::TKqpPhyTable& table);
+TTableId MakeTableId(const NKqpProto::TKqpPhyTableId& table);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
index 513582a6d6..6cdcf4bd38 100644
--- a/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_table_resolver.cpp
@@ -36,125 +36,126 @@ public:
, TasksGraph(tasksGraph) {}
void Bootstrap() {
- ResolveTables();
- Become(&TKqpTableResolver::ResolveTablesState);
+ FillTables();
+
+ ResolveKeys();
+ Become(&TKqpTableResolver::ResolveKeysState);
}
private:
- STATEFN(ResolveTablesState) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleResolveTables);
- hFunc(TEvents::TEvPoison, HandleResolveTables);
- default:
- UnexpectedEvent("ResolveTablesState", ev->GetTypeRewrite());
+ static void FillColumn(const NKqpProto::TKqpPhyColumn& phyColumn, TKqpTableKeys::TTable& table) {
+ if (table.Columns.FindPtr(phyColumn.GetId().GetName())) {
+ return;
}
+
+ TKqpTableKeys::TColumn column;
+ column.Id = phyColumn.GetId().GetId();
+
+ if (phyColumn.GetTypeId() != NScheme::NTypeIds::Pg) {
+ column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId());
+ } else {
+ column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId(),
+ NPg::TypeDescFromPgTypeName(phyColumn.GetPgTypeName()));
+ }
+
+ table.Columns.emplace(phyColumn.GetId().GetName(), std::move(column));
}
- void HandleResolveTables(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- auto timer = std::make_unique<NCpuTime::TCpuTimer>(CpuTime);
+ void FillTable(const NKqpProto::TKqpPhyTable& phyTable) {
+ auto tableId = MakeTableId(phyTable.GetId());
- const auto& entries = ev->Get()->Request->ResultSet;
- LOG_D("Resolved tables: " << entries.size());
- YQL_ENSURE(entries.size() == TableKeys.Size());
+ auto table = TableKeys.FindTablePtr(tableId);
+ if (!table) {
+ table = &TableKeys.GetOrAddTable(tableId, phyTable.GetId().GetPath());
- for (auto& entry : entries) {
- switch (entry.Status) {
- case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
+ switch (phyTable.GetKind()) {
+ case NKqpProto::TABLE_KIND_DS:
+ table->TableKind = ETableKind::Datashard;
+ break;
+ case NKqpProto::TABLE_KIND_OLAP:
+ table->TableKind = ETableKind::Olap;
break;
+ case NKqpProto::TABLE_KIND_SYS_VIEW:
+ table->TableKind = ETableKind::SysView;
+ break;
+ default:
+ YQL_ENSURE(false, "Unexpected phy table kind: " << (i64) phyTable.GetKind());
+ }
- case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
- case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable:
- case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete:
- ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
- << "Table scheme error `" << JoinPath(entry.Path) << "`: " << entry.Status << '.'));
- return;
+ for (const auto& [_, phyColumn] : phyTable.GetColumns()) {
+ FillColumn(phyColumn, *table);
+ }
- default:
- ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder()
- << "Failed to resolve table `" << JoinPath(entry.Path) << "`: " << entry.Status << '.'));
- return;
+ YQL_ENSURE(table->KeyColumns.empty());
+ table->KeyColumns.reserve(phyTable.KeyColumnsSize());
+ YQL_ENSURE(table->KeyColumnTypes.empty());
+ table->KeyColumnTypes.reserve(phyTable.KeyColumnsSize());
+ for (const auto& keyColumnId : phyTable.GetKeyColumns()) {
+ const auto& column = table->Columns.FindPtr(keyColumnId.GetName());
+ YQL_ENSURE(column);
+ table->KeyColumns.push_back(keyColumnId.GetName());
+ table->KeyColumnTypes.push_back(column->Type);
+ }
+ } else {
+ for (const auto& [_, phyColumn] : phyTable.GetColumns()) {
+ FillColumn(phyColumn, *table);
}
+ }
+ }
- auto* table = TableKeys.FindTablePtr(entry.TableId);
- if (!table) {
- timer.reset();
- ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder()
- << "Unresolved table `" << JoinPath(entry.Path) << "` with tableId: " << entry.TableId << "."));
+ void FillTables() {
+ auto addColumn = [](TKqpTableKeys::TTable& table, const TString& columnName) mutable {
+ auto& sysColumns = GetSystemColumns();
+ if (table.Columns.FindPtr(columnName)) {
return;
}
- if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
- YQL_ENSURE(entry.ColumnTableInfo || entry.OlapStoreInfo);
- // NOTE: entry.SysViewInfo might not be empty for OLAP stats virtual tables
- table->TableKind = ETableKind::Olap;
- } else if (entry.TableId.IsSystemView()) {
- table->TableKind = ETableKind::SysView;
- } else {
- table->TableKind = ETableKind::Datashard;
- }
+ auto* systemColumn = sysColumns.FindPtr(columnName);
+ YQL_ENSURE(systemColumn, "Unknown table column"
+ << ", table: " << table.Path
+ << ", column: " << columnName);
- // TODO: Resolve columns by id
- TMap<TStringBuf, ui32> columnsMap;
- for (auto& [columnId, column] : entry.Columns) {
- auto ret = columnsMap.emplace(column.Name, columnId);
- YQL_ENSURE(ret.second, "" << column.Name);
+ TKqpTableKeys::TColumn column;
+ column.Id = systemColumn->ColumnId;
+ column.Type = NScheme::TTypeInfo(systemColumn->TypeId);
+ table.Columns.emplace(columnName, std::move(column));
+ };
- if (column.KeyOrder >= 0) {
- table->Columns.emplace(column.Name, TKqpTableKeys::TColumn());
+ for (auto& tx : Transactions) {
+ for (const auto& phyTable : tx.Body->GetTables()) {
+ FillTable(phyTable);
+ }
- if (table->KeyColumns.size() <= (ui32) column.KeyOrder) {
- table->KeyColumns.resize(column.KeyOrder + 1);
- table->KeyColumnTypes.resize(column.KeyOrder + 1);
+ for (auto& stage : tx.Body->GetStages()) {
+ for (auto& op : stage.GetTableOps()) {
+ auto& table = TableKeys.GetTable(MakeTableId(op.GetTable()));
+ for (auto& column : op.GetColumns()) {
+ addColumn(table, column.GetName());
}
- table->KeyColumns[column.KeyOrder] = column.Name;
- table->KeyColumnTypes[column.KeyOrder] = column.PType;
}
- }
-
- for (auto& keyColumn : table->KeyColumns) {
- YQL_ENSURE(!keyColumn.empty());
- }
- auto& sysColumns = GetSystemColumns();
- for (auto& [columnName, columnKey] : table->Columns) {
- if (auto* systemColumn = sysColumns.FindPtr(columnName)) {
- columnKey.Id = systemColumn->ColumnId;
- columnKey.Type = NScheme::TTypeInfo(systemColumn->TypeId);
- continue;
+ for (auto& source : stage.GetSources()) {
+ if (source.HasReadRangesSource()) {
+ auto& table = TableKeys.GetTable(MakeTableId(source.GetReadRangesSource().GetTable()));
+ for (auto& column : source.GetReadRangesSource().GetColumns()) {
+ addColumn(table, column.GetName());
+ }
+ }
}
- auto* columnId = columnsMap.FindPtr(columnName);
- if (!columnId) {
- timer.reset();
- ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR,
- YqlIssue({}, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, TStringBuilder()
- << "Unknown column `" << columnName << "` at table `" << JoinPath(entry.Path) << "`."));
- return;
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ auto& table = TableKeys.GetTable(MakeTableId(input.GetStreamLookup().GetTable()));
+ for (auto& column : input.GetStreamLookup().GetColumns()) {
+ addColumn(table, column);
+ }
+ }
}
-
- auto* column = entry.Columns.FindPtr(*columnId);
- YQL_ENSURE(column);
-
- columnKey.Id = column->Id;
- columnKey.Type = column->PType;
}
}
-
- ResolveKeys();
- timer.reset();
-
- Become(&TKqpTableResolver::ResolveKeysState);
}
- void HandleResolveTables(TEvents::TEvPoison::TPtr&) {
- PassAway();
- }
-
-private:
STATEFN(ResolveKeysState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolveKeys);
@@ -227,56 +228,6 @@ private:
}
private:
- // TODO: Get rid of ResolveTables & TableKeys, get table information from phy tx proto.
- void ResolveTables() {
- for (auto& tx : Transactions) {
- for (auto& stage : tx.Body->GetStages()) {
- for (auto& op : stage.GetTableOps()) {
- auto& table = TableKeys.GetOrAddTable(MakeTableId(op.GetTable()), op.GetTable().GetPath());
- for (auto& column : op.GetColumns()) {
- table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn());
- }
- }
-
- for (auto& source : stage.GetSources()) {
- if (source.HasReadRangesSource()) {
- auto& table = TableKeys.GetOrAddTable(
- MakeTableId(source.GetReadRangesSource().GetTable()),
- source.GetReadRangesSource().GetTable().GetPath());
- for (auto& column : source.GetReadRangesSource().GetColumns()) {
- table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn());
- }
- }
- }
-
- for (const auto& input : stage.GetInputs()) {
- if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
- const auto& streamLookup = input.GetStreamLookup();
- auto& table = TableKeys.GetOrAddTable(MakeTableId(streamLookup.GetTable()), streamLookup.GetTable().GetPath());
- for (auto& column : input.GetStreamLookup().GetColumns()) {
- table.Columns.emplace(column, TKqpTableKeys::TColumn());
- }
- }
- }
- }
- }
-
- auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- request->ResultSet.reserve(TableKeys.Size());
- for (auto& [tableId, table] : TableKeys.Get()) {
- NSchemeCache::TSchemeCacheNavigate::TEntry entry;
- entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
- entry.TableId = tableId;
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
- entry.ShowPrivatePath = true;
-
- request->ResultSet.emplace_back(std::move(entry));
- }
-
- auto ev = MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.Release());
- Send(MakeSchemeCacheID(), ev.Release());
- }
-
void ResolveKeys() {
FillKqpTasksGraphStages(TasksGraph, Transactions);
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index cdcd87ea57..ab44faba02 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -75,7 +75,7 @@ NKqpProto::TKqpPhyInternalBinding::EType GetPhyInternalBindingType(const std::st
return bindingType;
}
-void FillTable(const TKqpTable& table, NKqpProto::TKqpPhyTable& tableProto) {
+void FillTableId(const TKqpTable& table, NKqpProto::TKqpPhyTableId& tableProto) {
auto pathId = TKikimrPathId::Parse(table.PathId());
tableProto.SetPath(TString(table.Path()));
@@ -85,6 +85,77 @@ void FillTable(const TKqpTable& table, NKqpProto::TKqpPhyTable& tableProto) {
tableProto.SetVersion(FromString<ui64>(table.Version()));
}
+void FillTableId(const TKikimrTableMetadata& tableMeta, NKqpProto::TKqpPhyTableId& tableProto) {
+ tableProto.SetPath(tableMeta.Name);
+ tableProto.SetOwnerId(tableMeta.PathId.OwnerId());
+ tableProto.SetTableId(tableMeta.PathId.TableId());
+ tableProto.SetSysView(tableMeta.SysView);
+ tableProto.SetVersion(tableMeta.SchemaVersion);
+}
+
+NKqpProto::EKqpPhyTableKind GetPhyTableKind(EKikimrTableKind kind) {
+ switch (kind) {
+ case EKikimrTableKind::Datashard:
+ return NKqpProto::TABLE_KIND_DS;
+ case EKikimrTableKind::Olap:
+ return NKqpProto::TABLE_KIND_OLAP;
+ case EKikimrTableKind::SysView:
+ return NKqpProto::TABLE_KIND_SYS_VIEW;
+ default:
+ return NKqpProto::TABLE_KIND_UNSPECIFIED;
+ }
+}
+
+void FillTablesMap(const TKqpTable& table, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) {
+ tablesMap.emplace(table.Path().Value(), THashSet<TStringBuf>{});
+}
+
+void FillTablesMap(const TKqpTable& table, const TCoAtomList& columns,
+ THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
+{
+ FillTablesMap(table, tablesMap);
+
+ for (const auto& column : columns) {
+ tablesMap[table.Path()].emplace(column);
+ }
+}
+
+void FillTable(const TKikimrTableMetadata& tableMeta, THashSet<TStringBuf>&& columns,
+ NKqpProto::TKqpPhyTable& tableProto)
+{
+ FillTableId(tableMeta, *tableProto.MutableId());
+ tableProto.SetKind(GetPhyTableKind(tableMeta.Kind));
+
+ for (const auto& keyColumnName : tableMeta.KeyColumnNames) {
+ auto keyColumn = tableMeta.Columns.FindPtr(keyColumnName);
+ YQL_ENSURE(keyColumn);
+
+ auto& phyKeyColumn = *tableProto.MutableKeyColumns()->Add();
+ phyKeyColumn.SetId(keyColumn->Id);
+ phyKeyColumn.SetName(keyColumn->Name);
+
+ columns.emplace(keyColumnName);
+ }
+
+ auto& phyColumns = *tableProto.MutableColumns();
+ for (const auto& columnName : columns) {
+ auto column = tableMeta.Columns.FindPtr(columnName);
+ if (!column) {
+ YQL_ENSURE(GetSystemColumns().find(columnName) != GetSystemColumns().end());
+ continue;
+ }
+
+ auto& phyColumn = phyColumns[column->Id];
+ phyColumn.MutableId()->SetId(column->Id);
+ phyColumn.MutableId()->SetName(column->Name);
+ phyColumn.SetTypeId(column->TypeInfo.GetTypeId());
+
+ if (column->TypeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
+ phyColumn.SetPgTypeName(NPg::PgTypeNameFromTypeDesc(column->TypeInfo.GetTypeDesc()));
+ }
+ }
+}
+
template <typename TProto>
void FillColumns(const TCoAtomList& columns, const TKikimrTableMetadata& tableMeta,
TProto& opProto, bool allowSystemColumns)
@@ -461,7 +532,7 @@ private:
}
void CompileStage(const TDqPhyStage& stage, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx,
- const TMap<ui64, ui32>& stagesMap)
+ const TMap<ui64, ui32>& stagesMap, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
{
stageProto.SetIsEffectsStage(NOpt::IsKqpEffectsStage(stage));
@@ -470,14 +541,14 @@ private:
if (input.Maybe<TDqSource>()) {
auto* protoSource = stageProto.AddSources();
- FillSource(input.Cast<TDqSource>(), protoSource, true);
+ FillSource(input.Cast<TDqSource>(), protoSource, true, tablesMap);
protoSource->SetInputIndex(inputIndex);
} else {
YQL_ENSURE(input.Maybe<TDqConnection>());
auto connection = input.Cast<TDqConnection>();
auto& protoInput = *stageProto.AddInputs();
- FillConnection(connection, stagesMap, protoInput, ctx);
+ FillConnection(connection, stagesMap, protoInput, ctx, tablesMap);
protoInput.SetInputIndex(inputIndex);
}
}
@@ -493,7 +564,8 @@ private:
YQL_ENSURE(tableMeta);
auto& tableOp = *stageProto.AddTableOps();
- FillTable(readTable.Table(), *tableOp.MutableTable());
+ FillTablesMap(readTable.Table(), readTable.Columns(), tablesMap);
+ FillTableId(readTable.Table(), *tableOp.MutableTable());
FillColumns(readTable.Columns(), *tableMeta, tableOp, true);
FillReadRange(readTable, *tableMeta, *tableOp.MutableReadRange());
} else if (auto maybeLookupTable = node.Maybe<TKqpLookupTable>()) {
@@ -502,7 +574,8 @@ private:
YQL_ENSURE(tableMeta);
auto& tableOp = *stageProto.AddTableOps();
- FillTable(lookupTable.Table(), *tableOp.MutableTable());
+ FillTablesMap(lookupTable.Table(), lookupTable.Columns(), tablesMap);
+ FillTableId(lookupTable.Table(), *tableOp.MutableTable());
FillColumns(lookupTable.Columns(), *tableMeta, tableOp, true);
FillLookup(lookupTable, *tableOp.MutableLookup());
} else if (auto maybeUpsertRows = node.Maybe<TKqpUpsertRows>()) {
@@ -514,7 +587,8 @@ private:
auto settings = TKqpUpsertRowsSettings::Parse(upsertRows);
auto& tableOp = *stageProto.AddTableOps();
- FillTable(upsertRows.Table(), *tableOp.MutableTable());
+ FillTablesMap(upsertRows.Table(), upsertRows.Columns(), tablesMap);
+ FillTableId(upsertRows.Table(), *tableOp.MutableTable());
FillColumns(upsertRows.Columns(), *tableMeta, tableOp, false);
FillEffectRows(upsertRows, *tableOp.MutableUpsertRows(), settings.Inplace);
} else if (auto maybeDeleteRows = node.Maybe<TKqpDeleteRows>()) {
@@ -525,7 +599,8 @@ private:
YQL_ENSURE(stageProto.GetIsEffectsStage());
auto& tableOp = *stageProto.AddTableOps();
- FillTable(deleteRows.Table(), *tableOp.MutableTable());
+ FillTablesMap(deleteRows.Table(), tablesMap);
+ FillTableId(deleteRows.Table(), *tableOp.MutableTable());
FillEffectRows(deleteRows, *tableOp.MutableDeleteRows(), false);
} else if (auto maybeWideReadTableRanges = node.Maybe<TKqpWideReadTableRanges>()) {
auto readTableRanges = maybeWideReadTableRanges.Cast();
@@ -533,7 +608,8 @@ private:
YQL_ENSURE(tableMeta);
auto& tableOp = *stageProto.AddTableOps();
- FillTable(readTableRanges.Table(), *tableOp.MutableTable());
+ FillTablesMap(readTableRanges.Table(), readTableRanges.Columns(), tablesMap);
+ FillTableId(readTableRanges.Table(), *tableOp.MutableTable());
FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true);
FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadRanges());
} else if (auto maybeReadWideTableRanges = node.Maybe<TKqpWideReadOlapTableRanges>()) {
@@ -542,7 +618,8 @@ private:
YQL_ENSURE(tableMeta);
auto& tableOp = *stageProto.AddTableOps();
- FillTable(readTableRanges.Table(), *tableOp.MutableTable());
+ FillTablesMap(readTableRanges.Table(), readTableRanges.Columns(), tablesMap);
+ FillTableId(readTableRanges.Table(), *tableOp.MutableTable());
FillColumns(readTableRanges.Columns(), *tableMeta, tableOp, true);
FillReadRanges(readTableRanges, *tableMeta, *tableOp.MutableReadOlapRange());
auto miniKqlResultType = GetMKqlResultType(readTableRanges.Process().Ref().GetTypeAnn());
@@ -608,9 +685,11 @@ private:
bool hasEffectStage = false;
TMap<ui64, ui32> stagesMap;
+ THashMap<TStringBuf, THashSet<TStringBuf>> tablesMap;
+
for (const auto& stage : tx.Stages()) {
auto* protoStage = txProto.AddStages();
- CompileStage(stage, *protoStage, ctx, stagesMap);
+ CompileStage(stage, *protoStage, ctx, stagesMap, tablesMap);
hasEffectStage |= protoStage->GetIsEffectsStage();
stagesMap[stage.Ref().UniqueId()] = txProto.StagesSize() - 1;
}
@@ -652,7 +731,7 @@ private:
auto& resultProto = *txProto.AddResults();
auto& connectionProto = *resultProto.MutableConnection();
- FillConnection(connection, stagesMap, connectionProto, ctx);
+ FillConnection(connection, stagesMap, connectionProto, ctx, tablesMap);
const TTypeAnnotationNode* itemType = nullptr;
switch (connectionProto.GetTypeCase()) {
@@ -691,12 +770,22 @@ private:
}
}
}
+
+ for (auto& [tablePath, tableColumns] : tablesMap) {
+ auto tableMeta = TablesData->ExistingTable(Cluster, tablePath).Metadata;
+ YQL_ENSURE(tableMeta);
+
+ FillTable(*tableMeta, std::move(tableColumns), *txProto.AddTables());
+ }
}
- void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns) {
+ void FillSource(const TDqSource& source, NKqpProto::TKqpSource* protoSource, bool allowSystemColumns,
+ THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
+ {
if (auto settings = source.Settings().Maybe<TKqpReadRangesSourceSettings>()) {
NKqpProto::TKqpReadRangesSource& readProto = *protoSource->MutableReadRangesSource();
- FillTable(settings.Table().Cast(), *readProto.MutableTable());
+ FillTablesMap(settings.Table().Cast(), settings.Columns().Cast(), tablesMap);
+ FillTableId(settings.Table().Cast(), *readProto.MutableTable());
auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata;
YQL_ENSURE(tableMeta);
@@ -744,7 +833,8 @@ private:
}
void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap,
- NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx)
+ NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx,
+ THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
{
auto inputStageIndex = stagesMap.FindPtr(connection.Output().Stage().Ref().UniqueId());
YQL_ENSURE(inputStageIndex, "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map: "
@@ -815,7 +905,8 @@ private:
auto tableMeta = TablesData->ExistingTable(Cluster, streamLookup.Table().Path()).Metadata;
YQL_ENSURE(tableMeta);
- FillTable(streamLookup.Table(), *streamLookupProto.MutableTable());
+ FillTablesMap(streamLookup.Table(), streamLookup.Columns(), tablesMap);
+ FillTableId(streamLookup.Table(), *streamLookupProto.MutableTable());
const auto lookupKeysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
YQL_ENSURE(lookupKeysType, "Empty stream lookup keys type");
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 622a4ab2bc..3d7b438be8 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -584,7 +584,7 @@ message TEvKillScanTablet {
}
message TKqpStreamLookupSettings {
- optional NKqpProto.TKqpPhyTable Table = 1;
+ optional NKqpProto.TKqpPhyTableId Table = 1;
repeated string KeyColumns = 2;
repeated string Columns = 3;
optional TKqpSnapshot Snapshot = 4;
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 854dafe475..a31e49c817 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -47,12 +47,25 @@ message TKqpPhyResultBinding {
}
}
-message TKqpPhyColumn {
+message TKqpPhyColumnId {
uint32 Id = 1;
string Name = 2;
}
-message TKqpPhyTable {
+message TKqpPhyColumn {
+ TKqpPhyColumnId Id = 1;
+ uint32 TypeId = 2;
+ string PgTypeName = 3;
+}
+
+enum EKqpPhyTableKind {
+ TABLE_KIND_UNSPECIFIED = 0;
+ TABLE_KIND_DS = 1;
+ TABLE_KIND_OLAP = 2;
+ TABLE_KIND_SYS_VIEW = 3;
+}
+
+message TKqpPhyTableId {
string Path = 1;
uint64 OwnerId = 2;
uint64 TableId = 3;
@@ -60,6 +73,13 @@ message TKqpPhyTable {
uint64 Version = 5;
}
+message TKqpPhyTable {
+ TKqpPhyTableId Id = 1;
+ EKqpPhyTableKind Kind = 2;
+ map<uint32, TKqpPhyColumn> Columns = 3;
+ repeated TKqpPhyColumnId KeyColumns = 4;
+}
+
message TKqpPhyParamValue {
string ParamName = 1;
}
@@ -165,8 +185,8 @@ message TKqpPhyOpReadRanges {
}
message TKqpPhyTableOperation {
- TKqpPhyTable Table = 1;
- repeated TKqpPhyColumn Columns = 2;
+ TKqpPhyTableId Table = 1;
+ repeated TKqpPhyColumnId Columns = 2;
oneof Type {
TKqpPhyOpReadRange ReadRange = 3;
@@ -213,7 +233,7 @@ message TKqpPhyCnMerge {
}
message TKqpPhyCnStreamLookup {
- TKqpPhyTable Table = 1;
+ TKqpPhyTableId Table = 1;
repeated string KeyColumns = 2;
repeated string Columns = 3;
bytes LookupKeysType = 4;
@@ -240,8 +260,8 @@ message TKqpPhyConnection {
}
message TKqpReadRangesSource {
- TKqpPhyTable Table = 1;
- repeated TKqpPhyColumn Columns = 2;
+ TKqpPhyTableId Table = 1;
+ repeated TKqpPhyColumnId Columns = 2;
TKqpPhyValue ItemsLimit = 3;
bool Reverse = 4;
bool Sorted = 5;
@@ -296,6 +316,7 @@ message TKqpPhyTx {
repeated TKqpPhyParamBinding ParamBindings = 4;
string Plan = 5;
bool HasEffects = 6; // at least one stage has flag TKqpPhyStage::IsEffectStage set
+ repeated TKqpPhyTable Tables = 7;
}
message TKqpTableInfo {
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
index 432b1cfe5f..2642b82513 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
@@ -81,16 +81,12 @@ Y_UNIT_TEST_SUITE(KqpErrors) {
Y_UNIT_TEST(ResolveTableError) {
TLocalFixture fixture;
- int skip = 1; // compile
auto mitm = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) {
if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) {
- if (skip-- == 0) {
- // fail on execution phase (kqp_table_resolver.cpp)
- auto event = ev.Get()->Get<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
- event->Request->ErrorCount = 1;
- auto& entries = event->Request->ResultSet;
- entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError;
- }
+ auto event = ev.Get()->Get<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
+ event->Request->ErrorCount = 1;
+ auto& entries = event->Request->ResultSet;
+ entries[0].Status = NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError;
}
return TTestActorRuntime::EEventAction::PROCESS;
};