about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author	e-zudin <e-zudin@ydb.tech>	2023-11-23 19:46:01 +0300
committer	e-zudin <e-zudin@ydb.tech>	2023-11-23 20:48:10 +0300
commit	dba7ceaa399571511c21d2007ba1ab9f477e6be9 (patch)
tree	42ca7c71134b4e2ad80d632e0f466fb4bff475c9
parent	1d2cbee7308557abc8872c34fa4020b6eee54b0c (diff)
download	ydb-dba7ceaa399571511c21d2007ba1ab9f477e6be9.tar.gz
Add limit for load from S3 source
-rw-r--r--	ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp	112
1 files changed, 101 insertions, 11 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 1962e7c7f6..3a9fe1a0a5 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -666,6 +666,10 @@ private:
const ES3PatternType PatternType;
};
+ui64 SubtractSaturating(ui64 lhs, ui64 rhs) {
+ return (lhs > rhs) ? lhs - rhs : 0;
+}
+
class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput {
public:
TS3ReadActor(ui64 inputIndex,
@@ -685,7 +689,8 @@ public:
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
- ui64 fileSizeLimit)
+ ui64 fileSizeLimit,
+ std::optional<ui64> rowsLimitHint)
: ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
@@ -703,7 +708,8 @@ public:
, SizeLimit(sizeLimit)
, Counters(counters)
, TaskCounters(taskCounters)
- , FileSizeLimit(fileSizeLimit) {
+ , FileSizeLimit(fileSizeLimit)
+ , FilesRemained(rowsLimitHint) {
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -748,6 +754,10 @@ public:
// too large download inflight
return false;
}
+ if (ConsumedEnoughFiles()) {
+ // started enough downloads
+ return false;
+ }
StartDownload();
return true;
@@ -774,17 +784,18 @@ public:
Y_ENSURE(!ObjectPathCache.empty());
auto object = ObjectPathCache.back();
ObjectPathCache.pop_back();
- if (ObjectPathCache.empty() && !IsObjectQueueEmpty) {
+ if (ObjectPathCache.empty() && !IsObjectQueueEmpty && !ConsumedEnoughFiles()) {
SendPathRequest();
}
return object;
}
void SendPathRequest() {
Y_ENSURE(!IsWaitingObjectQueueResponse);
+ const ui64 requestedAmount = std::min(ReadActorFactoryCfg.MaxInflight, FilesRemained.value_or(std::numeric_limits<ui64>::max()));
Send(
FileQueueActor,
std::make_unique<TS3FileQueueActor::TEvPrivatePrivate::TEvGetNextFile>(
- ReadActorFactoryCfg.MaxInflight));
+ requestedAmount));
IsWaitingObjectQueueResponse = true;
}
@@ -807,6 +818,10 @@ private:
return CpuTime;
}
+ bool ConsumedEnoughFiles() const {
+ return FilesRemained && (*FilesRemained == 0);
+ }
+
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvReadResult, Handle);
hFunc(TEvPrivate::TEvReadError, Handle);
@@ -876,7 +891,7 @@ private:
} while (!Blocks.empty() && freeSpace > 0LL);
}
- if (LastFileWasProcessed()) {
+ if (LastFileWasProcessed() || ConsumedEnoughFiles()) {
finished = true;
ContainerCache.Clear();
}
@@ -917,6 +932,9 @@ private:
}
Blocks.emplace(std::make_tuple(std::move(result->Get()->Result), id));
DownloadInflight--;
+ if (FilesRemained) {
+ *FilesRemained = SubtractSaturating(*FilesRemained, 1);
+ }
TryStartDownload();
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
} else {
@@ -1005,6 +1023,7 @@ private:
ui64 QueueTotalDataSize = 0;
ui64 DownloadInflight = 0;
const ui64 FileSizeLimit;
+ std::optional<ui64> FilesRemained;
};
struct TReadSpec {
@@ -1460,6 +1479,7 @@ public:
while (NDB::Block batch = stream->read()) {
Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId);
+ const bool isStopped = StopIfConsumedEnough(batch.rows());
Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
if (Paused) {
CpuTime += GetCpuTimeDelta();
@@ -1467,6 +1487,10 @@ public:
HandleEvent(*ev);
StartCycleCount = GetCycleCountFast();
}
+ if (isStopped) {
+ LOG_CORO_D("RunClickHouseParserOverHttp - STOPPED ON SATURATION");
+ break;
+ }
}
LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED");
@@ -1496,6 +1520,7 @@ public:
while (NDB::Block batch = stream->read()) {
Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId);
+ const bool isCancelled = StopIfConsumedEnough(batch.rows());
Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
if (Paused) {
CpuTime += GetCpuTimeDelta();
@@ -1503,6 +1528,10 @@ public:
HandleEvent(*ev);
StartCycleCount = GetCycleCountFast();
}
+ if (isCancelled) {
+ LOG_CORO_D("RunClickHouseParserOverFile STOPPED ON SATURATION");
+ break;
+ }
}
IngressBytes += GetFileLength(fileName);
@@ -1801,6 +1830,7 @@ public:
std::shared_ptr<arrow::RecordBatch> batch;
arrow::Status status;
+ bool isCancelled = false;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch);
@@ -1809,6 +1839,10 @@ public:
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
));
+ if (StopIfConsumedEnough(convertedBatch->num_rows())) {
+ isCancelled = true;
+ break;
+ }
}
if (!status.ok()) {
throw yexception() << status.ToString();
@@ -1828,6 +1862,10 @@ public:
} else {
readers[readyReaderIndex].reset();
}
+ if (isCancelled) {
+ LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION");
+ break;
+ }
}
}
@@ -1879,6 +1917,7 @@ public:
ui64 decodedBytes = 0;
std::shared_ptr<arrow::RecordBatch> batch;
::arrow::Status status;
+ bool isCancelled = false;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch);
@@ -1887,11 +1926,19 @@ public:
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
));
+ if (StopIfConsumedEnough(batch->num_rows())) {
+ isCancelled = true;
+ break;
+ }
}
if (!status.ok()) {
throw yexception() << status.ToString();
}
QueueBufferCounter->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
+ if (isCancelled) {
+ LOG_CORO_D("RunCoroBlockArrowParserOverFile - STOPPED ON SATURATION");
+ break;
+ }
}
LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED");
@@ -2042,7 +2089,7 @@ private:
public:
TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
- const TString& path, const TString& url,
+ const TString& path, const TString& url, std::optional<ui64> maxRows,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
TReadBufferCounter::TPtr queueBufferCounter,
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
@@ -2051,7 +2098,7 @@ public:
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
- PathIndex(pathIndex), Path(path), Url(url),
+ PathIndex(pathIndex), Path(path), Url(url), RowsRemained(maxRows),
QueueBufferCounter(queueBufferCounter),
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) {
@@ -2088,6 +2135,20 @@ private:
return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
}
+ bool StopIfConsumedEnough(ui64 consumedRows) {
+ if (!RowsRemained) {
+ return false;
+ }
+
+ *RowsRemained = SubtractSaturating(*RowsRemained, consumedRows);
+ if (*RowsRemained > 0) {
+ return false;
+ }
+
+ RetryStuff->Cancel();
+ return true;
+ }
+
void Run() final {
NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
@@ -2211,6 +2272,7 @@ private:
TDuration CpuTime;
ui64 StartCycleCount = 0;
TString InputBuffer;
+ std::optional<ui64> RowsRemained;
bool Paused = false;
std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts;
TReadBufferCounter::TPtr QueueBufferCounter;
@@ -2252,6 +2314,7 @@ public:
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
ui64 fileSizeLimit,
+ std::optional<ui64> rowsLimitHint,
IMemoryQuotaManager::TPtr memoryQuotaManager
) : ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
@@ -2266,6 +2329,7 @@ public:
, PatternVariant(patternVariant)
, Paths(std::move(paths))
, AddPathIndex(addPathIndex)
+ , RowsRemained(rowsLimitHint)
, ReadSpec(readSpec)
, Counters(std::move(counters))
, TaskCounters(std::move(taskCounters))
@@ -2397,6 +2461,7 @@ public:
pathIndex,
objectPath.Path,
Url,
+ RowsRemained,
ReadActorFactoryCfg,
QueueBufferCounter,
DeferredQueueSize,
@@ -2518,7 +2583,7 @@ private:
TryRegisterCoro();
} while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free));
- finished = LastFileWasProcessed();
+ finished = ConsumedEnoughRows() || LastFileWasProcessed();
if (finished) {
ContainerCache.Clear();
ArrowTupleContainerCache.Clear();
@@ -2611,6 +2676,7 @@ private:
if (Counters) {
QueueBlockCount->Inc();
}
+ StopLoadsIfEnough(next->Get()->Block.rows());
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -2624,6 +2690,7 @@ private:
if (Counters) {
QueueBlockCount->Inc();
}
+ StopLoadsIfEnough(next->Get()->Batch->num_rows());
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -2679,6 +2746,24 @@ private:
return Blocks.empty() && (ListedFiles == CompletedFiles) && IsObjectQueueEmpty;
}
+ void StopLoadsIfEnough(ui64 consumedRows) {
+ if (!RowsRemained) {
+ return;
+ }
+
+ *RowsRemained = SubtractSaturating(*RowsRemained, consumedRows);
+ if (*RowsRemained == 0) {
+ LOG_T("TS3StreamReadActor", "StopLoadsIfEnough(consumedRows = " << consumedRows << ") sends poison");
+ for (const auto actorId : CoroActors) {
+ Send(actorId, new NActors::TEvents::TEvPoison());
+ }
+ }
+ }
+
+ bool ConsumedEnoughRows() const noexcept {
+ return RowsRemained && *RowsRemained == 0;
+ }
+
const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const IHTTPGateway::TPtr Gateway;
THashMap<NActors::TActorId, TRetryStuff::TPtr> RetryStuffForFile;
@@ -2704,6 +2789,7 @@ private:
const bool AddPathIndex;
size_t ListedFiles = 0;
size_t CompletedFiles = 0;
+ std::optional<ui64> RowsRemained;
const TReadSpec::TPtr ReadSpec;
std::deque<TReadyBlock> Blocks;
TDuration CpuTime;
@@ -2941,7 +3027,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
intervalUnit = NYql::NSerialization::TSerializationInterval::ToUnit(it->second);
}
- // For later use
std::optional<ui64> rowsLimitHint;
if (params.GetRowsLimitHint() != 0) {
rowsLimitHint = params.GetRowsLimitHint();
@@ -2958,6 +3043,11 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
readSpec->ParallelRowGroupCount = params.GetParallelRowGroupCount();
readSpec->RowGroupReordering = params.GetRowGroupReordering();
readSpec->ParallelDownloadCount = params.GetParallelDownloadCount();
+
+ if (rowsLimitHint && *rowsLimitHint <= 1000) {
+ readSpec->ParallelRowGroupCount = 1;
+ readSpec->ParallelDownloadCount = 1;
+ }
if (readSpec->Arrow) {
fileSizeLimit = cfg.BlockFileSizeLimit;
arrow::SchemaBuilder builder;
@@ -3042,7 +3132,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
#undef SUPPORTED_FLAGS
const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
- cfg, counters, taskCounters, fileSizeLimit, memoryQuotaManager);
+ cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager);
return {actor, actor};
} else {
@@ -3052,7 +3142,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto actor = new TS3ReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
- cfg, counters, taskCounters, fileSizeLimit);
+ cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint);
return {actor, actor};
}
}