aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-06-05 18:25:53 +0300
committerGitHub <noreply@github.com>2025-06-05 18:25:53 +0300
commitedea341fc7a059763c53477844be2efc16aa6b3b (patch)
tree46c0c32afc903ea02e96707708413ab3ce31bf8a
parentc0002d6472677872968e2510d3ee8bf7084ac1c2 (diff)
downloadydb-edea341fc7a059763c53477844be2efc16aa6b3b.tar.gz
Flush on insert before distributed commit (#19293)
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp36
1 files changed, 29 insertions, 7 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 07ec02d00f8..bb4c7dea7c2 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -1897,6 +1897,9 @@ public:
settings.Priority);
}
+ // At current time only insert operation can fail.
+ NeedToFlushBeforeCommit |= (settings.OperationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_INSERT);
+
writeInfo.Actors.at(settings.TableId.PathId).WriteActor->Open(
token.Cookie,
settings.OperationType,
@@ -2047,17 +2050,18 @@ public:
return Process();
}
- bool Prepare(const ui64 txId, NWilson::TTraceId traceId) {
+ bool Prepare(std::optional<NWilson::TTraceId> traceId) {
UpdateTracingState("Commit", std::move(traceId));
OperationStartTime = TInstant::Now();
CA_LOG_D("Start prepare for distributed commit");
YQL_ENSURE(State == EState::WRITING);
+ YQL_ENSURE(!NeedToFlushBeforeCommit);
State = EState::PREPARING;
CheckQueuesEmpty();
- TxId = txId;
+ AFL_ENSURE(TxId);
ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {
- actor->SetPrepare(txId);
+ actor->SetPrepare(*TxId);
});
Close();
if (!Process()) {
@@ -2498,8 +2502,14 @@ public:
TxManager->StartExecute();
ImmediateCommit(std::move(ev->TraceId));
} else {
- TxManager->StartPrepare();
- Prepare(ev->Get()->TxId, std::move(ev->TraceId));
+ AFL_ENSURE(ev->Get()->TxId);
+ TxId = ev->Get()->TxId;
+ if (NeedToFlushBeforeCommit) {
+ Flush(std::move(ev->TraceId));
+ } else {
+ TxManager->StartPrepare();
+ Prepare(std::move(ev->TraceId));
+ }
}
}
@@ -2879,6 +2889,14 @@ public:
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
State = EState::WRITING;
+ AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
+ NeedToFlushBeforeCommit = false;
+ if (TxId) {
+ TxManager->StartPrepare();
+ Prepare(std::nullopt);
+ return;
+ }
+
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
BuildStats()
});
@@ -2920,8 +2938,11 @@ public:
ReplyErrorAndDieImpl(statusCode, std::move(issues));
}
- void UpdateTracingState(const char* name, NWilson::TTraceId traceId) {
- BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(traceId),
+ void UpdateTracingState(const char* name, std::optional<NWilson::TTraceId> traceId) {
+ if (!traceId) {
+ return;
+ }
+ BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(*traceId),
name, NWilson::EFlags::AUTO_END);
if (BufferWriteActorStateSpan.GetTraceId() != BufferWriteActorSpan.GetTraceId()) {
BufferWriteActorStateSpan.Link(BufferWriteActorSpan.GetTraceId());
@@ -3025,6 +3046,7 @@ private:
EState State;
bool HasError = false;
+ bool NeedToFlushBeforeCommit = false;
THashMap<TPathId, std::queue<TBufferWriteMessage>> RequestQueues;
struct TAckMessage {