summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <[email protected]>2023-02-03 01:49:35 +0300
committerhor911 <[email protected]>2023-02-03 01:49:35 +0300
commit3d41ba9017aa2f2dd6b18e62ef496464da0d6263 (patch)
tree006af6f28c62caeb96b8071908bf25f27dc14370
parent53cc0970f20d3f40c603bf6c8c212bf0491eba82 (diff)
Correct stream read statistics
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp32
1 files changed, 19 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index b372af05be5..3f49dbd6fad 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -174,11 +174,11 @@ struct TEvPrivate {
};
struct TEvFileFinished : public TEventLocal<TEvFileFinished, EvFileFinished> {
- TEvFileFinished(size_t pathIndex, ui64 ingressBytes)
- : PathIndex(pathIndex), IngressBytes(ingressBytes) {
+ TEvFileFinished(size_t pathIndex, ui64 ingressDelta)
+ : PathIndex(pathIndex), IngressDelta(ingressDelta) {
}
const size_t PathIndex;
- ui64 IngressBytes;
+ ui64 IngressDelta;
};
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
@@ -199,19 +199,19 @@ struct TEvPrivate {
};
struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> {
- TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor = [](){}, ui64 ingressBytes = 0) : PathIndex(pathInd), Functor(functor), IngressBytes(ingressBytes) { Block.swap(block); }
+ TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { Block.swap(block); }
NDB::Block Block;
const size_t PathIndex;
std::function<void()> Functor;
- ui64 IngressBytes;
+ ui64 IngressDelta;
};
struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> {
- TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor = []() {}, ui64 ingressBytes = 0) : Batch(batch), PathIndex(pathInd), Functor(functor), IngressBytes(ingressBytes) { }
+ TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta) : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta) { }
std::shared_ptr<arrow::RecordBatch> Batch;
const size_t PathIndex;
std::function<void()> Functor;
- ui64 IngressBytes;
+ ui64 IngressDelta;
};
struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> {
@@ -836,6 +836,12 @@ public:
}
}
private:
+ ui64 GetIngressDelta() {
+ auto currentIngressBytes = IngressBytes;
+ IngressBytes = 0;
+ return currentIngressBytes;
+ }
+
void WaitFinish() {
LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish: " << Path);
if (InputFinished)
@@ -966,7 +972,7 @@ private:
if (issues)
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
else
- Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, IngressBytes));
+ Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, GetIngressDelta()));
} catch (const TS3ReadAbort&) {
LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path);
} catch (const TDtorException&) {
@@ -993,7 +999,7 @@ private:
}
Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() {
actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed()));
- }, IngressBytes));
+ }, GetIngressDelta()));
}
while (cntBlocksInFly--) {
WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
@@ -1004,7 +1010,7 @@ private:
if (!reader.Next(batch)) {
break;
}
- Send(ParentActorId, new TEv(batch, PathIndex));
+ Send(ParentActorId, new TEv(batch, PathIndex, [](){}, GetIngressDelta()));
}
}
}
@@ -1399,7 +1405,7 @@ private:
void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) {
YQL_ENSURE(!ReadSpec->Arrow);
- IngressBytes = next->Get()->IngressBytes;
+ IngressBytes += next->Get()->IngressDelta;
auto size = next->Get()->Block.bytes();
QueueTotalDataSize += size;
if (Counters) {
@@ -1417,7 +1423,7 @@ private:
void HandleNextRecordBatch(TEvPrivate::TEvNextRecordBatch::TPtr& next) {
YQL_ENSURE(ReadSpec->Arrow);
- IngressBytes = next->Get()->IngressBytes;
+ IngressBytes += next->Get()->IngressDelta;
auto size = GetSizeOfBatch(*next->Get()->Batch);
QueueTotalDataSize += size;
if (Counters) {
@@ -1435,7 +1441,7 @@ private:
void HandleFileFinished(TEvPrivate::TEvFileFinished::TPtr& ev) {
CoroActors.erase(ev->Sender);
- IngressBytes = ev->Get()->IngressBytes;
+ IngressBytes += ev->Get()->IngressDelta;
RetryStuffForFile.erase(ev->Get()->PathIndex);
Y_VERIFY(Count);
--Count;