diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-06-05 18:25:53 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-05 18:25:53 +0300 |
commit | edea341fc7a059763c53477844be2efc16aa6b3b (patch) | |
tree | 46c0c32afc903ea02e96707708413ab3ce31bf8a | |
parent | c0002d6472677872968e2510d3ee8bf7084ac1c2 (diff) | |
download | ydb-edea341fc7a059763c53477844be2efc16aa6b3b.tar.gz |
Flush on insert before distributed commit (#19293)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 07ec02d00f8..bb4c7dea7c2 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1897,6 +1897,9 @@ public: settings.Priority); } + // At current time only insert operation can fail. + NeedToFlushBeforeCommit |= (settings.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT); + writeInfo.Actors.at(settings.TableId.PathId).WriteActor->Open( token.Cookie, settings.OperationType, @@ -2047,17 +2050,18 @@ public: return Process(); } - bool Prepare(const ui64 txId, NWilson::TTraceId traceId) { + bool Prepare(std::optional<NWilson::TTraceId> traceId) { UpdateTracingState("Commit", std::move(traceId)); OperationStartTime = TInstant::Now(); CA_LOG_D("Start prepare for distributed commit"); YQL_ENSURE(State == EState::WRITING); + YQL_ENSURE(!NeedToFlushBeforeCommit); State = EState::PREPARING; CheckQueuesEmpty(); - TxId = txId; + AFL_ENSURE(TxId); ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) { - actor->SetPrepare(txId); + actor->SetPrepare(*TxId); }); Close(); if (!Process()) { @@ -2498,8 +2502,14 @@ public: TxManager->StartExecute(); ImmediateCommit(std::move(ev->TraceId)); } else { - TxManager->StartPrepare(); - Prepare(ev->Get()->TxId, std::move(ev->TraceId)); + AFL_ENSURE(ev->Get()->TxId); + TxId = ev->Get()->TxId; + if (NeedToFlushBeforeCommit) { + Flush(std::move(ev->TraceId)); + } else { + TxManager->StartPrepare(); + Prepare(std::move(ev->TraceId)); + } } } @@ -2879,6 +2889,14 @@ public: UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId()); OnOperationFinished(Counters->BufferActorFlushLatencyHistogram); State = EState::WRITING; + AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit + NeedToFlushBeforeCommit = false; + if (TxId) { + TxManager->StartPrepare(); + Prepare(std::nullopt); + return; + } + Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{ BuildStats() }); @@ -2920,8 +2938,11 @@ public: ReplyErrorAndDieImpl(statusCode, std::move(issues)); } - void UpdateTracingState(const char* name, NWilson::TTraceId traceId) { - BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(traceId), + void UpdateTracingState(const char* name, std::optional<NWilson::TTraceId> traceId) { + if (!traceId) { + return; + } + BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(*traceId), name, NWilson::EFlags::AUTO_END); if (BufferWriteActorStateSpan.GetTraceId() != BufferWriteActorSpan.GetTraceId()) { BufferWriteActorStateSpan.Link(BufferWriteActorSpan.GetTraceId()); @@ -3025,6 +3046,7 @@ private: EState State; bool HasError = false; + bool NeedToFlushBeforeCommit = false; THashMap<TPathId, std::queue<TBufferWriteMessage>> RequestQueues; struct TAckMessage { |