diff options
author | bbiff <bbiff@yandex-team.com> | 2022-08-04 18:57:36 +0300 |
---|---|---|
committer | bbiff <bbiff@yandex-team.com> | 2022-08-04 18:57:36 +0300 |
commit | 201de44fb980264f8de4a0f5f4edab77c4dcebb3 (patch) | |
tree | bb93de6986bd066b42bf3c627d844ceef3738d5e | |
parent | 1a626986c00f70a23740c3ba5c95ef51cb3d7238 (diff) | |
download | ydb-201de44fb980264f8de4a0f5f4edab77c4dcebb3.tar.gz |
optimization
-rw-r--r-- | ydb/core/yq/libs/actors/result_writer.cpp | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp index 5d1601dfa8f..c24f694d3f4 100644 --- a/ydb/core/yq/libs/actors/result_writer.cpp +++ b/ydb/core/yq/libs/actors/result_writer.cpp @@ -27,6 +27,8 @@ LOG_INFO_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Writer: " << TraceId << ": " << stream) #define LOG_D(stream) \ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Writer: " << TraceId << ": " << stream) +#define LOG_T(stream) \ + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Writer: " << TraceId << ": " << stream) namespace NYq { @@ -85,7 +87,6 @@ private: << " FreeSpace: " << FreeSpace << " Duration: " << duration << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024 - << " CurChunkInd: " << CurChunkInd << " ResultChunks.size(): " << ResultChunks.size()); NActors::IActor::PassAway(); } @@ -159,7 +160,7 @@ private: auto duration = (TInstant::Now()-StartTime); - LOG_D("ChannelData, Records: " << RowIndex + LOG_T("ChannelData, Records: " << RowIndex << " HasError: " << HasError << " Size: " << Size << " Rows: " << Rows @@ -195,11 +196,11 @@ private: } void SendResult() { - if (InflightCounter || CurChunkInd >= ResultChunks.size()) { + if (InflightCounter || !ResultChunks) { return; } - while (CurChunkInd < ResultChunks.size()) { - const auto& chunk = ResultChunks[CurChunkInd]; + while (ResultChunks) { + const auto& chunk = ResultChunks.front(); // if owner is not empty, then there is data to send to storage, otherwise just shift seqno if (chunk.owner_id()) { break; @@ -215,22 +216,24 @@ private: auto duration = (TInstant::Now()-StartTime); - LOG_D("ChannelData Shift, Records: " << RowIndex + LOG_T("ChannelData Shift, Records: " << RowIndex << " HasError: " << HasError << " Size: " << Size << " Rows: " << Rows << " FreeSpace: " << FreeSpace << " Duration: " << duration << " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024); - CurChunkInd++; + ResultChunks.pop_front(); } - if (CurChunkInd >= ResultChunks.size()) { + if (!ResultChunks) { MaybeFinish(); return; } ++InflightCounter; - Send(InternalServiceId, new NFq::TEvInternalService::TEvWriteResultRequest(std::move(ResultChunks[CurChunkInd++]))); + auto chunk = std::move(ResultChunks.front()); + ResultChunks.pop_front(); + Send(InternalServiceId, new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk))); } void ConstructResults(const Ydb::ResultSet& resultSet, ui64 startRowIndex) { @@ -356,8 +359,7 @@ private: ui64 ResultBytesLimit; ui64 OccupiedSpace = 0; - TVector<Fq::Private::WriteTaskResultRequest> ResultChunks; - size_t CurChunkInd = 0; + TDeque<Fq::Private::WriteTaskResultRequest> ResultChunks; ui32 InflightCounter = 0; TActorId InternalServiceId; }; |