diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-11-12 12:22:10 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 12:22:10 +0300 |
commit | f1bf21617534c893e66ebace4e84eddd1432fa82 (patch) | |
tree | f45fa32c2486c9214152f45006eb94c36b4c8f42 | |
parent | e62a6425cc6cbabb06b09d03cfc8f9b6ad1cb75c (diff) | |
download | ydb-f1bf21617534c893e66ebace4e84eddd1432fa82.tar.gz |
EvWrite: add mvcc snapshot (#11474)
-rw-r--r-- | ydb/core/kqp/common/kqp_tx.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 1 |
4 files changed, 21 insertions, 0 deletions
diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index e2758eff627..f7a07ee8d16 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -169,6 +169,10 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig if (!commitTx) return true; + if (HasOlapTableWriteInTx(physicalQuery) || HasOlapTableReadInTx(physicalQuery)) { + return true; + } + size_t readPhases = 0; bool hasEffects = false; bool hasSourceRead = false; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 90d2813e047..29e118432b6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -952,6 +952,10 @@ protected: if (!settings.GetInconsistentTx() && !settings.GetIsOlap()) { ActorIdToProto(BufferActorId, settings.MutableBufferActorId()); } + if (GetSnapshot().IsValid() && settings.GetIsOlap()) { + settings.MutableMvccSnapshot()->SetStep(GetSnapshot().Step); + settings.MutableMvccSnapshot()->SetTxId(GetSnapshot().TxId); + } output.SinkSettings.ConstructInPlace(); output.SinkSettings->PackFrom(settings); } else { diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 65bbffdf93c..067333d29d1 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -170,12 +170,14 @@ public: const bool inconsistentTx, const NMiniKQL::TTypeEnvironment& typeEnv, std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc, + const std::optional<NKikimrDataEvents::TMvccSnapshot>& mvccSnapshot, const IKqpTransactionManagerPtr& txManager, const TActorId sessionActorId, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) : TypeEnv(typeEnv) , Alloc(alloc) + , MvccSnapshot(mvccSnapshot) , TableId(tableId) , TablePath(tablePath) , LockTxId(lockTxId) @@ -812,6 +814,9 @@ public: FillEvWritePrepare(evWrite.get(), shardId, *TxId, TxManager); } else if (!InconsistentTx) { evWrite->SetLockId(LockTxId, LockNodeId); + if (MvccSnapshot) { + *evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot; + } } const auto serializationResult = ShardedWriteController->SerializeMessageToPayload(shardId, *evWrite); @@ -953,6 +958,8 @@ public: const NMiniKQL::TTypeEnvironment& TypeEnv; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + std::optional<NKikimrDataEvents::TMvccSnapshot> MvccSnapshot; + const TTableId TableId; const TString TablePath; @@ -1018,6 +1025,9 @@ public: Settings.GetInconsistentTx(), TypeEnv, Alloc, + Settings.GetIsOlap() + ? std::optional<NKikimrDataEvents::TMvccSnapshot>{Settings.GetMvccSnapshot()} + : std::optional<NKikimrDataEvents::TMvccSnapshot>{}, nullptr, TActorId{}, Counters, @@ -1205,6 +1215,7 @@ struct TTransactionSettings { ui64 LockTxId = 0; ui64 LockNodeId = 0; bool InconsistentTx = false; + NKikimrDataEvents::TMvccSnapshot MvccSnapshot; }; struct TWriteSettings { @@ -1333,6 +1344,7 @@ public: InconsistentTx, TypeEnv, Alloc, + std::nullopt, TxManager, SessionActorId, Counters, diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index b487df72e03..fb46e899f70 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -719,6 +719,7 @@ message TKqpTableSinkSettings { optional NActorsProto.TActorId BufferActorId = 10; optional int64 Priority = 11; optional bool IsOlap = 12; + optional NKikimrDataEvents.TMvccSnapshot MvccSnapshot = 13; } message TKqpStreamLookupSettings { |