aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2022-08-03 16:25:30 +0300
committerhcpp <hcpp@ydb.tech>2022-08-03 16:25:30 +0300
commit78591dca810b731924087614eb23384a00234b01 (patch)
tree1b6c584ef457c7ee3707ab3bec6df876fc50b00e
parent206563ccf21f5ad7f23d3743fc702bb05a288858 (diff)
downloadydb-78591dca810b731924087614eb23384a00234b01.tar.gz
result writer has been fixed
-rw-r--r--ydb/core/yq/libs/actors/result_writer.cpp75
1 files changed, 54 insertions, 21 deletions
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp
index 0f81920311..5d1601dfa8 100644
--- a/ydb/core/yq/libs/actors/result_writer.cpp
+++ b/ydb/core/yq/libs/actors/result_writer.cpp
@@ -78,7 +78,15 @@ private:
void PassAway() {
auto duration = (TInstant::Now()-StartTime);
- LOG_I("FinishWrite, Records: " << RowIndex << " HasError: " << HasError << " Size: " << Size << " Rows: " << Rows << " FreeSpace: " << FreeSpace << " Duration: " << duration << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024);
+ LOG_I("FinishWrite, Records: " << RowIndex
+ << " HasError: " << HasError
+ << " Size: " << Size
+ << " Rows: " << Rows
+ << " FreeSpace: " << FreeSpace
+ << " Duration: " << duration
+ << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024
+ << " CurChunkInd: " << CurChunkInd
+ << " ResultChunks.size(): " << ResultChunks.size());
NActors::IActor::PassAway();
}
@@ -190,6 +198,37 @@ private:
if (InflightCounter || CurChunkInd >= ResultChunks.size()) {
return;
}
+ while (CurChunkInd < ResultChunks.size()) {
+ const auto& chunk = ResultChunks[CurChunkInd];
+ // if owner is not empty, then there is data to send to storage, otherwise just shift seqno
+ if (chunk.owner_id()) {
+ break;
+ }
+ const auto& request = Requests[chunk.request_id()];
+ auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
+ res->Record.SetChannelId(request.ChannelId);
+ res->Record.SetFreeSpace(FreeSpace);
+ res->Record.SetSeqNo(request.SeqNo);
+ res->Record.SetFinish(HasError);
+ Send(request.Sender, res.Release());
+ Requests.erase(chunk.request_id());
+
+ auto duration = (TInstant::Now()-StartTime);
+
+ LOG_D("ChannelData Shift, Records: " << RowIndex
+ << " HasError: " << HasError
+ << " Size: " << Size
+ << " Rows: " << Rows
+ << " FreeSpace: " << FreeSpace
+ << " Duration: " << duration
+ << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024);
+ CurChunkInd++;
+ }
+
+ if (CurChunkInd >= ResultChunks.size()) {
+ MaybeFinish();
+ return;
+ }
++InflightCounter;
Send(InternalServiceId, new NFq::TEvInternalService::TEvWriteResultRequest(std::move(ResultChunks[CurChunkInd++])));
}
@@ -215,6 +254,19 @@ private:
}
void ProcessData(NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
+ if (!ev->Get()->Record.GetChannelData().HasData()) {
+ auto& request = Requests[Cookie];
+ request.Sender = ev->Sender;
+ request.ChannelId = ev->Get()->Record.GetChannelData().GetChannelId();
+ request.SeqNo = ev->Get()->Record.GetSeqNo();
+ request.Size = 0;
+ ResultChunks.emplace_back();
+ ResultChunks.back().set_request_id(Cookie);
+ SendResult();
+ Cookie++;
+ return;
+ }
+
auto& data = ev->Get()->Record.GetChannelData().GetData();
auto resultSet = ResultBuilder->BuildResultSet({data});
FreeSpace -= data.GetRaw().size();
@@ -261,26 +313,7 @@ private:
}
try {
- if (ev->Get()->Record.GetChannelData().HasData()) {
- ProcessData(ev);
- } else {
- auto res = MakeHolder<NDq::TEvDqCompute::TEvChannelDataAck>();
- res->Record.SetChannelId(ev->Get()->Record.GetChannelData().GetChannelId());
- res->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
- res->Record.SetFreeSpace(FreeSpace);
- res->Record.SetFinish(HasError);
- Send(ev->Sender, res.Release());
-
- auto duration = (TInstant::Now()-StartTime);
-
- LOG_D("ChannelData, Records: " << RowIndex
- << " HasError: " << HasError
- << " Size: " << Size
- << " Rows: " << Rows
- << " FreeSpace: " << FreeSpace
- << " Duration: " << duration
- << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024);
- }
+ ProcessData(ev);
} catch (...) {
LOG_E(CurrentExceptionMessage());
auto req = MakeHolder<TEvDqFailure>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssue("Internal error on data write").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR));