diff options
author | hcpp <hcpp@ydb.tech> | 2022-08-03 16:25:30 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2022-08-03 16:25:30 +0300 |
commit | 78591dca810b731924087614eb23384a00234b01 (patch) | |
tree | 1b6c584ef457c7ee3707ab3bec6df876fc50b00e | |
parent | 206563ccf21f5ad7f23d3743fc702bb05a288858 (diff) | |
download | ydb-78591dca810b731924087614eb23384a00234b01.tar.gz |
result writer has been fixed
-rw-r--r-- | ydb/core/yq/libs/actors/result_writer.cpp | 75 |
1 files changed, 54 insertions, 21 deletions
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp index 0f81920311..5d1601dfa8 100644 --- a/ydb/core/yq/libs/actors/result_writer.cpp +++ b/ydb/core/yq/libs/actors/result_writer.cpp @@ -78,7 +78,15 @@ private: void PassAway() { auto duration = (TInstant::Now()-StartTime); - LOG_I("FinishWrite, Records: " << RowIndex << " HasError: " << HasError << " Size: " << Size << " Rows: " << Rows << " FreeSpace: " << FreeSpace << " Duration: " << duration << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024); + LOG_I("FinishWrite, Records: " << RowIndex + << " HasError: " << HasError + << " Size: " << Size + << " Rows: " << Rows + << " FreeSpace: " << FreeSpace + << " Duration: " << duration + << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024 + << " CurChunkInd: " << CurChunkInd + << " ResultChunks.size(): " << ResultChunks.size()); NActors::IActor::PassAway(); } @@ -190,6 +198,37 @@ private: if (InflightCounter || CurChunkInd >= ResultChunks.size()) { return; } + while (CurChunkInd < ResultChunks.size()) { + const auto& chunk = ResultChunks[CurChunkInd]; + // if owner is not empty, then there is data to send to storage, otherwise just shift seqno + if (chunk.owner_id()) { + break; + } + const auto& request = Requests[chunk.request_id()]; + auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>(); + res->Record.SetChannelId(request.ChannelId); + res->Record.SetFreeSpace(FreeSpace); + res->Record.SetSeqNo(request.SeqNo); + res->Record.SetFinish(HasError); + Send(request.Sender, res.Release()); + Requests.erase(chunk.request_id()); + + auto duration = (TInstant::Now()-StartTime); + + LOG_D("ChannelData Shift, Records: " << RowIndex + << " HasError: " << HasError + << " Size: " << Size + << " Rows: " << Rows + << " FreeSpace: " << FreeSpace + << " Duration: " << duration + << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024); + CurChunkInd++; + } + + if (CurChunkInd >= ResultChunks.size()) { + MaybeFinish(); + return; + } ++InflightCounter; Send(InternalServiceId, new NFq::TEvInternalService::TEvWriteResultRequest(std::move(ResultChunks[CurChunkInd++]))); } @@ -215,6 +254,19 @@ private: } void ProcessData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev) { + if (!ev->Get()->Record.GetChannelData().HasData()) { + auto& request = Requests[Cookie]; + request.Sender = ev->Sender; + request.ChannelId = ev->Get()->Record.GetChannelData().GetChannelId(); + request.SeqNo = ev->Get()->Record.GetSeqNo(); + request.Size = 0; + ResultChunks.emplace_back(); + ResultChunks.back().set_request_id(Cookie); + SendResult(); + Cookie++; + return; + } + auto& data = ev->Get()->Record.GetChannelData().GetData(); auto resultSet = ResultBuilder->BuildResultSet({data}); FreeSpace -= data.GetRaw().size(); @@ -261,26 +313,7 @@ private: } try { - if (ev->Get()->Record.GetChannelData().HasData()) { - ProcessData(ev); - } else { - auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>(); - res->Record.SetChannelId(ev->Get()->Record.GetChannelData().GetChannelId()); - res->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - res->Record.SetFreeSpace(FreeSpace); - res->Record.SetFinish(HasError); - Send(ev->Sender, res.Release()); - - auto duration = (TInstant::Now()-StartTime); - - LOG_D("ChannelData, Records: " << RowIndex - << " HasError: " << HasError - << " Size: " << Size - << " Rows: " << Rows - << " FreeSpace: " << FreeSpace - << " Duration: " << duration - << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024); - } + ProcessData(ev); } catch (...) { LOG_E(CurrentExceptionMessage()); auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Internal error on data write").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR)); |