diff options
author | ssmike <ssmike@ydb.tech> | 2023-01-25 17:13:36 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-01-25 17:13:36 +0300 |
commit | 5d94d302eae60eee6caa3212044249b952be0317 (patch) | |
tree | 7f2927575b04716ad0d69700c10f7b8217381b4a | |
parent | d5a7349e603909130b6d390ffb15225953b1a378 (diff) | |
download | ydb-5d94d302eae60eee6caa3212044249b952be0317.tar.gz |
fix table metadata versions in iterator reads
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_request.cpp | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_request.cpp b/ydb/core/kqp/compile_service/kqp_compile_request.cpp index 4af14f68fe..3e846d53ac 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_request.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_request.cpp @@ -153,13 +153,27 @@ private: private: void FillTables(const NKqpProto::TKqpPhyTx& phyTx) { for (const auto& stage : phyTx.GetStages()) { - for (const auto& tableOp : stage.GetTableOps()) { - TTableId tableId(tableOp.GetTable().GetOwnerId(), tableOp.GetTable().GetTableId()); + auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) { + TTableId tableId(table.GetOwnerId(), table.GetTableId()); auto it = TableVersions.find(tableId); if (it != TableVersions.end()) { - Y_ENSURE(it->second == tableOp.GetTable().GetVersion()); + Y_ENSURE(it->second == table.GetVersion()); } else { - TableVersions.emplace(tableId, tableOp.GetTable().GetVersion()); + TableVersions.emplace(tableId, table.GetVersion()); + } + }; + for (const auto& tableOp : stage.GetTableOps()) { + addTable(tableOp.GetTable()); + } + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + addTable(input.GetStreamLookup().GetTable()); + } + } + + for (const auto& source : stage.GetSources()) { + if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) { + addTable(source.GetReadRangesSource().GetTable()); } } } |