aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-02-27 15:50:08 +0300
committerulya-sidorina <yulia@ydb.tech>2023-02-27 15:50:08 +0300
commit961957a343b6076cb28628e391e68310d00b80d5 (patch)
tree03f22474fef2cc0042cba6c6261b64da8f32382e
parentf2568a74e72d2b3748d6f5deb3e653c31efe112b (diff)
downloadydb-961957a343b6076cb28628e391e68310d00b80d5.tar.gz
disable head reads for stream lookup
fix(kqp): disable head read for stream lookup
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp16
1 files changed, 3 insertions, 13 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index ed1fee9b20..f5f888a86c 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -33,7 +33,6 @@ public:
, TableId(MakeTableId(settings.GetTable()))
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
- , ImmediateTx(settings.GetImmediateTx())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) {
KeyColumns.reserve(settings.GetKeyColumns().size());
@@ -273,11 +272,6 @@ private:
Locks.push_back(lock);
}
- if (!Snapshot.IsValid() && !record.GetFinished()) {
- // HEAD read was converted to repeatable read
- Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
- }
-
switch (record.GetStatus().GetCode()) {
case Ydb::StatusIds::SUCCESS:
break;
@@ -492,12 +486,9 @@ private:
THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
auto& record = request->Record;
- if (Snapshot.IsValid()) {
- record.MutableSnapshot()->SetStep(Snapshot.Step);
- record.MutableSnapshot()->SetTxId(Snapshot.TxId);
- } else {
- YQL_ENSURE(ImmediateTx, "HEAD reading is only available for immediate txs");
- }
+ YQL_ENSURE(Snapshot.IsValid(), "Invalid snapshot value");
+ record.MutableSnapshot()->SetStep(Snapshot.Step);
+ record.MutableSnapshot()->SetTxId(Snapshot.TxId);
if (LockTxId && BrokenLocks.empty()) {
record.SetLockTxId(*LockTxId);
@@ -616,7 +607,6 @@ private:
const TTableId TableId;
IKqpGateway::TKqpSnapshot Snapshot;
const TMaybe<ui64> LockTxId;
- const bool ImmediateTx;
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
std::vector<TSysTables::TTableColumnInfo> Columns;