aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-02-20 14:28:06 +0300
committerssmike <ssmike@ydb.tech>2023-02-20 14:28:06 +0300
commit912a15cc434f03b3cefdd258123473732c441617 (patch)
treeb9bced008629bba1c4f4c17d8767111098a24ecf
parentc3d5b651bb091783c1f78c74bb8c926b61a7f3a4 (diff)
downloadydb-912a15cc434f03b3cefdd258123473732c441617.tar.gz
keep taken snapshots on retries
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp41
1 files changed, 26 insertions, 15 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index d3119ce17be..2700b9d625a 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -3,6 +3,8 @@
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/engine/minikql/minikql_engine_host.h>
+
+#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/tx/datashard/datashard.h>
@@ -99,8 +101,6 @@ public:
size_t ResolveAttempt = 0;
size_t RetryAttempt = 0;
- bool NeedResolve = false;
-
void AssignContinuationToken(TShardState* state) {
if (state->LastKey.DataSize() != 0) {
LastKey = std::move(state->LastKey);
@@ -342,6 +342,7 @@ public:
void Bootstrap() {
LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix;
+ Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId());
THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint());
PendingShards.PushBack(stateHolder.Get());
auto& state = *stateHolder.Release();
@@ -363,7 +364,6 @@ public:
}
if (!Settings.HasShardIdHint()) {
- state.NeedResolve = true;
ResolveShard(&state);
} else {
StartTableScan();
@@ -485,15 +485,20 @@ public:
return;
}
- if (keyDesc->GetPartitions().size() == 1 && !state->NeedResolve) {
- // we re-resolved the same shard
- NYql::TIssues issues;
- for (auto& issue : state->Issues) {
- issues.AddIssue(issue.message());
+ if (keyDesc->GetPartitions().size() == 1) {
+ auto& partition = keyDesc->GetPartitions()[0];
+ if (partition.ShardId == state->TabletId) {
+ // we re-resolved the same shard
+ NYql::TIssues issues;
+ for (auto& issue : state->Issues) {
+ issues.AddIssue(issue.message());
+ }
+ RuntimeError(TStringBuilder() << "Too many retries for shard " << state->TabletId, NDqProto::StatusIds::StatusIds::INTERNAL_ERROR, issues);
+ PendingShards.PushBack(state.Release());
+ return;
}
- RuntimeError(TStringBuilder() << "Too many retries for shard " << state->TabletId, NDqProto::StatusIds::StatusIds::INTERNAL_ERROR, issues);
- PendingShards.PushBack(state.Release());
- return;
+ } else if (!Snapshot.IsValid()) {
+ return RuntimeError("inconsistent reads after shards split", NDqProto::StatusIds::INTERNAL_ERROR);
}
if (keyDesc->GetPartitions().empty()) {
@@ -637,9 +642,9 @@ public:
}
}
- if (Settings.HasSnapshot()) {
- record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId());
- record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep());
+ if (Snapshot.IsValid()) {
+ record.MutableSnapshot()->SetTxId(Snapshot.TxId);
+ record.MutableSnapshot()->SetStep(Snapshot.Step);
}
//if (RuntimeSettings.Timeout) {
@@ -705,6 +710,10 @@ public:
Locks.push_back(lock);
}
+ if (!Snapshot.IsValid()) {
+ Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
+ }
+
for (auto& lock : record.GetBrokenTxLocks()) {
BrokenLocks.push_back(lock);
}
@@ -824,7 +833,7 @@ public:
NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type);
} else {
rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type);
- rowSize += row[columnIndex].Size();
+ rowSize += row[columnIndex].Size();
columnIndex += 1;
}
}
@@ -1019,6 +1028,8 @@ private:
TVector<NKikimrTxDataShard::TLock> Locks;
TVector<NKikimrTxDataShard::TLock> BrokenLocks;
+ IKqpGateway::TKqpSnapshot Snapshot;
+
ui32 MaxInFlight = 1024;
TString LogPrefix;
TTableId TableId;