author     hor911 <hor911@ydb.tech>  2023-04-28 15:27:45 +0300
committer  hor911 <hor911@ydb.tech>  2023-04-28 15:27:45 +0300
commit     feb79a8842a81474d42b9c460ffb7d93ef44184c (patch)
tree       813b5499dda34dfd1c9e01d4873e24969cde018b
parent     277bf2b433110a4e3a5a127d7cc801d050d0394f (diff)
download   ydb-feb79a8842a81474d42b9c460ffb7d93ef44184c.tar.gz
Accurate mem quota
-rw-r--r--  ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp        | 496
-rw-r--r--  ydb/library/yql/providers/s3/proto/source.proto                  |   1
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp  |   5
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp        |   1
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_settings.h          |   1
5 files changed, 333 insertions, 171 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index bb97cee6be7..798a50b6de7 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -157,9 +157,7 @@ struct TEvPrivate {
EvRetry,
EvNextBlock,
EvNextRecordBatch,
- EvBlockProcessed,
EvFileFinished,
- EvPause,
EvContinue,
EvFutureResolved,
EvObjectPathBatch,
@@ -246,35 +244,26 @@ struct TEvPrivate {
};
struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> {
- TEvNextBlock(NDB::Block& block, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta)
- : PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
+ TEvNextBlock(NDB::Block& block, size_t pathInd, ui64 ingressDelta, TDuration cpuTimeDelta)
+ : PathIndex(pathInd), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
Block.swap(block);
}
NDB::Block Block;
const size_t PathIndex;
- std::function<void()> Functor;
const ui64 IngressDelta;
const TDuration CpuTimeDelta;
};
struct TEvNextRecordBatch : public NActors::TEventLocal<TEvNextRecordBatch, EvNextRecordBatch> {
- TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, std::function<void()> functor, ui64 ingressDelta, TDuration cpuTimeDelta)
- : Batch(batch), PathIndex(pathInd), Functor(functor), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
+ TEvNextRecordBatch(const std::shared_ptr<arrow::RecordBatch>& batch, size_t pathInd, ui64 ingressDelta, TDuration cpuTimeDelta)
+ : Batch(batch), PathIndex(pathInd), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
}
std::shared_ptr<arrow::RecordBatch> Batch;
const size_t PathIndex;
- std::function<void()> Functor;
const ui64 IngressDelta;
const TDuration CpuTimeDelta;
};
- struct TEvBlockProcessed : public NActors::TEventLocal<TEvBlockProcessed, EvBlockProcessed> {
- TEvBlockProcessed() {}
- };
-
- struct TEvPause : public NActors::TEventLocal<TEvPause, EvPause> {
- };
-
struct TEvContinue : public NActors::TEventLocal<TEvContinue, EvContinue> {
};
@@ -1011,8 +1000,9 @@ struct TReadSpec {
bool Arrow = false;
bool ThreadPool = false;
- ui64 ParallelRowGroupCount = 1;
+ ui64 ParallelRowGroupCount = 0;
bool RowGroupReordering = true;
+ ui64 ParallelDownloadCount = 0;
std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec;
NDB::ColumnsWithTypeAndName CHColumns;
std::shared_ptr<arrow::Schema> ArrowSchema;
@@ -1173,6 +1163,151 @@ std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::R
return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns);
}
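+// Shared accounting of decoded data queued between download coroutines and the read actor;
+// pauses producers when the memory quota is reached and resumes them once the queue drains.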
+struct TReadBufferCounter {
+ using TPtr = std::shared_ptr<TReadBufferCounter>;
+
+ TReadBufferCounter(ui64 limit,
+ TActorSystem* actorSystem,
+ NMonitoring::TDynamicCounters::TCounterPtr queueDataSize,
+ NMonitoring::TDynamicCounters::TCounterPtr taskQueueDataSize,
+ NMonitoring::TDynamicCounters::TCounterPtr downloadPaused,
+ NMonitoring::TDynamicCounters::TCounterPtr taskDownloadPaused,
+ NMonitoring::TDynamicCounters::TCounterPtr taskChunkDownloadCount,
+ NMonitoring::THistogramPtr decodedChunkSizeHist)
+ : Limit(limit)
+ , ActorSystem(actorSystem)
+ , QueueDataSize(queueDataSize)
+ , TaskQueueDataSize(taskQueueDataSize)
+ , DownloadPaused(downloadPaused)
+ , TaskDownloadPaused(taskDownloadPaused)
+ , TaskChunkDownloadCount(taskChunkDownloadCount)
+ , DecodedChunkSizeHist(decodedChunkSizeHist)
+ {
+ }
+
+ ~TReadBufferCounter() {
+ Notify();
+ if (Value) {
+ if (QueueDataSize) {
+ QueueDataSize->Sub(Value);
+ }
+ if (TaskQueueDataSize) {
+ TaskQueueDataSize->Sub(Value);
+ }
+ Value = 0;
+ }
+ }
+
+ bool IsFull() const {
+ return Value >= Limit;
+ }
+
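+ // observed decoded/downloaded size ratio; 1.0 until any progress has been reported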
+ double Ratio() const {
+ return DownloadedBytes ? static_cast<double>(DecodedBytes) / DownloadedBytes : 1.0;
+ }
+
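+ // even split of the memory limit across the active download coroutines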
+ ui64 FairShare() {
+ return CoroCount ? Limit / CoroCount : Limit;
+ }
+
+ void IncChunk() {
+ ChunkCount++;
+ if (TaskChunkDownloadCount) {
+ TaskChunkDownloadCount->Inc();
+ }
+ }
+
+ void DecChunk() {
+ ChunkCount--;
+ if (TaskChunkDownloadCount) {
+ TaskChunkDownloadCount->Dec();
+ }
+ }
+
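+ // account for a decoded chunk from `producer`; returns true when the producer should pause until TEvContinue arrives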
+ bool Add(ui64 delta, NActors::TActorId producer, bool paused = false) {
+ if (DecodedChunkSizeHist) {
+ DecodedChunkSizeHist->Collect(delta);
+ }
+ Value += delta;
+ if (QueueDataSize) {
+ QueueDataSize->Add(delta);
+ }
+ if (TaskQueueDataSize) {
+ TaskQueueDataSize->Add(delta);
+ }
+ if ((Value + delta / 2) >= Limit) {
+ if (!paused) {
+ if (DownloadPaused) {
+ DownloadPaused->Inc();
+ }
+ if (TaskDownloadPaused) {
+ TaskDownloadPaused->Inc();
+ }
+ Producers.push_back(producer);
+ paused = true;
+ }
+ }
+ return paused;
+ }
+
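+ // release bytes consumed downstream; wakes paused producers once usage drops far enough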
+ void Sub(ui64 delta) {
+ Y_ASSERT(Value >= delta);
+ Value -= delta;
+ if (QueueDataSize) {
+ QueueDataSize->Sub(delta);
+ }
+ if (TaskQueueDataSize) {
+ TaskQueueDataSize->Sub(delta);
+ }
+ if (Value * 4 < Limit * 3) { // resume once below 75% of the limit (hysteresis)
+ Notify();
+ }
+ }
+
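+ // send TEvContinue to every producer that was paused in Add()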
+ void Notify() {
+ if (!Producers.empty()) {
+ if (DownloadPaused) {
+ DownloadPaused->Sub(Producers.size());
+ }
+ if (TaskDownloadPaused) {
+ TaskDownloadPaused->Sub(Producers.size());
+ }
+ for (auto producer : Producers) {
+ ActorSystem->Send(new IEventHandle(producer, TActorId{}, new TEvPrivate::TEvContinue()));
+ }
+ Producers.clear();
+ }
+ }
+
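+ // accumulate download/decode progress; Ratio() uses these totals for the dynamic download limit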
+ void UpdateProgress(ui64 deltaDownloadedBytes, ui64 deltaDecodedBytes, ui64 deltaDecodedRows) {
+ DownloadedBytes += deltaDownloadedBytes;
+ DecodedBytes += deltaDecodedBytes;
+ DecodedRows += deltaDecodedRows;
+ }
+
+ ui64 Value = 0;
+ const ui64 Limit;
+ ui64 CoroCount = 0;
+ ui64 ChunkCount = 0;
+ ui64 DownloadedBytes = 0;
+ ui64 DecodedBytes = 0;
+ ui64 DecodedRows = 0;
+ std::vector<NActors::TActorId> Producers;
+ TActorSystem* ActorSystem = nullptr;
+ NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize;
+ NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize;
+ NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused;
+ NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused;
+ NMonitoring::TDynamicCounters::TCounterPtr TaskChunkDownloadCount;
+ NMonitoring::THistogramPtr DecodedChunkSizeHist;
+};
+
+struct TParquetFileInfo {
+ ui64 RowCount = 0;
+ ui64 CompressedSize = 0;
+ ui64 UncompressedSize = 0;
+};
+
class TS3ReadCoroImpl : public TActorCoroImpl {
friend class TS3StreamReadActor;
@@ -1304,7 +1439,14 @@ public:
);
while (NDB::Block batch = stream->read()) {
- Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()));
+ Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId);
+ Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
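+ // backpressure: if the shared queue crossed its limit, wait for TEvContinue before reading further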
+ if (Paused) {
+ CpuTime += GetCpuTimeDelta();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
+ HandleEvent(*ev);
+ StartCycleCount = GetCycleCountFast();
+ }
}
LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED");
@@ -1332,23 +1474,16 @@ public:
)
);
- auto actorSystem = GetActorSystem();
- auto selfId = SelfActorId;
- size_t cntBlocksInFly = 0;
-
while (NDB::Block batch = stream->read()) {
- if (++cntBlocksInFly > MaxBlocksInFly) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- --cntBlocksInFly;
+ Paused = QueueBufferCounter->Add(batch.bytes(), SelfActorId);
+ Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
+ if (Paused) {
+ CpuTime += GetCpuTimeDelta();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
+ HandleEvent(*ev);
+ StartCycleCount = GetCycleCountFast();
}
- Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() {
- actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed()));
- }, TakeIngressDelta(), TakeCpuTimeDelta()));
}
- while (cntBlocksInFly--) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- }
-
IngressBytes += GetFileLength(fileName);
LOG_CORO_D("RunClickHouseParserOverFile FINISHED");
@@ -1426,9 +1561,17 @@ public:
std::shared_ptr<arrow::RecordBatch> batch;
::arrow::Status status;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
+ auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
+ Paused = QueueBufferCounter->Add(NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch), SelfActorId);
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
- ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()
+ convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
));
+ if (Paused) {
+ CpuTime += GetCpuTimeDelta();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
+ HandleEvent(*ev);
+ StartCycleCount = GetCycleCountFast();
+ }
}
if (!status.ok()) {
throw yexception() << status.ToString();
@@ -1509,12 +1652,6 @@ public:
}
arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& readRanges) {
- if (Paused) {
- CpuTime += GetCpuTimeDelta();
- auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- HandleEvent(*ev);
- StartCycleCount = GetCycleCountFast();
- }
if (readRanges.empty()) { // select count(*) case
if (CurrentRowGroupIndex) {
ReadyRowGroups.push(*CurrentRowGroupIndex);
@@ -1575,10 +1712,6 @@ public:
auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
HandleEvent(*ev);
}
- if (Paused) {
- auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- HandleEvent(*ev);
- }
StartCycleCount = GetCycleCountFast();
@@ -1594,9 +1727,8 @@ public:
LOG_CORO_D("RunCoroBlockArrowParserOverHttp");
- ui64 readerCount = std::max(1ul, ReadSpec->ParallelRowGroupCount);
+ ui64 readerCount = 1;
- std::shared_ptr<parquet::FileMetaData> metadata;
std::vector<std::unique_ptr<parquet::arrow::FileReader>> readers;
parquet::arrow::FileReaderBuilder builder;
@@ -1622,11 +1754,31 @@ public:
BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
- if (columnIndices.empty()) {
- // select count(*) case - single reader is enough
- readerCount = 1;
- } else if (readerCount > numGroups) {
- readerCount = numGroups;
+ // select count(*) case - single reader is enough
+ if (!columnIndices.empty()) {
+ if (ReadSpec->ParallelRowGroupCount) {
+ readerCount = ReadSpec->ParallelRowGroupCount;
+ } else {
+ // read ahead in parallel up to 1/2 of the fair-share bytes
+ // (this is the compressed size; it grows after decoding)
+ ui64 compressedSize = 0;
+ for (int i = 0; i < fileMetadata->num_row_groups(); i++) {
+ auto rowGroup = fileMetadata->RowGroup(i);
+ for (const auto columIndex : columnIndices) {
+ compressedSize += rowGroup->ColumnChunk(columIndex)->total_compressed_size();
+ }
+ }
+ // count = (fair_share / 2) / (compressed_size / num_group)
+ auto desiredReaderCount = (QueueBufferCounter->FairShare() * numGroups) / (compressedSize * 2);
+ // min is 1
+ // max is 5 (this cap should probably be tuned as well)
+ if (desiredReaderCount) {
+ readerCount = std::min(desiredReaderCount, 5ul);
+ }
+ }
+ if (readerCount > numGroups) {
+ readerCount = numGroups;
+ }
}
if (readerCount > 1) {
@@ -1644,6 +1796,7 @@ public:
if (!columnIndices.empty()) {
CurrentRowGroupIndex = i;
THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices));
+ QueueBufferCounter->IncChunk();
}
RowGroupReaderIndex[i] = i;
}
@@ -1652,6 +1805,12 @@ public:
ui64 readyGroupCount = 0;
while (readyGroupCount < numGroups) {
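+ // backpressure from the shared queue: wait for TEvContinue before decoding the next row group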
+ if (Paused) {
+ CpuTime += GetCpuTimeDelta();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
+ HandleEvent(*ev);
+ StartCycleCount = GetCycleCountFast();
+ }
ui64 readyGroupIndex;
if (!columnIndices.empty()) {
@@ -1671,6 +1830,7 @@ public:
// select count(*) case - no columns, no download, just fetch meta info instantly
readyGroupIndex = readyGroupCount;
}
+ QueueBufferCounter->DecChunk();
auto readyReaderIndex = RowGroupReaderIndex[readyGroupIndex];
RowGroupReaderIndex.erase(readyGroupIndex);
@@ -1680,28 +1840,40 @@ public:
THROW_ARROW_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ static_cast<int>(readyGroupIndex) }, columnIndices, &table));
readyGroupCount++;
+ auto downloadedBytes = ReadInflightSize[readyGroupIndex];
+ ui64 decodedBytes = 0;
+ ReadInflightSize.erase(readyGroupIndex);
+
auto reader = std::make_unique<arrow::TableBatchReader>(*table);
+
std::shared_ptr<arrow::RecordBatch> batch;
arrow::Status status;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
+ auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
+ auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch);
+ decodedBytes += size;
+ Paused = QueueBufferCounter->Add(size, SelfActorId, Paused);
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
- ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()
+ convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
));
}
if (!status.ok()) {
throw yexception() << status.ToString();
}
+ QueueBufferCounter->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
if (RawInflightSize) {
- RawInflightSize->Sub(ReadInflightSize[readyGroupIndex]);
+ RawInflightSize->Sub(downloadedBytes);
}
- ReadInflightSize.erase(readyGroupIndex);
if (nextGroup < numGroups) {
if (!columnIndices.empty()) {
CurrentRowGroupIndex = nextGroup;
THROW_ARROW_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ static_cast<int>(nextGroup) }, columnIndices));
+ QueueBufferCounter->IncChunk();
}
RowGroupReaderIndex[nextGroup] = readyReaderIndex;
nextGroup++;
+ } else {
+ readers[readyReaderIndex].reset();
}
}
}
@@ -1734,35 +1906,39 @@ public:
BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
- auto actorSystem = GetActorSystem();
- auto selfId = SelfActorId;
- size_t cntBlocksInFly = 0;
-
for (int group = 0; group < fileReader->num_row_groups(); group++) {
+ if (Paused) {
+ CpuTime += GetCpuTimeDelta();
+ LOG_CORO_D("RunCoroBlockArrowParserOverFile - PAUSED " << QueueBufferCounter->Value);
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
+ HandleEvent(*ev);
+ LOG_CORO_D("RunCoroBlockArrowParserOverFile - CONTINUE " << QueueBufferCounter->Value);
+ StartCycleCount = GetCycleCountFast();
+ }
+
std::shared_ptr<arrow::Table> table;
+ ui64 ingressBytes = IngressBytes;
THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
+ ui64 downloadedBytes = IngressBytes - ingressBytes;
auto reader = std::make_unique<arrow::TableBatchReader>(*table);
+ ui64 decodedBytes = 0;
std::shared_ptr<arrow::RecordBatch> batch;
::arrow::Status status;
while (status = reader->ReadNext(&batch), status.ok() && batch) {
- if (++cntBlocksInFly > MaxBlocksInFly) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- --cntBlocksInFly;
- }
+ auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
+ auto size = NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch);
+ decodedBytes += size;
+ Paused = QueueBufferCounter->Add(size, SelfActorId, Paused);
Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
- ConvertArrowColumns(batch, columnConverters), PathIndex, [actorSystem, selfId]() {
- actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed()));
- }, TakeIngressDelta(), TakeCpuTimeDelta()
+ convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
));
}
if (!status.ok()) {
throw yexception() << status.ToString();
}
- }
- while (cntBlocksInFly--) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
+ QueueBufferCounter->UpdateProgress(downloadedBytes, decodedBytes, table->num_rows());
}
LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED");
@@ -1772,7 +1948,6 @@ public:
hFunc(TEvPrivate::TEvReadStarted, Handle);
hFunc(TEvPrivate::TEvDataPart, Handle);
hFunc(TEvPrivate::TEvReadFinished, Handle);
- hFunc(TEvPrivate::TEvPause, Handle);
hFunc(TEvPrivate::TEvContinue, Handle);
hFunc(TEvPrivate::TEvReadResult2, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);
@@ -1890,11 +2065,6 @@ public:
}
}
- void Handle(TEvPrivate::TEvPause::TPtr&) {
- LOG_CORO_D("TEvPause");
- Paused = true;
- }
-
void HandleEvent(TEvPrivate::TEvContinue::THandle&) {
LOG_CORO_D("TEvContinue");
Paused = false;
@@ -1909,6 +2079,7 @@ public:
}
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
+ LOG_CORO_D("TEvPoison");
RetryStuff->Cancel();
throw TS3ReadAbort();
}
@@ -1918,18 +2089,20 @@ private:
public:
TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
- const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader,
+ const TString& path, const TString& url, IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
+ TReadBufferCounter::TPtr queueBufferCounter,
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps,
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
- PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader),
+ PathIndex(pathIndex), Path(path), Url(url), ArrowReader(arrowReader),
+ QueueBufferCounter(queueBufferCounter),
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
- HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize)
- {}
+ HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) {
+ }
~TS3ReadCoroImpl() override {
if (DeferredDataParts.size() && DeferredQueueSize) {
@@ -2085,7 +2258,6 @@ private:
std::size_t LastOffset = 0;
TString LastData;
- std::size_t MaxBlocksInFly = 2;
IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
TDuration CpuTime;
@@ -2093,6 +2265,7 @@ private:
TString InputBuffer;
bool Paused = false;
std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts;
+ TReadBufferCounter::TPtr QueueBufferCounter;
const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
@@ -2127,7 +2300,6 @@ public:
const TReadSpec::TPtr& readSpec,
const NActors::TActorId& computeActorId,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
- const std::size_t maxBlocksInFly,
IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
@@ -2148,7 +2320,6 @@ public:
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
, ReadSpec(readSpec)
- , MaxBlocksInFly(maxBlocksInFly)
, ArrowReader(std::move(arrowReader))
, Counters(std::move(counters))
, TaskCounters(std::move(taskCounters))
@@ -2157,12 +2328,16 @@ public:
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
QueueBlockCount = Counters->GetCounter("QueueBlockCount");
+ DownloadCount = Counters->GetCounter("DownloadCount");
DownloadPaused = Counters->GetCounter("DownloadPaused");
QueueDataLimit->Add(ReadActorFactoryCfg.DataInflight);
+ DecodedChunkSizeHist = Counters->GetHistogram("ChunkSizeBytes", NMonitoring::ExplicitHistogram({100,1000,10'000,30'000,100'000,300'000,1'000'000,3'000'000,10'000'000,30'000'000,100'000'000}));
}
if (TaskCounters) {
TaskQueueDataSize = TaskCounters->GetCounter("QueueDataSize");
TaskQueueDataLimit = TaskCounters->GetCounter("QueueDataLimit");
+ TaskDownloadCount = TaskCounters->GetCounter("DownloadCount");
+ TaskChunkDownloadCount = TaskCounters->GetCounter("ChunkDownloadCount");
TaskDownloadPaused = TaskCounters->GetCounter("DownloadPaused");
DeferredQueueSize = TaskCounters->GetCounter("DeferredQueueSize");
HttpInflightSize = TaskCounters->GetCounter("HttpInflightSize");
@@ -2175,6 +2350,15 @@ public:
void Bootstrap() {
LOG_D("TS3StreamReadActor", "Bootstrap");
+ QueueBufferCounter = std::make_shared<TReadBufferCounter>(
+ ReadActorFactoryCfg.DataInflight,
+ TActivationContext::ActorSystem(),
+ QueueDataSize,
+ TaskQueueDataSize,
+ DownloadPaused,
+ TaskDownloadPaused,
+ TaskChunkDownloadCount,
+ DecodedChunkSizeHist);
FileQueueActor = RegisterWithSameMailbox(new TS3FileQueueActor{
TxId,
std::move(Paths),
@@ -2195,22 +2379,39 @@ public:
// no path is pending
return false;
}
-
- if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) {
+ if (QueueBufferCounter->IsFull()) {
// too large data inflight
return false;
}
- if (DownloadInflight >= ReadActorFactoryCfg.MaxInflight) {
- // too large download inflight
+ if (QueueBufferCounter->CoroCount >= ReadActorFactoryCfg.MaxInflight) {
+ // hard limit
return false;
}
+ if (ReadSpec->ParallelDownloadCount) {
+ if (QueueBufferCounter->CoroCount >= ReadSpec->ParallelDownloadCount) {
+ // explicit limit
+ return false;
+ }
+ } else {
+ if (QueueBufferCounter->CoroCount && DownloadSize * QueueBufferCounter->Ratio() > ReadActorFactoryCfg.DataInflight * 2) {
+ // dynamic limit: estimated decoded size of in-flight downloads (compressed bytes scaled by the observed decode ratio) must stay under 2x the memory quota
+ return false;
+ }
+ }
RegisterCoro();
return true;
}
void RegisterCoro() {
- DownloadInflight++;
+ QueueBufferCounter->CoroCount++;
+ if (Counters) {
+ DownloadCount->Inc();
+ }
+ if (TaskCounters) {
+ TaskDownloadCount->Inc();
+ }
const auto& objectPath = ReadPathFromCache();
+ DownloadSize += objectPath.Size;
const TString requestId = CreateGuidAsString();
auto stuff = std::make_shared<TRetryStuff>(
Gateway,
@@ -2221,7 +2422,6 @@ public:
requestId,
RetryPolicy);
auto pathIndex = objectPath.PathIndex + StartPathIndex;
- RetryStuffForFile.emplace(pathIndex, stuff);
if (TaskCounters) {
HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream());
}
@@ -2229,7 +2429,6 @@ public:
"TS3StreamReadActor",
"RegisterCoro with path " << objectPath.Path << " with pathIndex "
<< pathIndex);
- ::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter;
auto impl = MakeHolder<TS3ReadCoroImpl>(
InputIndex,
TxId,
@@ -2239,14 +2438,16 @@ public:
pathIndex,
objectPath.Path,
Url,
- MaxBlocksInFly,
ArrowReader,
ReadActorFactoryCfg,
+ QueueBufferCounter,
DeferredQueueSize,
HttpInflightSize,
HttpDataRps,
RawInflightSize);
- CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl))));
+ auto coroActorId = RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl)));
+ CoroActors.insert(coroActorId);
+ RetryStuffForFile.emplace(coroActorId, stuff);
}
TObjectPath ReadPathFromCache() {
@@ -2292,12 +2493,11 @@ private:
class TReadyBlock {
public:
- TReadyBlock(TEvPrivate::TEvNextBlock::TPtr& event) : PathInd(event->Get()->PathIndex), Functor (std::move(event->Get()->Functor)) { Block.swap(event->Get()->Block); }
- TReadyBlock(TEvPrivate::TEvNextRecordBatch::TPtr& event) : Batch(event->Get()->Batch), PathInd(event->Get()->PathIndex), Functor(std::move(event->Get()->Functor)) {}
+ TReadyBlock(TEvPrivate::TEvNextBlock::TPtr& event) : PathInd(event->Get()->PathIndex) { Block.swap(event->Get()->Block); }
+ TReadyBlock(TEvPrivate::TEvNextRecordBatch::TPtr& event) : Batch(event->Get()->Batch), PathInd(event->Get()->PathIndex) {}
NDB::Block Block;
std::shared_ptr<arrow::RecordBatch> Batch;
size_t PathInd;
- std::function<void()> Functor;
};
void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {}
@@ -2353,8 +2553,6 @@ private:
value = HolderFactory.Create<TBoxedBlock>(Blocks.front().Block);
}
- Blocks.front().Functor();
-
if (AddPathIndex) {
NUdf::TUnboxedValue* tupleItems = nullptr;
auto tuple = ContainerCache.NewArray(HolderFactory, 2, tupleItems);
@@ -2367,19 +2565,13 @@ private:
total += s;
output.emplace_back(std::move(value));
Blocks.pop_front();
- QueueTotalDataSize -= s;
+ QueueBufferCounter->Sub(s);
if (Counters) {
- QueueDataSize->Sub(s);
QueueBlockCount->Dec();
}
- if (TaskCounters) {
- TaskQueueDataSize->Sub(s);
- }
TryRegisterCoro();
} while (!Blocks.empty() && free > 0LL && GetBlockSize(Blocks.front()) <= size_t(free));
- MaybeContinue();
-
finished = LastFileWasProcessed();
if (finished) {
ContainerCache.Clear();
@@ -2393,24 +2585,17 @@ private:
void PassAway() override { // Is called from Compute Actor
LOG_D("TS3StreamReadActor", "PassAway");
if (Counters) {
- QueueDataSize->Sub(QueueTotalDataSize);
QueueBlockCount->Sub(Blocks.size());
QueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight);
+ DownloadCount->Sub(QueueBufferCounter->CoroCount);
}
if (TaskCounters) {
- TaskQueueDataSize->Sub(QueueTotalDataSize);
TaskQueueDataLimit->Sub(ReadActorFactoryCfg.DataInflight);
HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream() * CoroActors.size());
+ TaskDownloadCount->Sub(QueueBufferCounter->CoroCount);
+ TaskChunkDownloadCount->Sub(QueueBufferCounter->ChunkCount);
}
- if (Paused) {
- if (Counters) {
- DownloadPaused->Dec();
- }
- if (TaskCounters) {
- TaskDownloadPaused->Dec();
- }
- }
- QueueTotalDataSize = 0;
+ QueueBufferCounter.reset();
for (const auto actorId : CoroActors) {
Send(actorId, new NActors::TEvents::TEvPoison());
@@ -2424,37 +2609,6 @@ private:
TActorBootstrapped<TS3StreamReadActor>::PassAway();
}
- void MaybePause() {
- if (!Paused && QueueTotalDataSize >= ReadActorFactoryCfg.DataInflight) {
- for (const auto actorId : CoroActors) {
- Send(actorId, new TEvPrivate::TEvPause());
- }
- Paused = true;
- if (Counters) {
- DownloadPaused->Inc();
- }
- if (TaskCounters) {
- TaskDownloadPaused->Inc();
- }
- }
- }
-
- void MaybeContinue() {
- // resume download on 3/4 == 75% to avoid oscillation (hysteresis)
- if (Paused && QueueTotalDataSize * 4 < ReadActorFactoryCfg.DataInflight * 3) {
- for (const auto actorId : CoroActors) {
- Send(actorId, new TEvPrivate::TEvContinue());
- }
- Paused = false;
- if (Counters) {
- DownloadPaused->Dec();
- }
- if (TaskCounters) {
- TaskDownloadPaused->Dec();
- }
- }
- }
-
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry);
hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock);
@@ -2474,12 +2628,13 @@ private:
ObjectPathCache.end(),
std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.begin()),
std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.end()));
- LOG_W(
+ LOG_D(
"TS3StreamReadActor",
"HandleObjectPathBatch " << ObjectPathCache.size() << " IsObjectQueueEmpty "
<< IsObjectQueueEmpty << " MaxInflight " << ReadActorFactoryCfg.MaxInflight);
while (TryRegisterCoro()) {}
}
+
void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) {
IsObjectQueueEmpty = true;
LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << result->Get()->Issues.ToOneLineString());
@@ -2495,16 +2650,9 @@ private:
YQL_ENSURE(!ReadSpec->Arrow);
IngressBytes += next->Get()->IngressDelta;
CpuTime += next->Get()->CpuTimeDelta;
- auto size = next->Get()->Block.bytes();
- QueueTotalDataSize += size;
if (Counters) {
QueueBlockCount->Inc();
- QueueDataSize->Add(size);
}
- if (TaskCounters) {
- TaskQueueDataSize->Add(size);
- }
- MaybePause();
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
ReportMemoryUsage();
@@ -2514,16 +2662,9 @@ private:
YQL_ENSURE(ReadSpec->Arrow);
IngressBytes += next->Get()->IngressDelta;
CpuTime += next->Get()->CpuTimeDelta;
- auto size = NUdf::GetSizeOfArrowBatchInBytes(*next->Get()->Batch);
- QueueTotalDataSize += size;
if (Counters) {
QueueBlockCount->Inc();
- QueueDataSize->Add(size);
}
- if (TaskCounters) {
- TaskQueueDataSize->Add(size);
- }
- MaybePause();
Blocks.emplace_back(next);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
ReportMemoryUsage();
@@ -2533,12 +2674,29 @@ private:
CoroActors.erase(ev->Sender);
IngressBytes += ev->Get()->IngressDelta;
CpuTime += ev->Get()->CpuTimeDelta;
- RetryStuffForFile.erase(ev->Get()->PathIndex);
+
+ auto it = RetryStuffForFile.find(ev->Sender);
+ if (it == RetryStuffForFile.end()) {
+ return;
+ }
+ auto size = it->second->SizeLimit; // read before erase() invalidates the iterator
+ RetryStuffForFile.erase(it);
+ if (DownloadSize < size) {
+ DownloadSize = 0;
+ } else {
+ DownloadSize -= size;
+ }
if (TaskCounters) {
HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream());
}
- DownloadInflight--;
+ QueueBufferCounter->CoroCount--;
+ if (Counters) {
+ DownloadCount->Dec();
+ }
+ if (TaskCounters) {
+ TaskDownloadCount->Dec();
+ }
CompletedFiles++;
if (!ObjectPathCache.empty()) {
TryRegisterCoro();
@@ -2560,7 +2718,7 @@ private:
const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const IHTTPGateway::TPtr Gateway;
- THashMap<size_t, TRetryStuff::TPtr> RetryStuffForFile;
+ THashMap<NActors::TActorId, TRetryStuff::TPtr> RetryStuffForFile;
const THolderFactory& HolderFactory;
TPlainContainerCache ContainerCache;
TPlainContainerCache ArrowTupleContainerCache;
@@ -2585,30 +2743,32 @@ private:
size_t CompletedFiles = 0;
const TReadSpec::TPtr ReadSpec;
std::deque<TReadyBlock> Blocks;
- const std::size_t MaxBlocksInFly;
IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
TDuration CpuTime;
mutable TInstant LastMemoryReport = TInstant::Now();
- ui64 QueueTotalDataSize = 0;
+ TReadBufferCounter::TPtr QueueBufferCounter;
::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize;
::NMonitoring::TDynamicCounters::TCounterPtr QueueDataLimit;
::NMonitoring::TDynamicCounters::TCounterPtr QueueBlockCount;
+ ::NMonitoring::TDynamicCounters::TCounterPtr DownloadCount;
::NMonitoring::TDynamicCounters::TCounterPtr DownloadPaused;
::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadCount;
::NMonitoring::TDynamicCounters::TCounterPtr TaskDownloadPaused;
+ ::NMonitoring::TDynamicCounters::TCounterPtr TaskChunkDownloadCount;
::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataSize;
::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit;
::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightLimit;
::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize;
+ ::NMonitoring::THistogramPtr DecodedChunkSizeHist;
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounterPtr TaskCounters;
- ui64 DownloadInflight = 0;
+ ui64 DownloadSize = 0;
std::set<NActors::TActorId> CoroActors;
NActors::TActorId FileQueueActor;
- bool Paused = false;
const ui64 FileSizeLimit;
};
@@ -2821,8 +2981,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto readSpec = std::make_shared<TReadSpec>();
readSpec->Arrow = params.GetArrow();
readSpec->ThreadPool = params.GetThreadPool();
- readSpec->ParallelRowGroupCount = std::max(1ul, params.GetParallelRowGroupCount());
+ readSpec->ParallelRowGroupCount = params.GetParallelRowGroupCount();
readSpec->RowGroupReordering = params.GetRowGroupReordering();
+ readSpec->ParallelDownloadCount = params.GetParallelDownloadCount();
if (readSpec->Arrow) {
fileSizeLimit = cfg.BlockFileSizeLimit;
arrow::SchemaBuilder builder;
@@ -2906,12 +3067,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
#undef SET_FLAG
#undef SUPPORTED_FLAGS
- std::size_t maxBlocksInFly = 2;
- if (const auto it = settings.find("fileReadBlocksInFly"); settings.cend() != it)
- maxBlocksInFly = FromString<ui64>(it->second);
const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy,
- maxBlocksInFly, arrowReader, cfg, counters, taskCounters, fileSizeLimit);
+ arrowReader, cfg, counters, taskCounters, fileSizeLimit);
return {actor, actor};
} else {
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index 8cbe1d7287e..426cf90f21b 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -19,4 +19,5 @@ message TSource {
bool ThreadPool = 8;
uint64 ParallelRowGroupCount = 9;
bool RowGroupReordering = 10;
+ uint64 ParallelDownloadCount = 11;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 420d42bbcc8..e438503cdf2 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -283,8 +283,9 @@ public:
srcDesc.SetFormat(parseSettings.Format().StringValue().c_str());
srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()));
srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true));
- srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(1));
- srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(false));
+ srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(0));
+ srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(true));
+ srcDesc.SetParallelDownloadCount(State_->Configuration->ParallelDownloadCount.Get().GetOrElse(0));
const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
// exclude extra columns to get actual row type we need to read from input
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index c1199d8565b..9734654fc77 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -18,6 +18,7 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, ArrowThreadPool);
REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1);
REGISTER_SETTING(*this, ArrowRowGroupReordering);
+ REGISTER_SETTING(*this, ParallelDownloadCount);
REGISTER_SETTING(*this, UseBlocksSource);
REGISTER_SETTING(*this, AtomicUploadCommit);
REGISTER_SETTING(*this, UseConcurrentDirectoryLister);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 2744af335df..6a08fbd7b73 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -20,6 +20,7 @@ struct TS3Settings {
NCommon::TConfSetting<bool, false> ArrowThreadPool;
NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1
NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK
+ NCommon::TConfSetting<ui64, false> ParallelDownloadCount; // Number of files to download in parallel; when unset, the limit is derived from the memory quota
NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode
NCommon::TConfSetting<bool, false> AtomicUploadCommit; // Commit each file independently, w/o transaction semantic over all files
NCommon::TConfSetting<bool, false> UseConcurrentDirectoryLister;