aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-12-21 11:42:46 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-12-21 11:42:46 +0300
commit7508a09b6813bc18004bca2c9c33455b924883b7 (patch)
tree1d481cf8ff3508456a13a924d03c1ff3a5b50b4b
parentd4e640830ffe49640c9b4833529acc8ebefb7c11 (diff)
downloadydb-7508a09b6813bc18004bca2c9c33455b924883b7.tar.gz
Log memory state in S3 actor
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp28
1 files changed, 26 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index c0db287dd0..3ad50bdc19 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -109,6 +109,8 @@ using namespace ::NYql::NS3Details;
namespace {
+constexpr TDuration MEMORY_USAGE_REPORT_PERIOD = TDuration::Seconds(10);
+
struct TS3ReadError : public yexception {
using yexception::yexception;
};
@@ -992,10 +994,29 @@ private:
return IngressBytes;
}
+ ui64 GetBlockSize(const TReadyBlock& block) const {
+ return ReadSpec->Arrow ? GetSizeOfBatch(*block.Batch) : block.Block.bytes();
+ }
+
+ void ReportMemoryUsage() const {
+ const TInstant now = TInstant::Now();
+ if (now - LastMemoryReport < MEMORY_USAGE_REPORT_PERIOD) {
+ return;
+ }
+ LastMemoryReport = now;
+ size_t blocksTotalSize = 0;
+ for (const auto& block : Blocks) {
+ blocksTotalSize += GetBlockSize(block);
+ }
+ LOG_D("TS3StreamReadActor", "Memory usage. Ready blocks: " << Blocks.size() << ". Ready blocks total size: " << blocksTotalSize);
+ }
+
i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final {
+ ReportMemoryUsage();
+
i64 total = 0LL;
if (!Blocks.empty()) do {
- const i64 s = ReadSpec->Arrow ? GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes();
+ const i64 s = GetBlockSize(Blocks.front());
NUdf::TUnboxedValue value;
if (ReadSpec->Arrow) {
@@ -1027,7 +1048,7 @@ private:
total += s;
output.emplace_back(std::move(value));
Blocks.pop_front();
- } while (!Blocks.empty() && free > 0LL && (ReadSpec->Arrow ? GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes()) <= size_t(free));
+ } while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free));
finished = Blocks.empty() && !Count;
if (finished) {
@@ -1072,6 +1093,7 @@ private:
IngressBytes = next->Get()->IngressBytes;
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ ReportMemoryUsage();
}
void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) {
@@ -1079,6 +1101,7 @@ private:
IngressBytes = next->Get()->IngressBytes;
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ ReportMemoryUsage();
}
void HandleReadFinished(TEvPrivate::TEvReadFinished::TPtr& ev) {
@@ -1125,6 +1148,7 @@ private:
const std::size_t MaxBlocksInFly;
ui64 IngressBytes = 0;
size_t CurrentPathIndex = 0;
+ mutable TInstant LastMemoryReport = TInstant::Now();
};
using namespace NKikimr::NMiniKQL;