diff options
author | hor911 <hor911@ydb.tech> | 2023-04-28 15:27:45 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-04-28 15:27:45 +0300 |
commit | feb79a8842a81474d42b9c460ffb7d93ef44184c (patch) | |
tree | 813b5499dda34dfd1c9e01d4873e24969cde018b | |
parent | 277bf2b433110a4e3a5a127d7cc801d050d0394f (diff) | |
download | ydb-feb79a8842a81474d42b9c460ffb7d93ef44184c.tar.gz |
Accurate mem quota
5 files changed, 333 insertions, 171 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index bb97cee6be7..798a50b6de7 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -157,9 +157,7 @@ struct TEvPrivate { EvRetry, EvNextBlock, EvNextRecordBatch, - EvBlockProcessed, EvFileFinished, - EvPause, EvContinue, EvFutureResolved, EvObjectPathBatch, @@ -246,35 +244,26 @@ struct TEvPrivate { }; struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> { - TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta) - : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) { + TEvNextBlock(NDB::Block& block, size_t pathInd, ui64 ingressDelta, TDuration cpuTimeDelta) + : PathIndex(pathInd), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) { Block.swap(block); } NDB::Block Block; const size_t PathIndex; - std::function<void()> Functor; const ui64 IngressDelta; const TDuration CpuTimeDelta; }; struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> { - TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta) - : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) { + TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, ui64 ingressDelta, TDuration cpuTimeDelta) + : Batch(batch), PathIndex(pathInd), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) { } std::shared_ptr<arrow::RecordBatch> Batch; const size_t PathIndex; - std::function<void()> Functor; const ui64 IngressDelta; const TDuration CpuTimeDelta; }; - struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> { - TEvBlockProcessed() {} - }; - - struct TEvPause : public NActors::TEventLocal<TEvPause, EvPause> { - }; - struct TEvContinue : public NActors::TEventLocal<TEvContinue, EvContinue> { }; @@ -1011,8 +1000,9 @@ struct TReadSpec { bool Arrow = false; bool ThreadPool = false; - ui64 ParallelRowGroupCount = 1; + ui64 ParallelRowGroupCount = 0; bool RowGroupReordering = true; + ui64 ParallelDownloadCount = 0; std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec; NDB::ColumnsWithTypeAndName CHColumns; std::shared_ptr<arrow::Schema> ArrowSchema; @@ -1173,6 +1163,151 @@ std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::R return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns); } +struct TReadBufferCounter { + using TPtr = std::shared_ptr<TReadBufferCounter>; + + TReadBufferCounter(ui64 limit, + TActorSystem* actorSystem, + NMonitoring::TDynamicCounters::TCounterPtr queueDataSize, + NMonitoring::TDynamicCounters::TCounterPtr taskQueueDataSize, + NMonitoring::TDynamicCounters::TCounterPtr downloadPaused, + NMonitoring::TDynamicCounters::TCounterPtr taskDownloadPaused, + NMonitoring::TDynamicCounters::TCounterPtr taskChunkDownloadCount, + NMonitoring::THistogramPtr decodedChunkSizeHist) + : Limit(limit) + , ActorSystem(actorSystem) + , QueueDataSize(queueDataSize) + , TaskQueueDataSize(taskQueueDataSize) + , DownloadPaused(downloadPaused) + , TaskDownloadPaused(taskDownloadPaused) + , TaskChunkDownloadCount(taskChunkDownloadCount) + , DecodedChunkSizeHist(decodedChunkSizeHist) + { + } + + ~TReadBufferCounter() { + Notify(); + if (Value) { + if (QueueDataSize) { + QueueDataSize->Sub(Value); + } + if (TaskQueueDataSize) { + TaskQueueDataSize->Sub(Value); + } + Value = 0; + } + } + + bool IsFull() const { + return Value >= Limit; + } + + double Ratio() const { + return DownloadedBytes ? static_cast<double>(DecodedBytes) / DownloadedBytes : 1.0; + } + + ui64 FairShare() { + return CoroCount ? Limit / CoroCount : Limit; + } + + void IncChunk() { + ChunkCount++; + if (TaskChunkDownloadCount) { + TaskChunkDownloadCount->Inc(); + } + } + + void DecChunk() { + ChunkCount--; + if (TaskChunkDownloadCount) { + TaskChunkDownloadCount->Dec(); + } + } + + bool Add(ui64 delta, NActors::TActorId producer, bool paused = false) { + if (DecodedChunkSizeHist) { + DecodedChunkSizeHist->Collect(delta); + } + Value += delta; + if (QueueDataSize) { + QueueDataSize->Add(delta); + } + if (TaskQueueDataSize) { + TaskQueueDataSize->Add(delta); + } + if ((Value + delta / 2) >= Limit) { + if (!paused) { + if (DownloadPaused) { + DownloadPaused->Inc(); + } + if (TaskDownloadPaused) { + TaskDownloadPaused->Inc(); + } + Producers.push_back(producer); + paused = true; + } + } + return paused; + } + + void Sub(ui64 delta) { + Y_ASSERT(Value >= delta); + Value -= delta; + if (QueueDataSize) { + QueueDataSize->Sub(delta); + } + if (TaskQueueDataSize) { + TaskQueueDataSize->Sub(delta); + } + if (Value * 4 < Limit * 3) { // l.eq.t 75% + Notify(); + } + } + + void Notify() { + if (!Producers.empty()) { + if (DownloadPaused) { + DownloadPaused->Sub(Producers.size()); + } + if (TaskDownloadPaused) { + TaskDownloadPaused->Sub(Producers.size()); + } + for (auto producer : Producers) { + ActorSystem->Send(new IEventHandle(producer, TActorId{}, new TEvPrivate::TEvContinue())); + } + Producers.clear(); + } + } + + void UpdateProgress(ui64 deltaDownloadedBytes, ui64 deltaDecodedBytes, ui64 deltaDecodedRows) { + DownloadedBytes += deltaDownloadedBytes; + DecodedBytes += deltaDecodedBytes; + DecodedRows += deltaDecodedRows; + } + + ui64 Value = 0; + const ui64 Limit; + ui64 CoroCount = 0; + ui64 ChunkCount = 0; + ui64 DownloadedBytes = 0; + ui64 DecodedBytes = 0; + ui64 DecodedRows = 0; + std::vector<NActors::TActorId> Producers; + TActorSystem* ActorSystem = nullptr; + NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize; + NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize; + NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused; + NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused; + NMonitoring::TDynamicCounters::TCounterPtr TaskChunkDownloadCount; + NMonitoring::THistogramPtr DecodedChunkSizeHist; +}; + +struct TParquetFileInfo { + ui64 RowCount = 0; + ui64 CompressedSize = 0; + ui64 UncompressedSize = 0; +}; + class TS3ReadCoroImpl : public TActorCoroImpl { friend class TS3StreamReadActor; @@ -1304,7 +1439,14 @@ public: ); while (NDB::Block batch = stream->read()) { - Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta())); + Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId); + Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta())); + if (Paused) { + CpuTime += GetCpuTimeDelta(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); + HandleEvent(*ev); + StartCycleCount = GetCycleCountFast(); + } } LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED"); @@ -1332,23 +1474,16 @@ public: ) ); - auto actorSystem = GetActorSystem(); - auto selfId = SelfActorId; - size_t cntBlocksInFly = 0; - while (NDB::Block batch = stream->read()) { - if (++cntBlocksInFly > MaxBlocksInFly) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - --cntBlocksInFly; + Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId); + Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta())); + if (Paused) { + CpuTime += GetCpuTimeDelta(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); + HandleEvent(*ev); + StartCycleCount = GetCycleCountFast(); } - Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() { - actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed())); - }, TakeIngressDelta(), TakeCpuTimeDelta())); } - while (cntBlocksInFly--) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - } - IngressBytes += GetFileLength(fileName); LOG_CORO_D("RunClickHouseParserOverFile FINISHED"); @@ -1426,9 +1561,17 @@ public: std::shared_ptr<arrow::RecordBatch> batch; ::arrow::Status status; while (status = reader->ReadNext(&batch), status.ok() && batch) { + auto convertedBatch = ConvertArrowColumns(batch, columnConverters); + Paused = QueueBufferCounter->Add(NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch), SelfActorId); Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( - ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta() + convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta() )); + if (Paused) { + CpuTime += GetCpuTimeDelta(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); + HandleEvent(*ev); + StartCycleCount = GetCycleCountFast(); + } } if (!status.ok()) { throw yexception() << status.ToString(); @@ -1509,12 +1652,6 @@ public: } arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& readRanges) { - if (Paused) { - CpuTime += GetCpuTimeDelta(); - auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - HandleEvent(*ev); - StartCycleCount = GetCycleCountFast(); - } if (readRanges.empty()) { // select count(*) case if (CurrentRowGroupIndex) { ReadyRowGroups.push(*CurrentRowGroupIndex); @@ -1575,10 +1712,6 @@ public: auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); HandleEvent(*ev); } - if (Paused) { - auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - HandleEvent(*ev); - } StartCycleCount = GetCycleCountFast(); @@ -1594,9 +1727,8 @@ public: LOG_CORO_D("RunCoroBlockArrowParserOverHttp"); - ui64 readerCount = std::max(1ul, ReadSpec->ParallelRowGroupCount); + ui64 readerCount = 1; - std::shared_ptr<parquet::FileMetaData> metadata; std::vector<std::unique_ptr<parquet::arrow::FileReader>> readers; parquet::arrow::FileReaderBuilder builder; @@ -1622,11 +1754,31 @@ public: BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); - if (columnIndices.empty()) { - // select count(*) case - single reader is enough - readerCount = 1; - } else if (readerCount > numGroups) { - readerCount = numGroups; + // select count(*) case - single reader is enough + if (!columnIndices.empty()) { + if (ReadSpec->ParallelRowGroupCount) { + readerCount = ReadSpec->ParallelRowGroupCount; + } else { + // we want to read in parallel as much as 1/2 of fair share bytes + // (it's compressed size, after decoding it will grow) + ui64 compressedSize = 0; + for (int i = 0; i < fileMetadata->num_row_groups(); i++) { + auto rowGroup = fileMetadata->RowGroup(i); + for (const auto columIndex : columnIndices) { + compressedSize += rowGroup->ColumnChunk(columIndex)->total_compressed_size(); + } + } + // count = (fair_share / 2) / (compressed_size / num_group) + auto desiredReaderCount = (QueueBufferCounter->FairShare() * numGroups) / (compressedSize * 2); + // min is 1 + // max is 5 (should be also tuned probably) + if (desiredReaderCount) { + readerCount = std::min(desiredReaderCount, 5ul); + } + } + if (readerCount > numGroups) { + readerCount = numGroups; + } } if (readerCount > 1) { @@ -1644,6 +1796,7 @@ public: if (!columnIndices.empty()) { CurrentRowGroupIndex = i; THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices)); + QueueBufferCounter->IncChunk(); } RowGroupReaderIndex[i] = i; } @@ -1652,6 +1805,12 @@ public: ui64 readyGroupCount = 0; while (readyGroupCount < numGroups) { + if (Paused) { + CpuTime += GetCpuTimeDelta(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); + HandleEvent(*ev); + StartCycleCount = GetCycleCountFast(); + } ui64 readyGroupIndex; if (!columnIndices.empty()) { @@ -1671,6 +1830,7 @@ public: // select count(*) case - no columns, no download, just fetch meta info instantly readyGroupIndex = readyGroupCount; } + QueueBufferCounter->DecChunk(); auto readyReaderIndex = RowGroupReaderIndex[readyGroupIndex]; RowGroupReaderIndex.erase(readyGroupIndex); @@ -1680,28 +1840,40 @@ public: THROW_ARROW_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ static_cast<int>(readyGroupIndex) }, columnIndices, &table)); readyGroupCount++; + auto downloadedBytes = ReadInflightSize[readyGroupIndex]; + ui64 decodedBytes = 0; + ReadInflightSize.erase(readyGroupIndex); + auto reader = std::make_unique<arrow::TableBatchReader>(*table); + std::shared_ptr<arrow::RecordBatch> batch; arrow::Status status; while (status = reader->ReadNext(&batch), status.ok() && batch) { + auto convertedBatch = ConvertArrowColumns(batch, columnConverters); + auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch); + decodedBytes += size; + Paused = QueueBufferCounter->Add(size, SelfActorId, Paused); Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( - ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta() + convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta() )); } if (!status.ok()) { throw yexception() << status.ToString(); } + QueueBufferCounter->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows()); if (RawInflightSize) { - RawInflightSize->Sub(ReadInflightSize[readyGroupIndex]); + RawInflightSize->Sub(downloadedBytes); } - ReadInflightSize.erase(readyGroupIndex); if (nextGroup < numGroups) { if (!columnIndices.empty()) { CurrentRowGroupIndex = nextGroup; THROW_ARROW_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ static_cast<int>(nextGroup) }, columnIndices)); + QueueBufferCounter->IncChunk(); } RowGroupReaderIndex[nextGroup] = readyReaderIndex; nextGroup++; + } else { + readers[readyReaderIndex].reset(); } } } @@ -1734,35 +1906,39 @@ public: BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); - auto actorSystem = GetActorSystem(); - auto selfId = SelfActorId; - size_t cntBlocksInFly = 0; - for (int group = 0; group < fileReader->num_row_groups(); group++) { + if (Paused) { + CpuTime += GetCpuTimeDelta(); + LOG_CORO_D("RunCoroBlockArrowParserOverFile - PAUSED " << QueueBufferCounter->Value); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); + HandleEvent(*ev); + LOG_CORO_D("RunCoroBlockArrowParserOverFile - CONTINUE " << QueueBufferCounter->Value); + StartCycleCount = GetCycleCountFast(); + } + std::shared_ptr<arrow::Table> table; + ui64 ingressBytes = IngressBytes; THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table)); + ui64 downloadedBytes = IngressBytes - ingressBytes; auto reader = std::make_unique<arrow::TableBatchReader>(*table); + ui64 decodedBytes = 0; std::shared_ptr<arrow::RecordBatch> batch; ::arrow::Status status; while (status = reader->ReadNext(&batch), status.ok() && batch) { - if (++cntBlocksInFly > MaxBlocksInFly) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - --cntBlocksInFly; - } + auto convertedBatch = ConvertArrowColumns(batch, columnConverters); + auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch); + decodedBytes += size; + Paused = QueueBufferCounter->Add(size, SelfActorId, Paused); Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( - ConvertArrowColumns(batch, columnConverters), PathIndex, [actorSystem, selfId]() { - actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed())); - }, TakeIngressDelta(), TakeCpuTimeDelta() + convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta() )); } if (!status.ok()) { throw yexception() << status.ToString(); } - } - while (cntBlocksInFly--) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); + QueueBufferCounter->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows()); } LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED"); @@ -1772,7 +1948,6 @@ public: hFunc(TEvPrivate::TEvReadStarted, Handle); hFunc(TEvPrivate::TEvDataPart, Handle); hFunc(TEvPrivate::TEvReadFinished, Handle); - hFunc(TEvPrivate::TEvPause, Handle); hFunc(TEvPrivate::TEvContinue, Handle); hFunc(TEvPrivate::TEvReadResult2, Handle); hFunc(NActors::TEvents::TEvPoison, Handle); @@ -1890,11 +2065,6 @@ public: } } - void Handle(TEvPrivate::TEvPause::TPtr&) { - LOG_CORO_D("TEvPause"); - Paused = true; - } - void HandleEvent(TEvPrivate::TEvContinue::THandle&) { LOG_CORO_D("TEvContinue"); Paused = false; @@ -1909,6 +2079,7 @@ public: } void Handle(NActors::TEvents::TEvPoison::TPtr&) { + LOG_CORO_D("TEvPoison"); RetryStuff->Cancel(); throw TS3ReadAbort(); } @@ -1918,18 +2089,20 @@ private: public: TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, - const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, + const TString& path, const TString& url, IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, + TReadBufferCounter::TPtr queueBufferCounter, const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps, const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize) : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), - PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader), + PathIndex(pathIndex), Path(path), Url(url), ArrowReader(arrowReader), + QueueBufferCounter(queueBufferCounter), DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), - HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) - {} + HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) { + } ~TS3ReadCoroImpl() override { if (DeferredDataParts.size() && DeferredQueueSize) { @@ -2085,7 +2258,6 @@ private: std::size_t LastOffset = 0; TString LastData; - std::size_t MaxBlocksInFly = 2; IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; TDuration CpuTime; @@ -2093,6 +2265,7 @@ private: TString InputBuffer; bool Paused = false; std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts; + TReadBufferCounter::TPtr QueueBufferCounter; const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; @@ -2127,7 +2300,6 @@ public: const TReadSpec::TPtr& readSpec, const NActors::TActorId& computeActorId, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, - const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, ::NMonitoring::TDynamicCounterPtr counters, @@ -2148,7 +2320,6 @@ public: , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) , ReadSpec(readSpec) - , MaxBlocksInFly(maxBlocksInFly) , ArrowReader(std::move(arrowReader)) , Counters(std::move(counters)) , TaskCounters(std::move(taskCounters)) @@ -2157,12 +2328,16 @@ public: QueueDataSize = Counters->GetCounter("QueueDataSize"); QueueDataLimit = Counters->GetCounter("QueueDataLimit"); QueueBlockCount = Counters->GetCounter("QueueBlockCount"); + DownloadCount = Counters->GetCounter("DownloadCount"); DownloadPaused = Counters->GetCounter("DownloadPaused"); QueueDataLimit->Add(ReadActorFactoryCfg.DataInflight); + DecodedChunkSizeHist = Counters->GetHistogram("ChunkSizeBytes", NMonitoring::ExplicitHistogram({100,1000,10'000,30'000,100'000,300'000,1'000'000,3'000'000,10'000'000,30'000'000,100'000'000})); } if (TaskCounters) { TaskQueueDataSize = TaskCounters->GetCounter("QueueDataSize"); TaskQueueDataLimit = TaskCounters->GetCounter("QueueDataLimit"); + TaskDownloadCount = TaskCounters->GetCounter("DownloadCount"); + TaskChunkDownloadCount = TaskCounters->GetCounter("ChunkDownloadCount"); TaskDownloadPaused = TaskCounters->GetCounter("DownloadPaused"); DeferredQueueSize = TaskCounters->GetCounter("DeferredQueueSize"); HttpInflightSize = TaskCounters->GetCounter("HttpInflightSize"); @@ -2175,6 +2350,15 @@ public: void Bootstrap() { LOG_D("TS3StreamReadActor", "Bootstrap"); + QueueBufferCounter = std::make_shared<TReadBufferCounter>( + ReadActorFactoryCfg.DataInflight, + TActivationContext::ActorSystem(), + QueueDataSize, + TaskQueueDataSize, + DownloadPaused, + TaskDownloadPaused, + TaskChunkDownloadCount, + DecodedChunkSizeHist); FileQueueActor = RegisterWithSameMailbox(new TS3FileQueueActor{ TxId, std::move(Paths), @@ -2195,22 +2379,39 @@ public: // no path is pending return false; } - - if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) { + if (QueueBufferCounter->IsFull()) { // too large data inflight return false; } - if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) { - // too large download inflight + if (QueueBufferCounter->CoroCount >= ReadActorFactoryCfg.MaxInflight) { + // hard limit return false; } + if (ReadSpec->ParallelDownloadCount) { + if (QueueBufferCounter->CoroCount >= ReadSpec->ParallelDownloadCount) { + // explicit limit + return false; + } + } else { + if (QueueBufferCounter->CoroCount && DownloadSize * QueueBufferCounter->Ratio() > ReadActorFactoryCfg.DataInflight * 2) { + // dynamic limit + return false; + } + } RegisterCoro(); return true; } void RegisterCoro() { - DownloadInflight++; + QueueBufferCounter->CoroCount++; + if (Counters) { + DownloadCount->Inc(); + } + if (TaskCounters) { + TaskDownloadCount->Inc(); + } const auto& objectPath = ReadPathFromCache(); + DownloadSize += objectPath.Size; const TString requestId = CreateGuidAsString(); auto stuff = std::make_shared<TRetryStuff>( Gateway, @@ -2221,7 +2422,6 @@ public: requestId, RetryPolicy); auto pathIndex = objectPath.PathIndex + StartPathIndex; - RetryStuffForFile.emplace(pathIndex, stuff); if (TaskCounters) { HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream()); } @@ -2229,7 +2429,6 @@ public: "TS3StreamReadActor", "RegisterCoro with path " << objectPath.Path << " with pathIndex " << pathIndex); - ::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter; auto impl = MakeHolder<TS3ReadCoroImpl>( InputIndex, TxId, @@ -2239,14 +2438,16 @@ public: pathIndex, objectPath.Path, Url, - MaxBlocksInFly, ArrowReader, ReadActorFactoryCfg, + QueueBufferCounter, DeferredQueueSize, HttpInflightSize, HttpDataRps, RawInflightSize); - CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl)))); + auto coroActorId = RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl))); + CoroActors.insert(coroActorId); + RetryStuffForFile.emplace(coroActorId, stuff); } TObjectPath ReadPathFromCache() { @@ -2292,12 +2493,11 @@ private: class TReadyBlock { public: - TReadyBlock(TEvPrivate::TEvNextBlock::TPtr& event) : PathInd(event->Get()->PathIndex), Functor (std::move(event->Get()->Functor)) { Block.swap(event->Get()->Block); } - TReadyBlock(TEvPrivate::TEvNextRecordBatch::TPtr& event) : Batch(event->Get()->Batch), PathInd(event->Get()->PathIndex), Functor(std::move(event->Get()->Functor)) {} + TReadyBlock(TEvPrivate::TEvNextBlock::TPtr& event) : PathInd(event->Get()->PathIndex) { Block.swap(event->Get()->Block); } + TReadyBlock(TEvPrivate::TEvNextRecordBatch::TPtr& event) : Batch(event->Get()->Batch), PathInd(event->Get()->PathIndex) {} NDB::Block Block; std::shared_ptr<arrow::RecordBatch> Batch; size_t PathInd; - std::function<void()> Functor; }; void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {} @@ -2353,8 +2553,6 @@ private: value = HolderFactory.Create<TBoxedBlock>(Blocks.front().Block); } - Blocks.front().Functor(); - if (AddPathIndex) { NUdf::TUnboxedValue* tupleItems = nullptr; auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems); @@ -2367,19 +2565,13 @@ private: total += s; output.emplace_back(std::move(value)); Blocks.pop_front(); - QueueTotalDataSize -= s; + QueueBufferCounter->Sub(s); if (Counters) { - QueueDataSize->Sub(s); QueueBlockCount->Dec(); } - if (TaskCounters) { - TaskQueueDataSize->Sub(s); - } TryRegisterCoro(); } while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free)); - MaybeContinue(); - finished = LastFileWasProcessed(); if (finished) { ContainerCache.Clear(); @@ -2393,24 +2585,17 @@ private: void PassAway() override { // Is called from Compute Actor LOG_D("TS3StreamReadActor", "PassAway"); if (Counters) { - QueueDataSize->Sub(QueueTotalDataSize); QueueBlockCount->Sub(Blocks.size()); QueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); + DownloadCount->Sub(QueueBufferCounter->CoroCount); } if (TaskCounters) { - TaskQueueDataSize->Sub(QueueTotalDataSize); TaskQueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight); HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream() * CoroActors.size()); + TaskDownloadCount->Sub(QueueBufferCounter->CoroCount); + TaskChunkDownloadCount->Sub(QueueBufferCounter->ChunkCount); } - if (Paused) { - if (Counters) { - DownloadPaused->Dec(); - } - if (TaskCounters) { - TaskDownloadPaused->Dec(); - } - } - QueueTotalDataSize = 0; + QueueBufferCounter.reset(); for (const auto actorId : CoroActors) { Send(actorId, new NActors::TEvents::TEvPoison()); @@ -2424,37 +2609,6 @@ private: TActorBootstrapped<TS3StreamReadActor>::PassAway(); } - void MaybePause() { - if (!Paused && QueueTotalDataSize >= ReadActorFactoryCfg.DataInflight) { - for (const auto actorId : CoroActors) { - Send(actorId, new TEvPrivate::TEvPause()); - } - Paused = true; - if (Counters) { - DownloadPaused->Inc(); - } - if (TaskCounters) { - TaskDownloadPaused->Inc(); - } - } - } - - void MaybeContinue() { - // resume download on 3/4 == 75% to avoid oscillation (hysteresis) - if (Paused && QueueTotalDataSize * 4 < ReadActorFactoryCfg.DataInflight * 3) { - for (const auto actorId : CoroActors) { - Send(actorId, new TEvPrivate::TEvContinue()); - } - Paused = false; - if (Counters) { - DownloadPaused->Dec(); - } - if (TaskCounters) { - TaskDownloadPaused->Dec(); - } - } - } - STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry); hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock); @@ -2474,12 +2628,13 @@ private: ObjectPathCache.end(), std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.begin()), std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.end())); - LOG_W( + LOG_D( "TS3StreamReadActor", "HandleObjectPathBatch " << ObjectPathCache.size() << " IsObjectQueueEmpty " << IsObjectQueueEmpty << " MaxInflight " << ReadActorFactoryCfg.MaxInflight); while (TryRegisterCoro()) {} } + void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) { IsObjectQueueEmpty = true; LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << result->Get()->Issues.ToOneLineString()); @@ -2495,16 +2650,9 @@ private: YQL_ENSURE(!ReadSpec->Arrow); IngressBytes += next->Get()->IngressDelta; CpuTime += next->Get()->CpuTimeDelta; - auto size = next->Get()->Block.bytes(); - QueueTotalDataSize += size; if (Counters) { QueueBlockCount->Inc(); - QueueDataSize->Add(size); } - if (TaskCounters) { - TaskQueueDataSize->Add(size); - } - MaybePause(); Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); ReportMemoryUsage(); @@ -2514,16 +2662,9 @@ private: YQL_ENSURE(ReadSpec->Arrow); IngressBytes += next->Get()->IngressDelta; CpuTime += next->Get()->CpuTimeDelta; - auto size = NUdf::GetSizeOfArrowBatchInBytes(*next->Get()->Batch); - QueueTotalDataSize += size; if (Counters) { QueueBlockCount->Inc(); - QueueDataSize->Add(size); } - if (TaskCounters) { - TaskQueueDataSize->Add(size); - } - MaybePause(); Blocks.emplace_back(next); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); ReportMemoryUsage(); @@ -2533,12 +2674,29 @@ private: CoroActors.erase(ev->Sender); IngressBytes += ev->Get()->IngressDelta; CpuTime += ev->Get()->CpuTimeDelta; - RetryStuffForFile.erase(ev->Get()->PathIndex); + + auto it = RetryStuffForFile.find(ev->Sender); + if (it == RetryStuffForFile.end()) { + return; + } + RetryStuffForFile.erase(it); + auto size = it->second->SizeLimit; + if (DownloadSize < size) { + DownloadSize = 0; + } else { + DownloadSize -= size; + } if (TaskCounters) { HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream()); } - DownloadInflight--; + QueueBufferCounter->CoroCount--; + if (Counters) { + DownloadCount->Dec(); + } + if (TaskCounters) { + TaskDownloadCount->Dec(); + } CompletedFiles++; if (!ObjectPathCache.empty()) { TryRegisterCoro(); @@ -2560,7 +2718,7 @@ private: const TS3ReadActorFactoryConfig ReadActorFactoryCfg; const IHTTPGateway::TPtr Gateway; - THashMap<size_t, TRetryStuff::TPtr> RetryStuffForFile; + THashMap<NActors::TActorId, TRetryStuff::TPtr> RetryStuffForFile; const THolderFactory& HolderFactory; TPlainContainerCache ContainerCache; TPlainContainerCache ArrowTupleContainerCache; @@ -2585,30 +2743,32 @@ private: size_t CompletedFiles = 0; const TReadSpec::TPtr ReadSpec; std::deque<TReadyBlock> Blocks; - const std::size_t MaxBlocksInFly; IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; TDuration CpuTime; mutable TInstant LastMemoryReport = TInstant::Now(); - ui64 QueueTotalDataSize = 0; + TReadBufferCounter::TPtr QueueBufferCounter; ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize; ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataLimit; ::NMonitoring::TDynamicCounters::TCounterPtr QueueBlockCount; + ::NMonitoring::TDynamicCounters::TCounterPtr DownloadCount; ::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused; ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadCount; ::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused; + ::NMonitoring::TDynamicCounters::TCounterPtr TaskChunkDownloadCount; ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize; ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit; ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightLimit; ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize; + ::NMonitoring::THistogramPtr DecodedChunkSizeHist; ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounterPtr TaskCounters; - ui64 DownloadInflight = 0; + ui64 DownloadSize = 0; std::set<NActors::TActorId> CoroActors; NActors::TActorId FileQueueActor; - bool Paused = false; const ui64 FileSizeLimit; }; @@ -2821,8 +2981,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto readSpec = std::make_shared<TReadSpec>(); readSpec->Arrow = params.GetArrow(); readSpec->ThreadPool = params.GetThreadPool(); - readSpec->ParallelRowGroupCount = std::max(1ul, params.GetParallelRowGroupCount()); + readSpec->ParallelRowGroupCount = params.GetParallelRowGroupCount(); readSpec->RowGroupReordering = params.GetRowGroupReordering(); + readSpec->ParallelDownloadCount = params.GetParallelDownloadCount(); if (readSpec->Arrow) { fileSizeLimit = cfg.BlockFileSizeLimit; arrow::SchemaBuilder builder; @@ -2906,12 +3067,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( #undef SET_FLAG #undef SUPPORTED_FLAGS - std::size_t maxBlocksInFly = 2; - if (const auto it = settings.find("fileReadBlocksInFly"); settings.cend() != it) - maxBlocksInFly = FromString<ui64>(it->second); const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, - maxBlocksInFly, arrowReader, cfg, counters, taskCounters, fileSizeLimit); + arrowReader, cfg, counters, taskCounters, fileSizeLimit); return {actor, actor}; } else { diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index 8cbe1d7287e..426cf90f21b 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -19,4 +19,5 @@ message TSource { bool ThreadPool = 8; uint64 ParallelRowGroupCount = 9; bool RowGroupReordering = 10; + uint64 ParallelDownloadCount = 11; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 420d42bbcc8..e438503cdf2 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -283,8 +283,9 @@ public: srcDesc.SetFormat(parseSettings.Format().StringValue().c_str()); srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>())); srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true)); - srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(1)); - srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(false)); + srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(0)); + srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(true)); + srcDesc.SetParallelDownloadCount(State_->Configuration->ParallelDownloadCount.Get().GetOrElse(0)); const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); // exclude extra columns to get actual row type we need to read from input diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index c1199d8565b..9734654fc77 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -18,6 +18,7 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, ArrowThreadPool); REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1); REGISTER_SETTING(*this, ArrowRowGroupReordering); + REGISTER_SETTING(*this, ParallelDownloadCount); REGISTER_SETTING(*this, UseBlocksSource); REGISTER_SETTING(*this, AtomicUploadCommit); REGISTER_SETTING(*this, UseConcurrentDirectoryLister); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 2744af335df..6a08fbd7b73 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -20,6 +20,7 @@ struct TS3Settings { NCommon::TConfSetting<bool, false> ArrowThreadPool; NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1 NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK + NCommon::TConfSetting<ui64, false> ParallelDownloadCount; // Number of files to read in parallel, min == 1 NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode NCommon::TConfSetting<bool, false> AtomicUploadCommit; // Commit each file independently, w/o transaction semantic over all files NCommon::TConfSetting<bool, false> UseConcurrentDirectoryLister; |