diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-12-21 11:42:46 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-12-21 11:42:46 +0300 |
commit | 7508a09b6813bc18004bca2c9c33455b924883b7 (patch) | |
tree | 1d481cf8ff3508456a13a924d03c1ff3a5b50b4b | |
parent | d4e640830ffe49640c9b4833529acc8ebefb7c11 (diff) | |
download | ydb-7508a09b6813bc18004bca2c9c33455b924883b7.tar.gz |
Log memory state in S3 actor
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 28 |
1 files changed, 26 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index c0db287dd0..3ad50bdc19 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -109,6 +109,8 @@ using namespace ::NYql::NS3Details; namespace { +constexpr TDuration MEMORY_USAGE_REPORT_PERIOD = TDuration::Seconds(10); + struct TS3ReadError : public yexception { using yexception::yexception; }; @@ -992,10 +994,29 @@ private: return IngressBytes; } + ui64 GetBlockSize(const TReadyBlock& block) const { + return ReadSpec->Arrow ? GetSizeOfBatch(*block.Batch) : block.Block.bytes(); + } + + void ReportMemoryUsage() const { + const TInstant now = TInstant::Now(); + if (now - LastMemoryReport < MEMORY_USAGE_REPORT_PERIOD) { + return; + } + LastMemoryReport = now; + size_t blocksTotalSize = 0; + for (const auto& block : Blocks) { + blocksTotalSize += GetBlockSize(block); + } + LOG_D("TS3StreamReadActor", "Memory usage. Ready blocks: " << Blocks.size() << ". Ready blocks total size: " << blocksTotalSize); + } + i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final { + ReportMemoryUsage(); + i64 total = 0LL; if (!Blocks.empty()) do { - const i64 s = ReadSpec->Arrow ? GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes(); + const i64 s = GetBlockSize(Blocks.front()); NUdf::TUnboxedValue value; if (ReadSpec->Arrow) { @@ -1027,7 +1048,7 @@ private: total += s; output.emplace_back(std::move(value)); Blocks.pop_front(); - } while (!Blocks.empty() && free > 0LL && (ReadSpec->Arrow ? GetSizeOfBatch(*Blocks.front().Batch) : Blocks.front().Block.bytes()) <= size_t(free)); + } while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free)); finished = Blocks.empty() && !Count; if (finished) { @@ -1072,6 +1093,7 @@ private: IngressBytes = next->Get()->IngressBytes; Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + ReportMemoryUsage(); } void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) { @@ -1079,6 +1101,7 @@ private: IngressBytes = next->Get()->IngressBytes; Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + ReportMemoryUsage(); } void HandleReadFinished(TEvPrivate::TEvReadFinished::TPtr& ev) { @@ -1125,6 +1148,7 @@ private: const std::size_t MaxBlocksInFly; ui64 IngressBytes = 0; size_t CurrentPathIndex = 0; + mutable TInstant LastMemoryReport = TInstant::Now(); }; using namespace NKikimr::NMiniKQL; |