diff options
author | ssmike <ssmike@ydb.tech> | 2023-02-20 14:28:06 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-02-20 14:28:06 +0300 |
commit | 912a15cc434f03b3cefdd258123473732c441617 (patch) | |
tree | b9bced008629bba1c4f4c17d8767111098a24ecf | |
parent | c3d5b651bb091783c1f78c74bb8c926b61a7f3a4 (diff) | |
download | ydb-912a15cc434f03b3cefdd258123473732c441617.tar.gz |
keep taken snapshots on retries
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 41 |
1 files changed, 26 insertions, 15 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index d3119ce17be..2700b9d625a 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -3,6 +3,8 @@ #include <ydb/core/kqp/runtime/kqp_scan_data.h> #include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/engine/minikql/minikql_engine_host.h> + +#include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/tx/datashard/datashard.h> @@ -99,8 +101,6 @@ public: size_t ResolveAttempt = 0; size_t RetryAttempt = 0; - bool NeedResolve = false; - void AssignContinuationToken(TShardState* state) { if (state->LastKey.DataSize() != 0) { LastKey = std::move(state->LastKey); @@ -342,6 +342,7 @@ public: void Bootstrap() { LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix; + Snapshot = IKqpGateway::TKqpSnapshot(Settings.GetSnapshot().GetStep(), Settings.GetSnapshot().GetTxId()); THolder<TShardState> stateHolder = MakeHolder<TShardState>(Settings.GetShardIdHint()); PendingShards.PushBack(stateHolder.Get()); auto& state = *stateHolder.Release(); @@ -363,7 +364,6 @@ public: } if (!Settings.HasShardIdHint()) { - state.NeedResolve = true; ResolveShard(&state); } else { StartTableScan(); @@ -485,15 +485,20 @@ public: return; } - if (keyDesc->GetPartitions().size() == 1 && !state->NeedResolve) { - // we re-resolved the same shard - NYql::TIssues issues; - for (auto& issue : state->Issues) { - issues.AddIssue(issue.message()); + if (keyDesc->GetPartitions().size() == 1) { + auto& partition = keyDesc->GetPartitions()[0]; + if (partition.ShardId == state->TabletId) { + // we re-resolved the same shard + NYql::TIssues issues; + for (auto& issue : state->Issues) { + issues.AddIssue(issue.message()); + } + RuntimeError(TStringBuilder() << "Too many retries for shard " << state->TabletId, NDqProto::StatusIds::StatusIds::INTERNAL_ERROR, issues); + PendingShards.PushBack(state.Release()); + return; } - RuntimeError(TStringBuilder() << "Too many retries for shard " << state->TabletId, NDqProto::StatusIds::StatusIds::INTERNAL_ERROR, issues); - PendingShards.PushBack(state.Release()); - return; + } else if (!Snapshot.IsValid()) { + return RuntimeError("inconsistent reads after shards split", NDqProto::StatusIds::INTERNAL_ERROR); } if (keyDesc->GetPartitions().empty()) { @@ -637,9 +642,9 @@ public: } } - if (Settings.HasSnapshot()) { - record.MutableSnapshot()->SetTxId(Settings.GetSnapshot().GetTxId()); - record.MutableSnapshot()->SetStep(Settings.GetSnapshot().GetStep()); + if (Snapshot.IsValid()) { + record.MutableSnapshot()->SetTxId(Snapshot.TxId); + record.MutableSnapshot()->SetStep(Snapshot.Step); } //if (RuntimeSettings.Timeout) { @@ -705,6 +710,10 @@ public: Locks.push_back(lock); } + if (!Snapshot.IsValid()) { + Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId()); + } + for (auto& lock : record.GetBrokenTxLocks()) { BrokenLocks.push_back(lock); } @@ -824,7 +833,7 @@ public: NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type); } else { rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type); - rowSize += row[columnIndex].Size(); + rowSize += row[columnIndex].Size(); columnIndex += 1; } } @@ -1019,6 +1028,8 @@ private: TVector<NKikimrTxDataShard::TLock> Locks; TVector<NKikimrTxDataShard::TLock> BrokenLocks; + IKqpGateway::TKqpSnapshot Snapshot; + ui32 MaxInFlight = 1024; TString LogPrefix; TTableId TableId; |