aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-02-13 18:52:04 +0300
committerGitHub <noreply@github.com>2025-02-13 15:52:04 +0000
commit51dd5e5c8dc5e5c037e7dc0b87f2812d92c30c4e (patch)
treefa5eba4af17062d415fa45d6d372eab38ceb48e3
parent8af7595650a3d7370a517a3d597348520ec4e3ff (diff)
downloadydb-51dd5e5c8dc5e5c037e7dc0b87f2812d92c30c4e.tar.gz
Fix lost data for stream EvWrite (#14538)
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp49
1 files changed, 26 insertions, 23 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 8646dc0dbb..b53d05a20e 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -1649,14 +1649,11 @@ public:
writeInfo.WriteTableActor->Close(message.Token.Cookie);
}
- if (!message.Close) {
- YQL_ENSURE(false);
- AckQueue.push(TAckMessage{
- .ForwardActorId = message.From,
- .Token = message.Token,
- .DataSize = 0,
- });
- }
+ AckQueue.push(TAckMessage{
+ .ForwardActorId = message.From,
+ .Token = message.Token,
+ .DataSize = 0,
+ });
queue.pop();
}
@@ -2493,6 +2490,7 @@ public:
}
Y_UNUSED(dataSize);
if (TxManager->ConsumeCommitResult(shardId)) {
+ CA_LOG_D("Committed TxId=" << TxId.value_or(0));
OnOperationFinished(Counters->BufferActorCommitLatencyHistogram);
State = EState::FINISHED;
Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
@@ -2710,15 +2708,29 @@ private:
void Handle(TEvBufferWriteResult::TPtr& result) {
CA_LOG_D("TKqpForwardWriteActor recieve EvBufferWriteResult from " << BufferActorId);
+ InFlight = false;
+
+ EgressStats.Bytes += DataSize;
+ EgressStats.Chunks++;
+ EgressStats.Splits++;
+ EgressStats.Resume();
+
+ Counters->ForwardActorWritesSizeHistogram->Collect(DataSize);
WriteToken = result->Get()->Token;
DataSize = 0;
- CA_LOG_D("Resume with freeSpace=" << GetFreeSpace());
- Callbacks->ResumeExecution();
+ if (Closed) {
+ CA_LOG_D("Finished");
+ Callbacks->OnAsyncOutputFinished(GetOutputIndex());
+ } else {
+ CA_LOG_D("Resume with freeSpace=" << GetFreeSpace());
+ Callbacks->ResumeExecution();
+ }
}
void WriteToBuffer() {
+ InFlight = true;
auto ev = std::make_unique<TEvBufferWrite>();
ev->Data = Batcher->Build();
@@ -2762,18 +2774,6 @@ private:
CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId);
AFL_ENSURE(Send(BufferActorId, ev.release()));
-
- EgressStats.Bytes += DataSize;
- EgressStats.Chunks++;
- EgressStats.Splits++;
- EgressStats.Resume();
-
- Counters->ForwardActorWritesSizeHistogram->Collect(DataSize);
-
- if (Closed) {
- CA_LOG_D("Finished");
- Callbacks->OnAsyncOutputFinished(GetOutputIndex());
- }
}
void CommitState(const NYql::NDqProto::TCheckpoint&) final {};
@@ -2788,7 +2788,9 @@ private:
}
i64 GetFreeSpace() const final {
- return MessageSettings.MaxForwardedSize - DataSize;
+ return InFlight
+ ? std::numeric_limits<i64>::min()
+ : MessageSettings.MaxForwardedSize - DataSize;
}
TMaybe<google::protobuf::Any> ExtraData() override {
@@ -2840,6 +2842,7 @@ private:
i64 DataSize = 0;
bool Closed = false;
+ bool InFlight = false;
const ui64 TxId;
const TTableId TableId;