diff options
author | ssmike <ssmike@ydb.tech> | 2023-01-26 12:36:03 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-01-26 12:36:03 +0300 |
commit | 0b05fe8f05e9aa801cf6f247e17f1d5271e86285 (patch) | |
tree | 86834a74ae81f4482ecf1324af1346aba62e1c1a | |
parent | b5d61d4ff242e2447bd415fdd02a2935db0c3a4a (diff) | |
download | ydb-0b05fe8f05e9aa801cf6f247e17f1d5271e86285.tar.gz |
Fix snapshots in iterator range reads
-rw-r--r-- | ydb/core/kqp/common/kqp_prepared_query.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 6 |
4 files changed, 37 insertions, 3 deletions
diff --git a/ydb/core/kqp/common/kqp_prepared_query.cpp b/ydb/core/kqp/common/kqp_prepared_query.cpp index 62cd68f2a09..594179454bb 100644 --- a/ydb/core/kqp/common/kqp_prepared_query.cpp +++ b/ydb/core/kqp/common/kqp_prepared_query.cpp @@ -103,6 +103,11 @@ TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto, tablesSet.insert(input.GetStreamLookup().GetTable().GetPath()); } } + for (const auto& source : stage.GetSources()) { + if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) { + tablesSet.insert(source.GetReadRangesSource().GetTable().GetPath()); + } + } } } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 86d18fc4f06..66aed6e8dfa 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -3,6 +3,7 @@ #include "kqp_planner_strategy.h" #include "kqp_shards_resolver.h" +#include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/wilson.h> #include <ydb/core/kqp/rm_service/kqp_resource_estimation.h> @@ -370,6 +371,24 @@ void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) { transform->MutableSettings()->PackFrom(settings); } + if (input.HasSource() && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) { + auto source = input.MutableSource(); + const google::protobuf::Any& settingsAny = source->GetSettings(); + + YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: " + << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name() + << " , but got: " << settingsAny.type_url()); + + NKikimrTxDataShard::TKqpReadRangesSourceSettings settings; + YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); + + if (Snapshot.IsValid()) { + settings.MutableSnapshot()->SetStep(Snapshot.Step); + settings.MutableSnapshot()->SetTxId(Snapshot.TxId); + } + + source->MutableSettings()->PackFrom(settings); + } } } diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index dbebf28b75d..1fb2fa24e56 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -548,7 +548,7 @@ public: } } - if (record.HasSnapshot()) { + if (Settings.HasSnapshot()) { record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId()); record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep()); } @@ -586,7 +586,9 @@ public: CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath() << ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry) - << ", readId = " << id); + << ", readId = " << id + << " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")" + << " lockTxId = " << Settings.GetLockTxId()); ReadIdByTabletId[state->TabletId].push_back(id); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), @@ -602,7 +604,8 @@ public: } if (record.BrokenTxLocksSize()) { - return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED); + return RuntimeError("Broken locks", NYql::NDqProto::StatusIds::ABORTED, + {YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks on " + Settings.GetTable().GetTablePath() + " invalidated.")}); } if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { @@ -614,6 +617,7 @@ public: for (auto& lock : record.GetTxLocks()) { Locks.push_back(lock); } + CA_LOG_D("Taken " << Locks.size() << " locks"); Reads[id].SerializedContinuationToken = record.GetContinuationToken(); Reads[id].RegisterMessage(*ev->Get()); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index bae9630412a..8358898ed56 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -730,6 +730,12 @@ public: tablesSet.insert(input.GetStreamLookup().GetTable().GetPath()); } } + + for (const auto& source : stage.GetSources()) { + if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) { + tablesSet.insert(source.GetReadRangesSource().GetTable().GetPath()); + } + } } } TVector<TString> tables(tablesSet.begin(), tablesSet.end()); |