author     e-zudin <e-zudin@ydb.tech>                  2023-11-23 19:46:01 +0300
committer  e-zudin <e-zudin@ydb.tech>                  2023-11-23 20:48:10 +0300
commit     dba7ceaa399571511c21d2007ba1ab9f477e6be9 (patch)
tree       42ca7c71134b4e2ad80d632e0f466fb4bff475c9
parent     1d2cbee7308557abc8872c34fa4020b6eee54b0c (diff)
download   ydb-dba7ceaa399571511c21d2007ba1ab9f477e6be9.tar.gz
Add limit for load from S3 source
-rw-r--r--   ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp   112
1 file changed, 101 insertions(+), 11 deletions(-)
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 1962e7c7f6..3a9fe1a0a5 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -666,6 +666,10 @@ private:
     const ES3PatternType PatternType;
 };
 
+ui64 SubtractSaturating(ui64 lhs, ui64 rhs) {
+    return (lhs > rhs) ? lhs - rhs : 0;
+}
+
 class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput {
 public:
     TS3ReadActor(ui64 inputIndex,
@@ -685,7 +689,8 @@ public:
         const TS3ReadActorFactoryConfig& readActorFactoryCfg,
         ::NMonitoring::TDynamicCounterPtr counters,
         ::NMonitoring::TDynamicCounterPtr taskCounters,
-        ui64 fileSizeLimit)
+        ui64 fileSizeLimit,
+        std::optional<ui64> rowsLimitHint)
         : ReadActorFactoryCfg(readActorFactoryCfg)
         , Gateway(std::move(gateway))
         , HolderFactory(holderFactory)
@@ -703,7 +708,8 @@ public:
         , SizeLimit(sizeLimit)
         , Counters(counters)
         , TaskCounters(taskCounters)
-        , FileSizeLimit(fileSizeLimit) {
+        , FileSizeLimit(fileSizeLimit)
+        , FilesRemained(rowsLimitHint) {
         if (Counters) {
             QueueDataSize = Counters->GetCounter("QueueDataSize");
             QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -748,6 +754,10 @@ public:
             // too large download inflight
             return false;
         }
+        if (ConsumedEnoughFiles()) {
+            // started enough downloads
+            return false;
+        }
 
         StartDownload();
         return true;
@@ -774,17 +784,18 @@ public:
         Y_ENSURE(!ObjectPathCache.empty());
         auto object = ObjectPathCache.back();
         ObjectPathCache.pop_back();
-        if (ObjectPathCache.empty() && !IsObjectQueueEmpty) {
+        if (ObjectPathCache.empty() && !IsObjectQueueEmpty && !ConsumedEnoughFiles()) {
             SendPathRequest();
         }
         return object;
     }
 
     void SendPathRequest() {
         Y_ENSURE(!IsWaitingObjectQueueResponse);
+        const ui64 requestedAmount = std::min(ReadActorFactoryCfg.MaxInflight, FilesRemained.value_or(std::numeric_limits<ui64>::max()));
         Send(
             FileQueueActor,
             std::make_unique<TS3FileQueueActor::TEvPrivatePrivate::TEvGetNextFile>(
-                ReadActorFactoryCfg.MaxInflight));
+                requestedAmount));
         IsWaitingObjectQueueResponse = true;
     }
@@ -807,6 +818,10 @@ private:
         return CpuTime;
     }
 
+    bool ConsumedEnoughFiles() const {
+        return FilesRemained && (*FilesRemained == 0);
+    }
+
     STRICT_STFUNC(StateFunc,
         hFunc(TEvPrivate::TEvReadResult, Handle);
         hFunc(TEvPrivate::TEvReadError, Handle);
@@ -876,7 +891,7 @@ private:
            } while (!Blocks.empty() && freeSpace > 0LL);
        }
 
-        if (LastFileWasProcessed()) {
+        if (LastFileWasProcessed() || ConsumedEnoughFiles()) {
             finished = true;
             ContainerCache.Clear();
         }
@@ -917,6 +932,9 @@ private:
             }
             Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));
             DownloadInflight--;
+            if (FilesRemained) {
+                *FilesRemained = SubtractSaturating(*FilesRemained, 1);
+            }
             TryStartDownload();
             Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
         } else {
@@ -1005,6 +1023,7 @@ private:
     ui64 QueueTotalDataSize = 0;
     ui64 DownloadInflight = 0;
     const ui64 FileSizeLimit;
+    std::optional<ui64> FilesRemained;
 };
 
 struct TReadSpec {
@@ -1460,6 +1479,7 @@ public:
 
         while (NDB::Block batch = stream->read()) {
             Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId);
+            const bool isStopped = StopIfConsumedEnough(batch.rows());
             Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
             if (Paused) {
                 CpuTime += GetCpuTimeDelta();
@@ -1467,6 +1487,10 @@ public:
                 HandleEvent(*ev);
                 StartCycleCount = GetCycleCountFast();
             }
+            if (isStopped) {
+                LOG_CORO_D("RunClickHouseParserOverHttp - STOPPED ON SATURATION");
+                break;
+            }
         }
 
         LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED");
@@ -1496,6 +1520,7 @@ public:
 
         while (NDB::Block batch = stream->read()) {
             Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId);
+            const bool isCancelled = StopIfConsumedEnough(batch.rows());
             Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
             if (Paused) {
                 CpuTime += GetCpuTimeDelta();
@@ -1503,6 +1528,10 @@ public:
                 HandleEvent(*ev);
                 StartCycleCount = GetCycleCountFast();
             }
+            if (isCancelled) {
+                LOG_CORO_D("RunClickHouseParserOverFile STOPPED ON SATURATION");
+                break;
+            }
         }
 
         IngressBytes += GetFileLength(fileName);
@@ -1801,6 +1830,7 @@ public:
 
                 std::shared_ptr<arrow::RecordBatch> batch;
                 arrow::Status status;
+                bool isCancelled = false;
                 while (status = reader->ReadNext(&batch), status.ok() && batch) {
                     auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
                     auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch);
@@ -1809,6 +1839,10 @@ public:
                     Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
                         convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
                     ));
+                    if (StopIfConsumedEnough(convertedBatch->num_rows())) {
+                        isCancelled = true;
+                        break;
+                    }
                 }
                 if (!status.ok()) {
                     throw yexception() << status.ToString();
                 }
@@ -1828,6 +1862,10 @@ public:
             } else {
                 readers[readyReaderIndex].reset();
             }
+            if (isCancelled) {
+                LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION");
+                break;
+            }
         }
     }
 
@@ -1879,6 +1917,7 @@ public:
         ui64 decodedBytes = 0;
         std::shared_ptr<arrow::RecordBatch> batch;
         ::arrow::Status status;
+        bool isCancelled = false;
         while (status = reader->ReadNext(&batch), status.ok() && batch) {
             auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
             auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch);
@@ -1887,11 +1926,19 @@ public:
             Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
                 convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
             ));
+            if (StopIfConsumedEnough(batch->num_rows())) {
+                isCancelled = true;
+                break;
+            }
         }
         if (!status.ok()) {
             throw yexception() << status.ToString();
         }
         QueueBufferCounter->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
+        if (isCancelled) {
+            LOG_CORO_D("RunCoroBlockArrowParserOverFile - STOPPED ON SATURATION");
+            break;
+        }
     }
 
     LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED");
@@ -2042,7 +2089,7 @@ private:
 public:
     TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
         const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
-        const TString& path, const TString& url,
+        const TString& path, const TString& url, std::optional<ui64> maxRows,
         const TS3ReadActorFactoryConfig& readActorFactoryCfg,
         TReadBufferCounter::TPtr queueBufferCounter,
         const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
@@ -2051,7 +2098,7 @@ public:
         const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
         : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg),
        InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
-        PathIndex(pathIndex), Path(path), Url(url),
+        PathIndex(pathIndex), Path(path), Url(url), RowsRemained(maxRows),
         QueueBufferCounter(queueBufferCounter),
         DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps),
         RawInflightSize(rawInflightSize) {
@@ -2088,6 +2135,20 @@ private:
         return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
     }
 
+    bool StopIfConsumedEnough(ui64 consumedRows) {
+        if (!RowsRemained) {
+            return false;
+        }
+
+        *RowsRemained = SubtractSaturating(*RowsRemained, consumedRows);
+        if (*RowsRemained > 0) {
+            return false;
+        }
+
+        RetryStuff->Cancel();
+        return true;
+    }
+
     void Run() final {
 
         NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
@@ -2211,6 +2272,7 @@ private:
     TDuration CpuTime;
     ui64 StartCycleCount = 0;
     TString InputBuffer;
+    std::optional<ui64> RowsRemained;
     bool Paused = false;
     std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts;
     TReadBufferCounter::TPtr QueueBufferCounter;
@@ -2252,6 +2314,7 @@ public:
         ::NMonitoring::TDynamicCounterPtr counters,
         ::NMonitoring::TDynamicCounterPtr taskCounters,
         ui64 fileSizeLimit,
+        std::optional<ui64> rowsLimitHint,
         IMemoryQuotaManager::TPtr memoryQuotaManager
     )   : ReadActorFactoryCfg(readActorFactoryCfg)
         , Gateway(std::move(gateway))
@@ -2266,6 +2329,7 @@ public:
         , PatternVariant(patternVariant)
         , Paths(std::move(paths))
         , AddPathIndex(addPathIndex)
+        , RowsRemained(rowsLimitHint)
         , ReadSpec(readSpec)
         , Counters(std::move(counters))
         , TaskCounters(std::move(taskCounters))
@@ -2397,6 +2461,7 @@ public:
             pathIndex,
             objectPath.Path,
             Url,
+            RowsRemained,
             ReadActorFactoryCfg,
             QueueBufferCounter,
             DeferredQueueSize,
@@ -2518,7 +2583,7 @@ private:
                 TryRegisterCoro();
             } while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free));
 
-        finished = LastFileWasProcessed();
+        finished = ConsumedEnoughRows() || LastFileWasProcessed();
         if (finished) {
             ContainerCache.Clear();
             ArrowTupleContainerCache.Clear();
@@ -2611,6 +2676,7 @@ private:
         if (Counters) {
             QueueBlockCount->Inc();
         }
+        StopLoadsIfEnough(next->Get()->Block.rows());
         Blocks.emplace_back(next);
         Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
     }
@@ -2624,6 +2690,7 @@ private:
         if (Counters) {
             QueueBlockCount->Inc();
         }
+        StopLoadsIfEnough(next->Get()->Batch->num_rows());
         Blocks.emplace_back(next);
         Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
     }
@@ -2679,6 +2746,24 @@ private:
         return Blocks.empty() && (ListedFiles == CompletedFiles) && IsObjectQueueEmpty;
     }
 
+    void StopLoadsIfEnough(ui64 consumedRows) {
+        if (!RowsRemained) {
+            return;
+        }
+
+        *RowsRemained = SubtractSaturating(*RowsRemained, consumedRows);
+        if (*RowsRemained == 0) {
+            LOG_T("TS3StreamReadActor", "StopLoadsIfEnough(consumedRows = " << consumedRows << ") sends poison");
+            for (const auto actorId : CoroActors) {
+                Send(actorId, new NActors::TEvents::TEvPoison());
+            }
+        }
+    }
+
+    bool ConsumedEnoughRows() const noexcept {
+        return RowsRemained && *RowsRemained == 0;
+    }
+
     const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
     const IHTTPGateway::TPtr Gateway;
     THashMap<NActors::TActorId, TRetryStuff::TPtr> RetryStuffForFile;
@@ -2704,6 +2789,7 @@ private:
     const bool AddPathIndex;
     size_t ListedFiles = 0;
     size_t CompletedFiles = 0;
+    std::optional<ui64> RowsRemained;
     const TReadSpec::TPtr ReadSpec;
     std::deque<TReadyBlock> Blocks;
     TDuration CpuTime;
@@ -2941,7 +3027,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
         intervalUnit = NYql::NSerialization::TSerializationInterval::ToUnit(it->second);
     }
-    // For later use
     std::optional<ui64> rowsLimitHint;
     if (params.GetRowsLimitHint() != 0) {
         rowsLimitHint = params.GetRowsLimitHint();
     }
@@ -2958,6 +3043,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
         readSpec->ParallelRowGroupCount = params.GetParallelRowGroupCount();
         readSpec->RowGroupReordering = params.GetRowGroupReordering();
         readSpec->ParallelDownloadCount = params.GetParallelDownloadCount();
+
+        if (rowsLimitHint && *rowsLimitHint <= 1000) {
+            readSpec->ParallelRowGroupCount = 1;
+            readSpec->ParallelDownloadCount = 1;
+        }
         if (readSpec->Arrow) {
             fileSizeLimit = cfg.BlockFileSizeLimit;
             arrow::SchemaBuilder builder;
@@ -3042,7 +3132,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
 #undef SUPPORTED_FLAGS
         const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
             pathPattern, pathPatternVariant, std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
-            cfg, counters, taskCounters, fileSizeLimit, memoryQuotaManager);
+            cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager);
 
         return {actor, actor};
     } else {
@@ -3052,7 +3142,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
         const auto actor = new TS3ReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
             pathPattern, pathPatternVariant, std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
-            cfg, counters, taskCounters, fileSizeLimit);
+            cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint);
 
         return {actor, actor};
     }
 }
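Note: the patch threads an optional rowsLimitHint through both read actors and decrements it with SubtractSaturating as batches (or files) are consumed; once the budget reaches zero, further downloads are cancelled or poisoned. The standalone sketch below illustrates only that saturating-budget pattern under assumed, hypothetical names (batchRows, rowsRemained); it is not code from the patch itself.

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <vector>

    // Saturating subtraction, mirroring SubtractSaturating() from the patch:
    // the unsigned counter never wraps below zero.
    uint64_t SubtractSaturating(uint64_t lhs, uint64_t rhs) {
        return (lhs > rhs) ? lhs - rhs : 0;
    }

    int main() {
        // Hypothetical row counts of batches arriving from a source.
        const std::vector<uint64_t> batchRows = {400, 300, 500, 200};

        // An engaged optional is the limit hint; std::nullopt means "no limit".
        std::optional<uint64_t> rowsRemained = 1000;

        for (uint64_t rows : batchRows) {
            std::cout << "consumed batch of " << rows << " rows\n";
            if (rowsRemained) {
                *rowsRemained = SubtractSaturating(*rowsRemained, rows);
                if (*rowsRemained == 0) {
                    // The actors would cancel downloads / send poison at this point.
                    std::cout << "limit reached, stopping further reads\n";
                    break;
                }
            }
        }
    }

Using an empty std::optional to mean "unlimited" avoids overloading 0 as a sentinel, and the saturating decrement keeps the budget well defined when a batch overshoots the remaining rows.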