aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2024-11-12 12:22:10 +0300
committerGitHub <noreply@github.com>2024-11-12 12:22:10 +0300
commitf1bf21617534c893e66ebace4e84eddd1432fa82 (patch)
treef45fa32c2486c9214152f45006eb94c36b4c8f42
parente62a6425cc6cbabb06b09d03cfc8f9b6ad1cb75c (diff)
downloadydb-f1bf21617534c893e66ebace4e84eddd1432fa82.tar.gz
EvWrite: add mvcc snapshot (#11474)
-rw-r--r--ydb/core/kqp/common/kqp_tx.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h4
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp12
-rw-r--r--ydb/core/protos/kqp.proto1
4 files changed, 21 insertions, 0 deletions
diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp
index e2758eff627..f7a07ee8d16 100644
--- a/ydb/core/kqp/common/kqp_tx.cpp
+++ b/ydb/core/kqp/common/kqp_tx.cpp
@@ -169,6 +169,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
if (!commitTx)
return true;
+ if (HasOlapTableWriteInTx(physicalQuery) || HasOlapTableReadInTx(physicalQuery)) {
+ return true;
+ }
+
size_t readPhases = 0;
bool hasEffects = false;
bool hasSourceRead = false;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 90d2813e047..29e118432b6 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -952,6 +952,10 @@ protected:
if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) {
ActorIdToProto(BufferActorId, settings.MutableBufferActorId());
}
+ if (GetSnapshot().IsValid() && settings.GetIsOlap()) {
+ settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step);
+ settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId);
+ }
output.SinkSettings.ConstructInPlace();
output.SinkSettings->PackFrom(settings);
} else {
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 65bbffdf93c..067333d29d1 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -170,12 +170,14 @@ public:
const bool inconsistentTx,
const NMiniKQL::TTypeEnvironment& typeEnv,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
+ const std::optional<NKikimrDataEvents::TMvccSnapshot>& mvccSnapshot,
const IKqpTransactionManagerPtr& txManager,
const TActorId sessionActorId,
TIntrusivePtr<TKqpCounters> counters,
NWilson::TTraceId traceId)
: TypeEnv(typeEnv)
, Alloc(alloc)
+ , MvccSnapshot(mvccSnapshot)
, TableId(tableId)
, TablePath(tablePath)
, LockTxId(lockTxId)
@@ -812,6 +814,9 @@ public:
FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager);
} else if (!InconsistentTx) {
evWrite->SetLockId(LockTxId, LockNodeId);
+ if (MvccSnapshot) {
+ *evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
+ }
}
const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite);
@@ -953,6 +958,8 @@ public:
const NMiniKQL::TTypeEnvironment& TypeEnv;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+ std::optional<NKikimrDataEvents::TMvccSnapshot> MvccSnapshot;
+
const TTableId TableId;
const TString TablePath;
@@ -1018,6 +1025,9 @@ public:
Settings.GetInconsistentTx(),
TypeEnv,
Alloc,
+ Settings.GetIsOlap()
+ ? std::optional<NKikimrDataEvents::TMvccSnapshot>{Settings.GetMvccSnapshot()}
+ : std::optional<NKikimrDataEvents::TMvccSnapshot>{},
nullptr,
TActorId{},
Counters,
@@ -1205,6 +1215,7 @@ struct TTransactionSettings {
ui64 LockTxId = 0;
ui64 LockNodeId = 0;
bool InconsistentTx = false;
+ NKikimrDataEvents::TMvccSnapshot MvccSnapshot;
};
struct TWriteSettings {
@@ -1333,6 +1344,7 @@ public:
InconsistentTx,
TypeEnv,
Alloc,
+ std::nullopt,
TxManager,
SessionActorId,
Counters,
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index b487df72e03..fb46e899f70 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -719,6 +719,7 @@ message TKqpTableSinkSettings {
optional NActorsProto.TActorId BufferActorId = 10;
optional int64 Priority = 11;
optional bool IsOlap = 12;
+ optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 13;
}
message TKqpStreamLookupSettings {