diff options
author | hor911 <[email protected]> | 2023-02-03 01:49:35 +0300 |
---|---|---|
committer | hor911 <[email protected]> | 2023-02-03 01:49:35 +0300 |
commit | 3d41ba9017aa2f2dd6b18e62ef496464da0d6263 (patch) | |
tree | 006af6f28c62caeb96b8071908bf25f27dc14370 | |
parent | 53cc0970f20d3f40c603bf6c8c212bf0491eba82 (diff) |
Correct stream read statistics
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 32 |
1 files changed, 19 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index b372af05be5..3f49dbd6fad 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -174,11 +174,11 @@ struct TEvPrivate { }; struct TEvFileFinished : public TEventLocal<TEvFileFinished, EvFileFinished> { - TEvFileFinished(size_t pathIndex, ui64 ingressBytes) - : PathIndex(pathIndex), IngressBytes(ingressBytes) { + TEvFileFinished(size_t pathIndex, ui64 ingressDelta) + : PathIndex(pathIndex), IngressDelta(ingressDelta) { } const size_t PathIndex; - ui64 IngressBytes; + ui64 IngressDelta; }; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { @@ -199,19 +199,19 @@ struct TEvPrivate { }; struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> { - TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor = [](){}, ui64 ingressBytes = 0) : PathIndex(pathInd), Functor(functor), IngressBytes(ingressBytes) { Block.swap(block); } + TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { Block.swap(block); } NDB::Block Block; const size_t PathIndex; std::function<void()> Functor; - ui64 IngressBytes; + ui64 IngressDelta; }; struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> { - TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor = []() {}, ui64 ingressBytes = 0) : Batch(batch), PathIndex(pathInd), Functor(functor), IngressBytes(ingressBytes) { } + TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { } std::shared_ptr<arrow::RecordBatch> Batch; const size_t PathIndex; std::function<void()> Functor; - ui64 IngressBytes; + ui64 IngressDelta; }; struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> { @@ -836,6 +836,12 @@ public: } } private: + ui64 GetIngressDelta() { + auto currentIngressBytes = IngressBytes; + IngressBytes = 0; + return currentIngressBytes; + } + void WaitFinish() { LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish: " << Path); if (InputFinished) @@ -966,7 +972,7 @@ private: if (issues) Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); else - Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes)); + Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, GetIngressDelta())); } catch (const TS3ReadAbort&) { LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path); } catch (const TDtorException&) { @@ -993,7 +999,7 @@ private: } Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() { actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed())); - }, IngressBytes)); + }, GetIngressDelta())); } while (cntBlocksInFly--) { WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); @@ -1004,7 +1010,7 @@ private: if (!reader.Next(batch)) { break; } - Send(ParentActorId, new TEv(batch, PathIndex)); + Send(ParentActorId, new TEv(batch, PathIndex, [](){}, GetIngressDelta())); } } } @@ -1399,7 +1405,7 @@ private: void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) { YQL_ENSURE(!ReadSpec->Arrow); - IngressBytes = next->Get()->IngressBytes; + IngressBytes += next->Get()->IngressDelta; auto size = next->Get()->Block.bytes(); QueueTotalDataSize += size; if (Counters) { @@ -1417,7 +1423,7 @@ private: void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) { YQL_ENSURE(ReadSpec->Arrow); - IngressBytes = next->Get()->IngressBytes; + IngressBytes += next->Get()->IngressDelta; auto size = GetSizeOfBatch(*next->Get()->Batch); QueueTotalDataSize += size; if (Counters) { @@ -1435,7 +1441,7 @@ private: void HandleFileFinished(TEvPrivate::TEvFileFinished::TPtr& ev) { CoroActors.erase(ev->Sender); - IngressBytes = ev->Get()->IngressBytes; + IngressBytes += ev->Get()->IngressDelta; RetryStuffForFile.erase(ev->Get()->PathIndex); Y_VERIFY(Count); --Count; |