diff options
author | hor911 <hor911@ydb.tech> | 2023-03-09 16:00:16 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-03-09 16:00:16 +0300 |
commit | 7d3614a719ab373119c4446eb060cdde89f80601 (patch) | |
tree | 0d3959c3de6551877ab12fdf9e110cbb64bdc23c | |
parent | 921a6ad63c98a9c6f6e51ca15112dbc6939dcd69 (diff) | |
download | ydb-7d3614a719ab373119c4446eb060cdde89f80601.tar.gz |
Reordered row group read
5 files changed, 114 insertions, 37 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 2bdbaaa2756..9b2cef3cdfd 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -72,6 +72,7 @@ #include <util/stream/format.h> #include <util/system/fstat.h> +#include <algorithm> #include <queue> #ifdef THROW @@ -1014,7 +1015,8 @@ struct TReadSpec { bool Arrow = false; bool ThreadPool = false; - ui64 ReadAheadRowGroupCount = 0; + ui64 ParallelRowGroupCount = 1; + bool RowGroupReordering = true; std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec; NDB::ColumnsWithTypeAndName CHColumns; std::shared_ptr<arrow::Schema> ArrowSchema; @@ -1433,6 +1435,7 @@ public: struct TReadCache { ui64 Cookie = 0; TString Data; + std::optional<ui64> RowGroupIndex; bool Ready = false; }; @@ -1446,8 +1449,11 @@ public: ui64 RangeCookie = 0; std::map<TEvPrivate::TReadRange, TReadCache, TReadRangeCompare> RangeCache; - ui64 CacheInflightSize = 0; - ui64 ReadInflightSize = 0; + std::map<ui64, ui64> ReadInflightSize; + std::optional<ui64> CurrentRowGroupIndex; + std::map<ui64, ui64> RowGroupRangeInflight; + std::priority_queue<ui64, std::vector<ui64>, std::greater<ui64>> ReadyRowGroups; + std::map<ui64, ui64> RowGroupReaderIndex; static void OnResult(TActorSystem* actorSystem, TActorId selfId, TEvPrivate::TReadRange range, ui64 cookie, IHTTPGateway::TResult&& result) { switch (result.index()) { @@ -1462,6 +1468,18 @@ public: } } + ui64 DecreaseRowGroupInflight(ui64 rowGroupIndex) { + auto inflight = RowGroupRangeInflight[rowGroupIndex]; + if (inflight > 1) { + RowGroupRangeInflight[rowGroupIndex] = --inflight; + } else { + inflight = 0; + RowGroupRangeInflight.erase(rowGroupIndex); + } + return inflight; + } + + TReadCache& GetOrCreate(TEvPrivate::TReadRange range) { auto it = RangeCache.find(range); if (it != RangeCache.end()) { @@ -1475,7 +1493,18 @@ public: RetryStuff->RetryPolicy); LOG_CORO_D("Download STARTED [" << range.Offset << "-" << range.Length << "], cookie: " << RangeCookie); auto& result = RangeCache[range]; - result.Cookie = RangeCookie; // may overwrite old range in case of desync? + if (result.Cookie) { + // may overwrite old range in case of desync? + if (result.RowGroupIndex) { + LOG_CORO_W("RangeInfo DISCARDED [" << range.Offset << "-" << range.Length << "], cookie: " << RangeCookie << ", rowGroup " << *result.RowGroupIndex); + DecreaseRowGroupInflight(*result.RowGroupIndex); + } + } + result.RowGroupIndex = CurrentRowGroupIndex; + result.Cookie = RangeCookie; + if (CurrentRowGroupIndex) { + RowGroupRangeInflight[*CurrentRowGroupIndex]++; + } return result; } @@ -1486,8 +1515,14 @@ public: HandleEvent(*ev); StartCycleCount = GetCycleCountFast(); } - for (auto& range : readRanges) { - GetOrCreate(TEvPrivate::TReadRange{ .Offset = range.offset, .Length = range.length }); + if (readRanges.empty()) { // select count(*) case + if (CurrentRowGroupIndex) { + ReadyRowGroups.push(*CurrentRowGroupIndex); + } + } else { + for (auto& range : readRanges) { + GetOrCreate(TEvPrivate::TReadRange{ .Offset = range.offset, .Length = range.length }); + } } return {}; } @@ -1514,10 +1549,14 @@ public: } it->second.Data = event.Get()->Result.Extract(); - it->second.Ready = true; ui64 size = it->second.Data.size(); - CacheInflightSize += size; - if (RawInflightSize) { + it->second.Ready = true; + if (it->second.RowGroupIndex) { + if (!DecreaseRowGroupInflight(*it->second.RowGroupIndex)) { + LOG_CORO_D("RowGroup #" << *it->second.RowGroupIndex << " is READY"); + ReadyRowGroups.push(*it->second.RowGroupIndex); + } + ReadInflightSize[*it->second.RowGroupIndex] += size; RawInflightSize->Add(size); } } @@ -1543,10 +1582,8 @@ public: TString data = cache.Data; RangeCache.erase(range); - auto size = data.size(); - CacheInflightSize -= size; - ReadInflightSize += size; - LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << size); + + LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << data.size()); return arrow::Buffer::FromString(data); } @@ -1555,7 +1592,7 @@ public: LOG_CORO_D("RunCoroBlockArrowParserOverHttp"); - ui64 readerCount = ReadSpec->ReadAheadRowGroupCount + 1; + ui64 readerCount = std::max(1ul, ReadSpec->ParallelRowGroupCount); std::shared_ptr<parquet::FileMetaData> metadata; std::vector<std::unique_ptr<parquet::arrow::FileReader>> readers; @@ -1583,7 +1620,10 @@ public: BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); - if (readerCount > numGroups) { + if (columnIndices.empty()) { + // select count(*) case - single reader is enough + readerCount = 1; + } else if (readerCount > numGroups) { readerCount = numGroups; } @@ -1599,15 +1639,44 @@ public: } for (ui64 i = 0; i < readerCount; i++) { - THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices)); + if (!columnIndices.empty()) { + CurrentRowGroupIndex = i; + THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices)); + } + RowGroupReaderIndex[i] = i; } - ui64 currentReader = 0; - for (ui64 group = 0; group < numGroups; group++) { + ui64 nextGroup = readerCount; + ui64 readyGroupCount = 0; + + while (readyGroupCount < numGroups) { + + ui64 readyGroupIndex; + if (!columnIndices.empty()) { + CpuTime += GetCpuTimeDelta(); + + // if reordering is not allowed wait for row groups sequentially + while (ReadyRowGroups.empty() + || (!ReadSpec->RowGroupReordering && ReadyRowGroups.top() > readyGroupCount) ) { + ProcessOneEvent(); + } + + StartCycleCount = GetCycleCountFast(); + + readyGroupIndex = ReadyRowGroups.top(); + ReadyRowGroups.pop(); + } else { + // select count(*) case - no columns, no download, just fetch meta info instantly + readyGroupIndex = readyGroupCount; + } + auto readyReaderIndex = RowGroupReaderIndex[readyGroupIndex]; + RowGroupReaderIndex.erase(readyGroupIndex); std::shared_ptr<arrow::Table> table; - THROW_ARROW_NOT_OK(readers[currentReader]->DecodeRowGroups({ static_cast<int>(group) }, columnIndices, &table)); + LOG_CORO_D("Decode RowGroup " << readyGroupIndex << " of " << numGroups << " from reader " << readyReaderIndex); + THROW_ARROW_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ static_cast<int>(readyGroupIndex) }, columnIndices, &table)); + readyGroupCount++; auto reader = std::make_unique<arrow::TableBatchReader>(*table); std::shared_ptr<arrow::RecordBatch> batch; @@ -1621,14 +1690,16 @@ public: throw yexception() << status.ToString(); } if (RawInflightSize) { - RawInflightSize->Sub(ReadInflightSize); - ReadInflightSize = 0; - } - if (group + readerCount < numGroups) { - THROW_ARROW_NOT_OK(readers[currentReader]->WillNeedRowGroups({ static_cast<int>(group + readerCount) }, columnIndices)); + RawInflightSize->Sub(ReadInflightSize[readyGroupIndex]); } - if (++currentReader >= readerCount) { - currentReader = 0; + ReadInflightSize.erase(readyGroupIndex); + if (nextGroup < numGroups) { + if (!columnIndices.empty()) { + CurrentRowGroupIndex = nextGroup; + THROW_ARROW_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ static_cast<int>(nextGroup) }, columnIndices)); + } + RowGroupReaderIndex[nextGroup] = readyReaderIndex; + nextGroup++; } } } @@ -1853,13 +1924,14 @@ public: if (DeferredDataParts.size() && DeferredQueueSize) { DeferredQueueSize->Sub(DeferredDataParts.size()); } - if (CacheInflightSize || ReadInflightSize) { - if (RawInflightSize) { - RawInflightSize->Sub(ReadInflightSize + ReadInflightSize); - } - CacheInflightSize = 0; - ReadInflightSize = 0; + auto rawInflightSize = 0; + for (auto it : ReadInflightSize) { + rawInflightSize += it.second; + } + if (rawInflightSize && RawInflightSize) { + RawInflightSize->Sub(rawInflightSize); } + ReadInflightSize.clear(); } private: @@ -2753,7 +2825,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto readSpec = std::make_shared<TReadSpec>(); readSpec->Arrow = params.GetArrow(); readSpec->ThreadPool = params.GetThreadPool(); - readSpec->ReadAheadRowGroupCount = params.GetReadAheadRowGroupCount(); + readSpec->ParallelRowGroupCount = std::max(1ul, params.GetParallelRowGroupCount()); + readSpec->RowGroupReordering = params.GetRowGroupReordering(); if (readSpec->Arrow) { arrow::SchemaBuilder builder; const TStringBuf blockLengthColumn("_yql_block_length"sv); diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index bf379c1b992..8cbe1d7287e 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -17,5 +17,6 @@ message TSource { map<string, string> Settings = 6; bool Arrow = 7; bool ThreadPool = 8; - uint64 ReadAheadRowGroupCount = 9; + uint64 ParallelRowGroupCount = 9; + bool RowGroupReordering = 10; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 42c2b77e607..60c2e0750de 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -278,7 +278,8 @@ public: srcDesc.SetFormat(parseSettings.Format().StringValue().c_str()); srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>())); srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true)); - srcDesc.SetReadAheadRowGroupCount(State_->Configuration->ArrowReadAheadRowGroupCount.Get().GetOrElse(0)); + srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(1)); + srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(false)); const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); // exclude extra columns to get actual row type we need to read from input diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index fe48fc0be09..580dddf2813 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -16,7 +16,8 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, InFlightMemoryLimit); REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000); REGISTER_SETTING(*this, ArrowThreadPool); - REGISTER_SETTING(*this, ArrowReadAheadRowGroupCount); + REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1); + REGISTER_SETTING(*this, ArrowRowGroupReordering); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 53e8c3a7e46..f7db3ee905d 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -18,7 +18,8 @@ struct TS3Settings { NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink. NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000. NCommon::TConfSetting<bool, false> ArrowThreadPool; - NCommon::TConfSetting<ui64, false> ArrowReadAheadRowGroupCount; + NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1 + NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK }; struct TS3ClusterSettings { |