aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-01-25 17:13:36 +0300
committerssmike <ssmike@ydb.tech>2023-01-25 17:13:36 +0300
commit5d94d302eae60eee6caa3212044249b952be0317 (patch)
tree7f2927575b04716ad0d69700c10f7b8217381b4a
parentd5a7349e603909130b6d390ffb15225953b1a378 (diff)
downloadydb-5d94d302eae60eee6caa3212044249b952be0317.tar.gz
fix table metadata versions in iterator reads
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_request.cpp22
1 files changed, 18 insertions, 4 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_request.cpp b/ydb/core/kqp/compile_service/kqp_compile_request.cpp
index 4af14f68fe..3e846d53ac 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_request.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_request.cpp
@@ -153,13 +153,27 @@ private:
private:
void FillTables(const NKqpProto::TKqpPhyTx& phyTx) {
for (const auto& stage : phyTx.GetStages()) {
- for (const auto& tableOp : stage.GetTableOps()) {
- TTableId tableId(tableOp.GetTable().GetOwnerId(), tableOp.GetTable().GetTableId());
+ auto addTable = [&](const NKqpProto::TKqpPhyTableId& table) {
+ TTableId tableId(table.GetOwnerId(), table.GetTableId());
auto it = TableVersions.find(tableId);
if (it != TableVersions.end()) {
- Y_ENSURE(it->second == tableOp.GetTable().GetVersion());
+ Y_ENSURE(it->second == table.GetVersion());
} else {
- TableVersions.emplace(tableId, tableOp.GetTable().GetVersion());
+ TableVersions.emplace(tableId, table.GetVersion());
+ }
+ };
+ for (const auto& tableOp : stage.GetTableOps()) {
+ addTable(tableOp.GetTable());
+ }
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ addTable(input.GetStreamLookup().GetTable());
+ }
+ }
+
+ for (const auto& source : stage.GetSources()) {
+ if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
+ addTable(source.GetReadRangesSource().GetTable());
}
}
}