diff options
author | hor911 <hor911@ydb.tech> | 2023-03-07 19:39:51 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-03-07 19:39:51 +0300 |
commit | 4535e7738c8863c638701ec4f40badae92a73b98 (patch) | |
tree | 9bd0d5f874e143116078968eec14ca0d875685ca | |
parent | 78605f0e58d4d527c9164ff4cb352708b42e1e78 (diff) | |
download | ydb-4535e7738c8863c638701ec4f40badae92a73b98.tar.gz |
Tunable read ahead row group count + memory back pressure
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 201 |
1 files changed, 148 insertions, 53 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index fbc8f0143bd..68eea4ad522 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1014,6 +1014,7 @@ struct TReadSpec { bool Arrow = false; bool ThreadPool = false; + ui64 ReadAheadRowGroupCount = 0; std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec; NDB::ColumnsWithTypeAndName CHColumns; std::shared_ptr<arrow::Schema> ArrowSchema; @@ -1430,8 +1431,9 @@ public: } struct TReadCache { - ui64 Cookie; + ui64 Cookie = 0; TString Data; + bool Ready = false; }; struct TReadRangeCompare @@ -1444,6 +1446,8 @@ public: ui64 RangeCookie = 0; std::map<TEvPrivate::TReadRange, TReadCache, TReadRangeCompare> RangeCache; + ui64 CacheInflightSize = 0; + ui64 ReadInflightSize = 0; static void OnResult(TActorSystem* actorSystem, TActorId selfId, TEvPrivate::TReadRange range, ui64 cookie, IHTTPGateway::TResult&& result) { switch (result.index()) { @@ -1476,9 +1480,11 @@ public: } arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& readRanges) { - if (!RangeCache.empty()) { - LOG_CORO_W("WillNeed is called before previous ranges are completely processed"); - RangeCache.clear(); + if (Paused) { + CpuTime += GetCpuTimeDelta(); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(); + HandleEvent(*ev); + StartCycleCount = GetCycleCountFast(); } for (auto& range : readRanges) { GetOrCreate(TEvPrivate::TReadRange{ .Offset = range.offset, .Length = range.length }); @@ -1486,38 +1492,62 @@ public: return {}; } + void HandleEvent(TEvPrivate::TEvReadResult2::THandle& event) { + + if (event.Get()->Failure) { + throw yexception() << event.Get()->Issues.ToOneLineString(); + } + auto readyRange = event.Get()->ReadRange; + LOG_CORO_D("Download FINISHED [" << readyRange.Offset << "-" << readyRange.Length << "], cookie: " << event.Cookie); + IngressBytes += readyRange.Length; + + auto it = RangeCache.find(readyRange); + + if (it == RangeCache.end()) { + LOG_CORO_W("Download completed for unknown/discarded range [" << readyRange.Offset << "-" << readyRange.Length << "]"); + return; + } + + if (it->second.Cookie != event.Cookie) { + LOG_CORO_W("Mistmatched cookie for range [" << readyRange.Offset << "-" << readyRange.Length << "], received " << event.Cookie << ", expected " << it->second.Cookie); + return; + } + + it->second.Data = event.Get()->Result.Extract(); + it->second.Ready = true; + ui64 size = it->second.Data.size(); + CacheInflightSize += size; + if (RawInflightSize) { + RawInflightSize->Add(size); + } + } + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { LOG_CORO_D("ReadAt STARTED [" << position << "-" << nbytes << "]"); TEvPrivate::TReadRange range { .Offset = position, .Length = nbytes }; - auto cache = GetOrCreate(range); - while (cache.Data.empty()) { - CpuTime += GetCpuTimeDelta(); + auto& cache = GetOrCreate(range); + + CpuTime += GetCpuTimeDelta(); + + while (!cache.Ready) { auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(); - StartCycleCount = GetCycleCountFast(); - if (ev->Get()->Failure) { - throw yexception() << ev->Get()->Issues.ToOneLineString(); - } - auto readyRange = ev->Get()->ReadRange; - LOG_CORO_D("Download FINISHED [" << readyRange.Offset << "-" << readyRange.Length << "], cookie: " << ev->Cookie); - IngressBytes += readyRange.Length; - if (range.Offset == readyRange.Offset && range.Length == readyRange.Length && ev->Cookie == cache.Cookie) { - cache.Data = ev->Get()->Result.Extract(); - break; - } else { - auto it = RangeCache.find(readyRange); - if (it == RangeCache.end()) { - LOG_CORO_W("Download completed for unknown/discarded range [" << readyRange.Offset << "-" << readyRange.Length << "]"); - } else if (it->second.Cookie != ev->Cookie) { - LOG_CORO_W("Mistmatched cookie for range [" << readyRange.Offset << "-" << readyRange.Length << "], received " << ev->Cookie << ", expected " << it->second.Cookie); - } else { - it->second.Data = ev->Get()->Result.Extract(); - } - } + HandleEvent(*ev); } + if (Paused) { + auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(); + HandleEvent(*ev); + } + + StartCycleCount = GetCycleCountFast(); + TString data = cache.Data; RangeCache.erase(range); - LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << data.size()); + auto size = data.size(); + CacheInflightSize -= size; + ReadInflightSize += size; + LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << size); + return arrow::Buffer::FromString(data); } @@ -1525,39 +1555,81 @@ public: LOG_CORO_D("RunCoroBlockArrowParserOverHttp"); - std::shared_ptr<arrow::io::RandomAccessFile> arrowFile = std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit); - std::unique_ptr<parquet::arrow::FileReader> fileReader; + ui64 readerCount = ReadSpec->ReadAheadRowGroupCount + 1; + + std::shared_ptr<parquet::FileMetaData> metadata; + std::vector<std::unique_ptr<parquet::arrow::FileReader>> readers; + parquet::arrow::FileReaderBuilder builder; builder.memory_pool(arrow::default_memory_pool()); parquet::ArrowReaderProperties properties; properties.set_cache_options(arrow::io::CacheOptions::LazyDefaults()); properties.set_pre_buffer(true); builder.properties(properties); - THROW_ARROW_NOT_OK(builder.Open(arrowFile)); - THROW_ARROW_NOT_OK(builder.Build(&fileReader)); - std::shared_ptr<arrow::Schema> schema; - THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema)); - std::vector<int> columnIndices; - std::vector<TColumnConverter> columnConverters; + // init the 1st reader, get meta/rg count + readers.resize(1); + THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit))); + THROW_ARROW_NOT_OK(builder.Build(&readers[0])); + auto fileMetadata = readers[0]->parquet_reader()->metadata(); + ui64 numGroups = readers[0]->num_row_groups(); - BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); + if (numGroups) { - for (int group = 0; group < fileReader->num_row_groups(); group++) { + std::shared_ptr<arrow::Schema> schema; + THROW_ARROW_NOT_OK(readers[0]->GetSchema(&schema)); + std::vector<int> columnIndices; + std::vector<TColumnConverter> columnConverters; - std::shared_ptr<arrow::Table> table; - THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table)); - auto reader = std::make_unique<arrow::TableBatchReader>(*table); + BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); - std::shared_ptr<arrow::RecordBatch> batch; - ::arrow::Status status; - while (status = reader->ReadNext(&batch), status.ok() && batch) { - Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( - ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta() - )); + if (readerCount > numGroups) { + readerCount = numGroups; } - if (!status.ok()) { - throw yexception() << status.ToString(); + + if (readerCount > 1) { + // init other readers if any + readers.resize(readerCount); + for (ui64 i = 1; i < readerCount; i++) { + THROW_ARROW_NOT_OK(builder.Open(std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit), + parquet::default_reader_properties(), + fileMetadata)); + THROW_ARROW_NOT_OK(builder.Build(&readers[i])); + } + } + + for (ui64 i = 0; i < readerCount; i++) { + THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices)); + } + + ui64 currentReader = 0; + for (ui64 group = 0; group < numGroups; group++) { + + std::shared_ptr<arrow::Table> table; + + THROW_ARROW_NOT_OK(readers[currentReader]->DecodeRowGroups({ static_cast<int>(group) }, columnIndices, &table)); + + auto reader = std::make_unique<arrow::TableBatchReader>(*table); + std::shared_ptr<arrow::RecordBatch> batch; + arrow::Status status; + while (status = reader->ReadNext(&batch), status.ok() && batch) { + Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( + ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta(), TakeCpuTimeDelta() + )); + } + if (!status.ok()) { + throw yexception() << status.ToString(); + } + if (RawInflightSize) { + RawInflightSize->Sub(ReadInflightSize); + ReadInflightSize = 0; + } + if (group + readerCount < numGroups) { + THROW_ARROW_NOT_OK(readers[currentReader]->WillNeedRowGroups({ static_cast<int>(group + readerCount) }, columnIndices)); + } + if (++currentReader >= readerCount) { + currentReader = 0; + } } } @@ -1629,6 +1701,7 @@ public: hFunc(TEvPrivate::TEvReadFinished, Handle); hFunc(TEvPrivate::TEvPause, Handle); hFunc(TEvPrivate::TEvContinue, Handle); + hFunc(TEvPrivate::TEvReadResult2, Handle); hFunc(NActors::TEvents::TEvPoison, Handle); ) @@ -1740,11 +1813,19 @@ public: Paused = true; } - void Handle(TEvPrivate::TEvContinue::TPtr&) { + void HandleEvent(TEvPrivate::TEvContinue::THandle&) { LOG_CORO_D("TEvContinue"); Paused = false; } + void Handle(TEvPrivate::TEvContinue::TPtr& ev) { + HandleEvent(*ev); + } + + void Handle(TEvPrivate::TEvReadResult2::TPtr& ev) { + HandleEvent(*ev); + } + void Handle(NActors::TEvents::TEvPoison::TPtr&) { RetryStuff->Cancel(); throw TS3ReadAbort(); @@ -1759,17 +1840,26 @@ public: const TS3ReadActorFactoryConfig& readActorFactoryCfg, const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, - const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps) + const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps, + const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize) : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader), - DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps) + DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), + HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) {} ~TS3ReadCoroImpl() override { if (DeferredDataParts.size() && DeferredQueueSize) { DeferredQueueSize->Sub(DeferredDataParts.size()); } + if (CacheInflightSize || ReadInflightSize) { + if (RawInflightSize) { + RawInflightSize->Sub(ReadInflightSize + ReadInflightSize); + } + CacheInflightSize = 0; + ReadInflightSize = 0; + } } private: @@ -1927,6 +2017,7 @@ private: const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; + const ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize; }; class TS3ReadCoroActor : public TActorCoro { @@ -1999,6 +2090,7 @@ public: HttpInflightLimit = TaskCounters->GetCounter("HttpInflightLimit"); HttpDataRps = TaskCounters->GetCounter("HttpDataRps", true); TaskQueueDataLimit->Add(ReadActorFactoryCfg.DataInflight); + RawInflightSize = TaskCounters->GetCounter("RawInflightSize"); } } @@ -2073,7 +2165,8 @@ public: ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, - HttpDataRps); + HttpDataRps, + RawInflightSize); CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl)))); } @@ -2438,6 +2531,7 @@ private: ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightLimit; ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; + ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize; ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounterPtr TaskCounters; ui64 DownloadInflight = 0; @@ -2659,6 +2753,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto readSpec = std::make_shared<TReadSpec>(); readSpec->Arrow = params.GetArrow(); readSpec->ThreadPool = params.GetThreadPool(); + readSpec->ReadAheadRowGroupCount = params.GetReadAheadRowGroupCount(); if (readSpec->Arrow) { arrow::SchemaBuilder builder; const TStringBuf blockLengthColumn("_yql_block_length"sv); |