aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-01-26 12:36:03 +0300
committerssmike <ssmike@ydb.tech>2023-01-26 12:36:03 +0300
commit0b05fe8f05e9aa801cf6f247e17f1d5271e86285 (patch)
tree86834a74ae81f4482ecf1324af1346aba62e1c1a
parentb5d61d4ff242e2447bd415fdd02a2935db0c3a4a (diff)
downloadydb-0b05fe8f05e9aa801cf6f247e17f1d5271e86285.tar.gz
Fix snapshots in iterator range reads
-rw-r--r--ydb/core/kqp/common/kqp_prepared_query.cpp5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp19
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp10
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp6
4 files changed, 37 insertions, 3 deletions
diff --git a/ydb/core/kqp/common/kqp_prepared_query.cpp b/ydb/core/kqp/common/kqp_prepared_query.cpp
index 62cd68f2a09..594179454bb 100644
--- a/ydb/core/kqp/common/kqp_prepared_query.cpp
+++ b/ydb/core/kqp/common/kqp_prepared_query.cpp
@@ -103,6 +103,11 @@ TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto,
tablesSet.insert(input.GetStreamLookup().GetTable().GetPath());
}
}
+ for (const auto& source : stage.GetSources()) {
+ if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
+ tablesSet.insert(source.GetReadRangesSource().GetTable().GetPath());
+ }
+ }
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 86d18fc4f06..66aed6e8dfa 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -3,6 +3,7 @@
#include "kqp_planner_strategy.h"
#include "kqp_shards_resolver.h"
+#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/wilson.h>
#include <ydb/core/kqp/rm_service/kqp_resource_estimation.h>
@@ -370,6 +371,24 @@ void TKqpPlanner::AddSnapshotInfoToTaskInputs(NYql::NDqProto::TDqTask& task) {
transform->MutableSettings()->PackFrom(settings);
}
+ if (input.HasSource() && input.GetSource().GetType() == NYql::KqpReadRangesSourceName) {
+ auto source = input.MutableSource();
+ const google::protobuf::Any& settingsAny = source->GetSettings();
+
+ YQL_ENSURE(settingsAny.Is<NKikimrTxDataShard::TKqpReadRangesSourceSettings>(), "Expected settings type: "
+ << NKikimrTxDataShard::TKqpReadRangesSourceSettings::descriptor()->full_name()
+ << " , but got: " << settingsAny.type_url());
+
+ NKikimrTxDataShard::TKqpReadRangesSourceSettings settings;
+ YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings");
+
+ if (Snapshot.IsValid()) {
+ settings.MutableSnapshot()->SetStep(Snapshot.Step);
+ settings.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ }
+
+ source->MutableSettings()->PackFrom(settings);
+ }
}
}
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index dbebf28b75d..1fb2fa24e56 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -548,7 +548,7 @@ public:
}
}
- if (record.HasSnapshot()) {
+ if (Settings.HasSnapshot()) {
record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId());
record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep());
}
@@ -586,7 +586,9 @@ public:
CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath()
<< ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry)
- << ", readId = " << id);
+ << ", readId = " << id
+ << " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")"
+ << " lockTxId = " << Settings.GetLockTxId());
ReadIdByTabletId[state->TabletId].push_back(id);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
@@ -602,7 +604,8 @@ public:
}
if (record.BrokenTxLocksSize()) {
- return RuntimeError("Transaction locks invalidated.", NYql::NDqProto::StatusIds::ABORTED);
+ return RuntimeError("Broken locks", NYql::NDqProto::StatusIds::ABORTED,
+ {YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks on " + Settings.GetTable().GetTablePath() + " invalidated.")});
}
if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
@@ -614,6 +617,7 @@ public:
for (auto& lock : record.GetTxLocks()) {
Locks.push_back(lock);
}
+ CA_LOG_D("Taken " << Locks.size() << " locks");
Reads[id].SerializedContinuationToken = record.GetContinuationToken();
Reads[id].RegisterMessage(*ev->Get());
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index bae9630412a..8358898ed56 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -730,6 +730,12 @@ public:
tablesSet.insert(input.GetStreamLookup().GetTable().GetPath());
}
}
+
+ for (const auto& source : stage.GetSources()) {
+ if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
+ tablesSet.insert(source.GetReadRangesSource().GetTable().GetPath());
+ }
+ }
}
}
TVector<TString> tables(tablesSet.begin(), tablesSet.end());