aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-08-04 18:57:36 +0300
committerbbiff <bbiff@yandex-team.com>2022-08-04 18:57:36 +0300
commit201de44fb980264f8de4a0f5f4edab77c4dcebb3 (patch)
treebb93de6986bd066b42bf3c627d844ceef3738d5e
parent1a626986c00f70a23740c3ba5c95ef51cb3d7238 (diff)
downloadydb-201de44fb980264f8de4a0f5f4edab77c4dcebb3.tar.gz
optimization
-rw-r--r--ydb/core/yq/libs/actors/result_writer.cpp24
1 files changed, 13 insertions, 11 deletions
diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp
index 5d1601dfa8f..c24f694d3f4 100644
--- a/ydb/core/yq/libs/actors/result_writer.cpp
+++ b/ydb/core/yq/libs/actors/result_writer.cpp
@@ -27,6 +27,8 @@
LOG_INFO_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Writer: " << TraceId << ": " << stream)
#define LOG_D(stream) \
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Writer: " << TraceId << ": " << stream)
+#define LOG_T(stream) \
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "Writer: " << TraceId << ": " << stream)
namespace NYq {
@@ -85,7 +87,6 @@ private:
<< " FreeSpace: " << FreeSpace
<< " Duration: " << duration
<< " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024
- << " CurChunkInd: " << CurChunkInd
<< " ResultChunks.size(): " << ResultChunks.size());
NActors::IActor::PassAway();
}
@@ -159,7 +160,7 @@ private:
auto duration = (TInstant::Now()-StartTime);
- LOG_D("ChannelData, Records: " << RowIndex
+ LOG_T("ChannelData, Records: " << RowIndex
<< " HasError: " << HasError
<< " Size: " << Size
<< " Rows: " << Rows
@@ -195,11 +196,11 @@ private:
}
void SendResult() {
- if (InflightCounter || CurChunkInd >= ResultChunks.size()) {
+ if (InflightCounter || !ResultChunks) {
return;
}
- while (CurChunkInd < ResultChunks.size()) {
- const auto& chunk = ResultChunks[CurChunkInd];
+ while (ResultChunks) {
+ const auto& chunk = ResultChunks.front();
// if owner is not empty, then there is data to send to storage, otherwise just shift seqno
if (chunk.owner_id()) {
break;
@@ -215,22 +216,24 @@ private:
auto duration = (TInstant::Now()-StartTime);
- LOG_D("ChannelData Shift, Records: " << RowIndex
+ LOG_T("ChannelData Shift, Records: " << RowIndex
<< " HasError: " << HasError
<< " Size: " << Size
<< " Rows: " << Rows
<< " FreeSpace: " << FreeSpace
<< " Duration: " << duration
<< " AvgSpeed: " << Size/(duration.Seconds()+1)/1024/1024);
- CurChunkInd++;
+ ResultChunks.pop_front();
}
- if (CurChunkInd >= ResultChunks.size()) {
+ if (!ResultChunks) {
MaybeFinish();
return;
}
++InflightCounter;
- Send(InternalServiceId, new NFq::TEvInternalService::TEvWriteResultRequest(std::move(ResultChunks[CurChunkInd++])));
+ auto chunk = std::move(ResultChunks.front());
+ ResultChunks.pop_front();
+ Send(InternalServiceId, new NFq::TEvInternalService::TEvWriteResultRequest(std::move(chunk)));
}
void ConstructResults(const Ydb::ResultSet& resultSet, ui64 startRowIndex) {
@@ -356,8 +359,7 @@ private:
ui64 ResultBytesLimit;
ui64 OccupiedSpace = 0;
- TVector<Fq::Private::WriteTaskResultRequest> ResultChunks;
- size_t CurChunkInd = 0;
+ TDeque<Fq::Private::WriteTaskResultRequest> ResultChunks;
ui32 InflightCounter = 0;
TActorId InternalServiceId;
};