diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-02-13 18:52:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-13 15:52:04 +0000 |
commit | 51dd5e5c8dc5e5c037e7dc0b87f2812d92c30c4e (patch) | |
tree | fa5eba4af17062d415fa45d6d372eab38ceb48e3 | |
parent | 8af7595650a3d7370a517a3d597348520ec4e3ff (diff) | |
download | ydb-51dd5e5c8dc5e5c037e7dc0b87f2812d92c30c4e.tar.gz |
Fix lost data for stream EvWrite (#14538)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 49 |
1 files changed, 26 insertions, 23 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 8646dc0dbb..b53d05a20e 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1649,14 +1649,11 @@ public: writeInfo.WriteTableActor->Close(message.Token.Cookie); } - if (!message.Close) { - YQL_ENSURE(false); - AckQueue.push(TAckMessage{ - .ForwardActorId = message.From, - .Token = message.Token, - .DataSize = 0, - }); - } + AckQueue.push(TAckMessage{ + .ForwardActorId = message.From, + .Token = message.Token, + .DataSize = 0, + }); queue.pop(); } @@ -2493,6 +2490,7 @@ public: } Y_UNUSED(dataSize); if (TxManager->ConsumeCommitResult(shardId)) { + CA_LOG_D("Committed TxId=" << TxId.value_or(0)); OnOperationFinished(Counters->BufferActorCommitLatencyHistogram); State = EState::FINISHED; Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{ @@ -2710,15 +2708,29 @@ private: void Handle(TEvBufferWriteResult::TPtr& result) { CA_LOG_D("TKqpForwardWriteActor recieve EvBufferWriteResult from " << BufferActorId); + InFlight = false; + + EgressStats.Bytes += DataSize; + EgressStats.Chunks++; + EgressStats.Splits++; + EgressStats.Resume(); + + Counters->ForwardActorWritesSizeHistogram->Collect(DataSize); WriteToken = result->Get()->Token; DataSize = 0; - CA_LOG_D("Resume with freeSpace=" << GetFreeSpace()); - Callbacks->ResumeExecution(); + if (Closed) { + CA_LOG_D("Finished"); + Callbacks->OnAsyncOutputFinished(GetOutputIndex()); + } else { + CA_LOG_D("Resume with freeSpace=" << GetFreeSpace()); + Callbacks->ResumeExecution(); + } } void WriteToBuffer() { + InFlight = true; auto ev = std::make_unique<TEvBufferWrite>(); ev->Data = Batcher->Build(); @@ -2762,18 +2774,6 @@ private: CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId); AFL_ENSURE(Send(BufferActorId, ev.release())); - - EgressStats.Bytes += DataSize; - EgressStats.Chunks++; - EgressStats.Splits++; - EgressStats.Resume(); - - Counters->ForwardActorWritesSizeHistogram->Collect(DataSize); - - if (Closed) { - CA_LOG_D("Finished"); - Callbacks->OnAsyncOutputFinished(GetOutputIndex()); - } } void CommitState(const NYql::NDqProto::TCheckpoint&) final {}; @@ -2788,7 +2788,9 @@ private: } i64 GetFreeSpace() const final { - return MessageSettings.MaxForwardedSize - DataSize; + return InFlight + ? std::numeric_limits<i64>::min() + : MessageSettings.MaxForwardedSize - DataSize; } TMaybe<google::protobuf::Any> ExtraData() override { @@ -2840,6 +2842,7 @@ private: i64 DataSize = 0; bool Closed = false; + bool InFlight = false; const ui64 TxId; const TTableId TableId; |