aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-03-07 19:39:51 +0300
committerhor911 <hor911@ydb.tech>2023-03-07 19:39:51 +0300
commit4535e7738c8863c638701ec4f40badae92a73b98 (patch)
tree9bd0d5f874e143116078968eec14ca0d875685ca
parent78605f0e58d4d527c9164ff4cb352708b42e1e78 (diff)
downloadydb-4535e7738c8863c638701ec4f40badae92a73b98.tar.gz
Tunable read ahead row group count + memory back pressure
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp201
1 files changed, 148 insertions, 53 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index fbc8f0143bd..68eea4ad522 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1014,6 +1014,7 @@ struct TReadSpec {
bool Arrow = false;
bool ThreadPool = false;
+ ui64 ReadAheadRowGroupCount = 0;
std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec;
NDB::ColumnsWithTypeAndName CHColumns;
std::shared_ptr<arrow::Schema> ArrowSchema;
@@ -1430,8 +1431,9 @@ public:
}
struct TReadCache {
- ui64 Cookie;
+ ui64 Cookie = 0;
TString Data;
+ bool Ready = false;
};
struct TReadRangeCompare
@@ -1444,6 +1446,8 @@ public:
ui64 RangeCookie = 0;
std::map<TEvPrivate::TReadRange, TReadCache, TReadRangeCompare> RangeCache;
+ ui64 CacheInflightSize = 0;
+ ui64 ReadInflightSize = 0;
static void OnResult(TActorSystem* actorSystem, TActorId selfId, TEvPrivate::TReadRange range, ui64 cookie, IHTTPGateway::TResult&& result) {
switch (result.index()) {
@@ -1476,9 +1480,11 @@ public:
}
arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& readRanges) {
- if (!RangeCache.empty()) {
- LOG_CORO_W("WillNeed is called before previous ranges are completely processed");
- RangeCache.clear();
+ if (Paused) {
+ CpuTime += GetCpuTimeDelta();
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>();
+ HandleEvent(*ev);
+ StartCycleCount = GetCycleCountFast();
}
for (auto& range : readRanges) {
GetOrCreate(TEvPrivate::TReadRange{ .Offset = range.offset, .Length = range.length });
@@ -1486,38 +1492,62 @@ public:
return {};
}
+ void HandleEvent(TEvPrivate::TEvReadResult2::THandle& event) {
+
+ if (event.Get()->Failure) {
+ throw yexception() << event.Get()->Issues.ToOneLineString();
+ }
+ auto readyRange = event.Get()->ReadRange;
+ LOG_CORO_D("Download FINISHED [" << readyRange.Offset << "-" << readyRange.Length << "], cookie: " << event.Cookie);
+ IngressBytes += readyRange.Length;
+
+ auto it = RangeCache.find(readyRange);
+
+ if (it == RangeCache.end()) {
+ LOG_CORO_W("Download completed for unknown/discarded range [" << readyRange.Offset << "-" << readyRange.Length << "]");
+ return;
+ }
+
+ if (it->second.Cookie != event.Cookie) {
+ LOG_CORO_W("Mistmatched cookie for range [" << readyRange.Offset << "-" << readyRange.Length << "], received " << event.Cookie << ", expected " << it->second.Cookie);
+ return;
+ }
+
+ it->second.Data = event.Get()->Result.Extract();
+ it->second.Ready = true;
+ ui64 size = it->second.Data.size();
+ CacheInflightSize += size;
+ if (RawInflightSize) {
+ RawInflightSize->Add(size);
+ }
+ }
+
arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
LOG_CORO_D("ReadAt STARTED [" << position << "-" << nbytes << "]");
TEvPrivate::TReadRange range { .Offset = position, .Length = nbytes };
- auto cache = GetOrCreate(range);
- while (cache.Data.empty()) {
- CpuTime += GetCpuTimeDelta();
+ auto& cache = GetOrCreate(range);
+
+ CpuTime += GetCpuTimeDelta();
+
+ while (!cache.Ready) {
auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>();
- StartCycleCount = GetCycleCountFast();
- if (ev->Get()->Failure) {
- throw yexception() << ev->Get()->Issues.ToOneLineString();
- }
- auto readyRange = ev->Get()->ReadRange;
- LOG_CORO_D("Download FINISHED [" << readyRange.Offset << "-" << readyRange.Length << "], cookie: " << ev->Cookie);
- IngressBytes += readyRange.Length;
- if (range.Offset == readyRange.Offset && range.Length == readyRange.Length && ev->Cookie == cache.Cookie) {
- cache.Data = ev->Get()->Result.Extract();
- break;
- } else {
- auto it = RangeCache.find(readyRange);
- if (it == RangeCache.end()) {
- LOG_CORO_W("Download completed for unknown/discarded range [" << readyRange.Offset << "-" << readyRange.Length << "]");
- } else if (it->second.Cookie != ev->Cookie) {
- LOG_CORO_W("Mistmatched cookie for range [" << readyRange.Offset << "-" << readyRange.Length << "], received " << ev->Cookie << ", expected " << it->second.Cookie);
- } else {
- it->second.Data = ev->Get()->Result.Extract();
- }
- }
+ HandleEvent(*ev);
}
+ if (Paused) {
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>();
+ HandleEvent(*ev);
+ }
+
+ StartCycleCount = GetCycleCountFast();
+
TString data = cache.Data;
RangeCache.erase(range);
- LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << data.size());
+ auto size = data.size();
+ CacheInflightSize -= size;
+ ReadInflightSize += size;
+ LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << size);
+
return arrow::Buffer::FromString(data);
}
@@ -1525,39 +1555,81 @@ public:
LOG_CORO_D("RunCoroBlockArrowParserOverHttp");
- std::shared_ptr<arrow::io::RandomAccessFile> arrowFile = std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit);
- std::unique_ptr<parquet::arrow::FileReader> fileReader;
+ ui64 readerCount = ReadSpec->ReadAheadRowGroupCount + 1;
+
+ std::shared_ptr<parquet::FileMetaData> metadata;
+ std::vector<std::unique_ptr<parquet::arrow::FileReader>> readers;
+
parquet::arrow::FileReaderBuilder builder;
builder.memory_pool(arrow::default_memory_pool());
parquet::ArrowReaderProperties properties;
properties.set_cache_options(arrow::io::CacheOptions::LazyDefaults());
properties.set_pre_buffer(true);
builder.properties(properties);
- THROW_ARROW_NOT_OK(builder.Open(arrowFile));
- THROW_ARROW_NOT_OK(builder.Build(&fileReader));
- std::shared_ptr<arrow::Schema> schema;
- THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
- std::vector<int> columnIndices;
- std::vector<TColumnConverter> columnConverters;
+ // init the 1st reader, get meta/rg count
+ readers.resize(1);
+ THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit)));
+ THROW_ARROW_NOT_OK(builder.Build(&readers[0]));
+ auto fileMetadata = readers[0]->parquet_reader()->metadata();
+ ui64 numGroups = readers[0]->num_row_groups();
- BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
+ if (numGroups) {
- for (int group = 0; group < fileReader->num_row_groups(); group++) {
+ std::shared_ptr<arrow::Schema> schema;
+ THROW_ARROW_NOT_OK(readers[0]->GetSchema(&schema));
+ std::vector<int> columnIndices;
+ std::vector<TColumnConverter> columnConverters;
- std::shared_ptr<arrow::Table> table;
- THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
- auto reader = std::make_unique<arrow::TableBatchReader>(*table);
+ BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
- std::shared_ptr<arrow::RecordBatch> batch;
- ::arrow::Status status;
- while (status = reader->ReadNext(&batch), status.ok() && batch) {
- Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
- ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()
- ));
+ if (readerCount > numGroups) {
+ readerCount = numGroups;
}
- if (!status.ok()) {
- throw yexception() << status.ToString();
+
+ if (readerCount > 1) {
+ // init other readers if any
+ readers.resize(readerCount);
+ for (ui64 i = 1; i < readerCount; i++) {
+ THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit),
+ parquet::default_reader_properties(),
+ fileMetadata));
+ THROW_ARROW_NOT_OK(builder.Build(&readers[i]));
+ }
+ }
+
+ for (ui64 i = 0; i < readerCount; i++) {
+ THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices));
+ }
+
+ ui64 currentReader = 0;
+ for (ui64 group = 0; group < numGroups; group++) {
+
+ std::shared_ptr<arrow::Table> table;
+
+ THROW_ARROW_NOT_OK(readers[currentReader]->DecodeRowGroups({ static_cast<int>(group) }, columnIndices, &table));
+
+ auto reader = std::make_unique<arrow::TableBatchReader>(*table);
+ std::shared_ptr<arrow::RecordBatch> batch;
+ arrow::Status status;
+ while (status = reader->ReadNext(&batch), status.ok() && batch) {
+ Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
+ ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta()
+ ));
+ }
+ if (!status.ok()) {
+ throw yexception() << status.ToString();
+ }
+ if (RawInflightSize) {
+ RawInflightSize->Sub(ReadInflightSize);
+ ReadInflightSize = 0;
+ }
+ if (group + readerCount < numGroups) {
+ THROW_ARROW_NOT_OK(readers[currentReader]->WillNeedRowGroups({ static_cast<int>(group + readerCount) }, columnIndices));
+ }
+ if (++currentReader >= readerCount) {
+ currentReader = 0;
+ }
}
}
@@ -1629,6 +1701,7 @@ public:
hFunc(TEvPrivate::TEvReadFinished, Handle);
hFunc(TEvPrivate::TEvPause, Handle);
hFunc(TEvPrivate::TEvContinue, Handle);
+ hFunc(TEvPrivate::TEvReadResult2, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);
)
@@ -1740,11 +1813,19 @@ public:
Paused = true;
}
- void Handle(TEvPrivate::TEvContinue::TPtr&) {
+ void HandleEvent(TEvPrivate::TEvContinue::THandle&) {
LOG_CORO_D("TEvContinue");
Paused = false;
}
+ void Handle(TEvPrivate::TEvContinue::TPtr& ev) {
+ HandleEvent(*ev);
+ }
+
+ void Handle(TEvPrivate::TEvReadResult2::TPtr& ev) {
+ HandleEvent(*ev);
+ }
+
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
RetryStuff->Cancel();
throw TS3ReadAbort();
@@ -1759,17 +1840,26 @@ public:
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
- const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps)
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader),
- DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps)
+ DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
+ HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize)
{}
~TS3ReadCoroImpl() override {
if (DeferredDataParts.size() && DeferredQueueSize) {
DeferredQueueSize->Sub(DeferredDataParts.size());
}
+ if (CacheInflightSize || ReadInflightSize) {
+ if (RawInflightSize) {
+ RawInflightSize->Sub(ReadInflightSize + ReadInflightSize);
+ }
+ CacheInflightSize = 0;
+ ReadInflightSize = 0;
+ }
}
private:
@@ -1927,6 +2017,7 @@ private:
const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize;
};
class TS3ReadCoroActor : public TActorCoro {
@@ -1999,6 +2090,7 @@ public:
HttpInflightLimit = TaskCounters->GetCounter("HttpInflightLimit");
HttpDataRps = TaskCounters->GetCounter("HttpDataRps", true);
TaskQueueDataLimit->Add(ReadActorFactoryCfg.DataInflight);
+ RawInflightSize = TaskCounters->GetCounter("RawInflightSize");
}
}
@@ -2073,7 +2165,8 @@ public:
ReadActorFactoryCfg,
DeferredQueueSize,
HttpInflightSize,
- HttpDataRps);
+ HttpDataRps,
+ RawInflightSize);
CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl))));
}
@@ -2438,6 +2531,7 @@ private:
::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightLimit;
::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
+ ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize;
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounterPtr TaskCounters;
ui64 DownloadInflight = 0;
@@ -2659,6 +2753,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto readSpec = std::make_shared<TReadSpec>();
readSpec->Arrow = params.GetArrow();
readSpec->ThreadPool = params.GetThreadPool();
+ readSpec->ReadAheadRowGroupCount = params.GetReadAheadRowGroupCount();
if (readSpec->Arrow) {
arrow::SchemaBuilder builder;
const TStringBuf blockLengthColumn("_yql_block_length"sv);