aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-03-09 16:00:16 +0300
committerhor911 <hor911@ydb.tech>2023-03-09 16:00:16 +0300
commit7d3614a719ab373119c4446eb060cdde89f80601 (patch)
tree0d3959c3de6551877ab12fdf9e110cbb64bdc23c
parent921a6ad63c98a9c6f6e51ca15112dbc6939dcd69 (diff)
downloadydb-7d3614a719ab373119c4446eb060cdde89f80601.tar.gz
Reordered row group read
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp139
-rw-r--r--ydb/library/yql/providers/s3/proto/source.proto3
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp3
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp3
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h3
5 files changed, 114 insertions, 37 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 2bdbaaa2756..9b2cef3cdfd 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -72,6 +72,7 @@
#include <util/stream/format.h>
#include <util/system/fstat.h>
+#include <algorithm>
#include <queue>
#ifdef THROW
@@ -1014,7 +1015,8 @@ struct TReadSpec {
bool Arrow = false;
bool ThreadPool = false;
- ui64 ReadAheadRowGroupCount = 0;
+ ui64 ParallelRowGroupCount = 1;
+ bool RowGroupReordering = true;
std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec;
NDB::ColumnsWithTypeAndName CHColumns;
std::shared_ptr<arrow::Schema> ArrowSchema;
@@ -1433,6 +1435,7 @@ public:
struct TReadCache {
ui64 Cookie = 0;
TString Data;
+ std::optional<ui64> RowGroupIndex;
bool Ready = false;
};
@@ -1446,8 +1449,11 @@ public:
ui64 RangeCookie = 0;
std::map<TEvPrivate::TReadRange, TReadCache, TReadRangeCompare> RangeCache;
- ui64 CacheInflightSize = 0;
- ui64 ReadInflightSize = 0;
+ std::map<ui64, ui64> ReadInflightSize;
+ std::optional<ui64> CurrentRowGroupIndex;
+ std::map<ui64, ui64> RowGroupRangeInflight;
+ std::priority_queue<ui64, std::vector<ui64>, std::greater<ui64>> ReadyRowGroups;
+ std::map<ui64, ui64> RowGroupReaderIndex;
static void OnResult(TActorSystem* actorSystem, TActorId selfId, TEvPrivate::TReadRange range, ui64 cookie, IHTTPGateway::TResult&& result) {
switch (result.index()) {
@@ -1462,6 +1468,18 @@ public:
}
}
+ ui64 DecreaseRowGroupInflight(ui64 rowGroupIndex) {
+ auto inflight = RowGroupRangeInflight[rowGroupIndex];
+ if (inflight > 1) {
+ RowGroupRangeInflight[rowGroupIndex] = --inflight;
+ } else {
+ inflight = 0;
+ RowGroupRangeInflight.erase(rowGroupIndex);
+ }
+ return inflight;
+ }
+
+
TReadCache& GetOrCreate(TEvPrivate::TReadRange range) {
auto it = RangeCache.find(range);
if (it != RangeCache.end()) {
@@ -1475,7 +1493,18 @@ public:
RetryStuff->RetryPolicy);
LOG_CORO_D("Download STARTED [" << range.Offset << "-" << range.Length << "], cookie: " << RangeCookie);
auto& result = RangeCache[range];
- result.Cookie = RangeCookie; // may overwrite old range in case of desync?
+ if (result.Cookie) {
+ // may overwrite old range in case of desync?
+ if (result.RowGroupIndex) {
+ LOG_CORO_W("RangeInfo DISCARDED [" << range.Offset << "-" << range.Length << "], cookie: " << RangeCookie << ", rowGroup " << *result.RowGroupIndex);
+ DecreaseRowGroupInflight(*result.RowGroupIndex);
+ }
+ }
+ result.RowGroupIndex = CurrentRowGroupIndex;
+ result.Cookie = RangeCookie;
+ if (CurrentRowGroupIndex) {
+ RowGroupRangeInflight[*CurrentRowGroupIndex]++;
+ }
return result;
}
@@ -1486,8 +1515,14 @@ public:
HandleEvent(*ev);
StartCycleCount = GetCycleCountFast();
}
- for (auto& range : readRanges) {
- GetOrCreate(TEvPrivate::TReadRange{ .Offset = range.offset, .Length = range.length });
+ if (readRanges.empty()) { // select count(*) case
+ if (CurrentRowGroupIndex) {
+ ReadyRowGroups.push(*CurrentRowGroupIndex);
+ }
+ } else {
+ for (auto& range : readRanges) {
+ GetOrCreate(TEvPrivate::TReadRange{ .Offset = range.offset, .Length = range.length });
+ }
}
return {};
}
@@ -1514,10 +1549,14 @@ public:
}
it->second.Data = event.Get()->Result.Extract();
- it->second.Ready = true;
ui64 size = it->second.Data.size();
- CacheInflightSize += size;
- if (RawInflightSize) {
+ it->second.Ready = true;
+ if (it->second.RowGroupIndex) {
+ if (!DecreaseRowGroupInflight(*it->second.RowGroupIndex)) {
+ LOG_CORO_D("RowGroup #" << *it->second.RowGroupIndex << " is READY");
+ ReadyRowGroups.push(*it->second.RowGroupIndex);
+ }
+ ReadInflightSize[*it->second.RowGroupIndex] += size;
RawInflightSize->Add(size);
}
}
@@ -1543,10 +1582,8 @@ public:
TString data = cache.Data;
RangeCache.erase(range);
- auto size = data.size();
- CacheInflightSize -= size;
- ReadInflightSize += size;
- LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << size);
+
+ LOG_CORO_D("ReadAt FINISHED [" << position << "-" << nbytes << "] #" << data.size());
return arrow::Buffer::FromString(data);
}
@@ -1555,7 +1592,7 @@ public:
LOG_CORO_D("RunCoroBlockArrowParserOverHttp");
- ui64 readerCount = ReadSpec->ReadAheadRowGroupCount + 1;
+ ui64 readerCount = std::max(1ul, ReadSpec->ParallelRowGroupCount);
std::shared_ptr<parquet::FileMetaData> metadata;
std::vector<std::unique_ptr<parquet::arrow::FileReader>> readers;
@@ -1583,7 +1620,10 @@ public:
BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
- if (readerCount > numGroups) {
+ if (columnIndices.empty()) {
+ // select count(*) case - single reader is enough
+ readerCount = 1;
+ } else if (readerCount > numGroups) {
readerCount = numGroups;
}
@@ -1599,15 +1639,44 @@ public:
}
for (ui64 i = 0; i < readerCount; i++) {
- THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices));
+ if (!columnIndices.empty()) {
+ CurrentRowGroupIndex = i;
+ THROW_ARROW_NOT_OK(readers[i]->WillNeedRowGroups({ static_cast<int>(i) }, columnIndices));
+ }
+ RowGroupReaderIndex[i] = i;
}
- ui64 currentReader = 0;
- for (ui64 group = 0; group < numGroups; group++) {
+ ui64 nextGroup = readerCount;
+ ui64 readyGroupCount = 0;
+
+ while (readyGroupCount < numGroups) {
+
+ ui64 readyGroupIndex;
+ if (!columnIndices.empty()) {
+ CpuTime += GetCpuTimeDelta();
+
+ // if reordering is not allowed wait for row groups sequentially
+ while (ReadyRowGroups.empty()
+ || (!ReadSpec->RowGroupReordering && ReadyRowGroups.top() > readyGroupCount) ) {
+ ProcessOneEvent();
+ }
+
+ StartCycleCount = GetCycleCountFast();
+
+ readyGroupIndex = ReadyRowGroups.top();
+ ReadyRowGroups.pop();
+ } else {
+ // select count(*) case - no columns, no download, just fetch meta info instantly
+ readyGroupIndex = readyGroupCount;
+ }
+ auto readyReaderIndex = RowGroupReaderIndex[readyGroupIndex];
+ RowGroupReaderIndex.erase(readyGroupIndex);
std::shared_ptr<arrow::Table> table;
- THROW_ARROW_NOT_OK(readers[currentReader]->DecodeRowGroups({ static_cast<int>(group) }, columnIndices, &table));
+ LOG_CORO_D("Decode RowGroup " << readyGroupIndex << " of " << numGroups << " from reader " << readyReaderIndex);
+ THROW_ARROW_NOT_OK(readers[readyReaderIndex]->DecodeRowGroups({ static_cast<int>(readyGroupIndex) }, columnIndices, &table));
+ readyGroupCount++;
auto reader = std::make_unique<arrow::TableBatchReader>(*table);
std::shared_ptr<arrow::RecordBatch> batch;
@@ -1621,14 +1690,16 @@ public:
throw yexception() << status.ToString();
}
if (RawInflightSize) {
- RawInflightSize->Sub(ReadInflightSize);
- ReadInflightSize = 0;
- }
- if (group + readerCount < numGroups) {
- THROW_ARROW_NOT_OK(readers[currentReader]->WillNeedRowGroups({ static_cast<int>(group + readerCount) }, columnIndices));
+ RawInflightSize->Sub(ReadInflightSize[readyGroupIndex]);
}
- if (++currentReader >= readerCount) {
- currentReader = 0;
+ ReadInflightSize.erase(readyGroupIndex);
+ if (nextGroup < numGroups) {
+ if (!columnIndices.empty()) {
+ CurrentRowGroupIndex = nextGroup;
+ THROW_ARROW_NOT_OK(readers[readyReaderIndex]->WillNeedRowGroups({ static_cast<int>(nextGroup) }, columnIndices));
+ }
+ RowGroupReaderIndex[nextGroup] = readyReaderIndex;
+ nextGroup++;
}
}
}
@@ -1853,13 +1924,14 @@ public:
if (DeferredDataParts.size() && DeferredQueueSize) {
DeferredQueueSize->Sub(DeferredDataParts.size());
}
- if (CacheInflightSize || ReadInflightSize) {
- if (RawInflightSize) {
- RawInflightSize->Sub(ReadInflightSize + ReadInflightSize);
- }
- CacheInflightSize = 0;
- ReadInflightSize = 0;
+ auto rawInflightSize = 0;
+ for (auto it : ReadInflightSize) {
+ rawInflightSize += it.second;
+ }
+ if (rawInflightSize && RawInflightSize) {
+ RawInflightSize->Sub(rawInflightSize);
}
+ ReadInflightSize.clear();
}
private:
@@ -2753,7 +2825,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto readSpec = std::make_shared<TReadSpec>();
readSpec->Arrow = params.GetArrow();
readSpec->ThreadPool = params.GetThreadPool();
- readSpec->ReadAheadRowGroupCount = params.GetReadAheadRowGroupCount();
+ readSpec->ParallelRowGroupCount = std::max(1ul, params.GetParallelRowGroupCount());
+ readSpec->RowGroupReordering = params.GetRowGroupReordering();
if (readSpec->Arrow) {
arrow::SchemaBuilder builder;
const TStringBuf blockLengthColumn("_yql_block_length"sv);
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index bf379c1b992..8cbe1d7287e 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -17,5 +17,6 @@ message TSource {
map<string, string> Settings = 6;
bool Arrow = 7;
bool ThreadPool = 8;
- uint64 ReadAheadRowGroupCount = 9;
+ uint64 ParallelRowGroupCount = 9;
+ bool RowGroupReordering = 10;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 42c2b77e607..60c2e0750de 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -278,7 +278,8 @@ public:
srcDesc.SetFormat(parseSettings.Format().StringValue().c_str());
srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()));
srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true));
- srcDesc.SetReadAheadRowGroupCount(State_->Configuration->ArrowReadAheadRowGroupCount.Get().GetOrElse(0));
+ srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(1));
+ srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(false));
const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
// exclude extra columns to get actual row type we need to read from input
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index fe48fc0be09..580dddf2813 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -16,7 +16,8 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, InFlightMemoryLimit);
REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000);
REGISTER_SETTING(*this, ArrowThreadPool);
- REGISTER_SETTING(*this, ArrowReadAheadRowGroupCount);
+ REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1);
+ REGISTER_SETTING(*this, ArrowRowGroupReordering);
}
TS3Settings::TConstPtr TS3Configuration::Snapshot() const {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 53e8c3a7e46..f7db3ee905d 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -18,7 +18,8 @@ struct TS3Settings {
NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink.
NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000.
NCommon::TConfSetting<bool, false> ArrowThreadPool;
- NCommon::TConfSetting<ui64, false> ArrowReadAheadRowGroupCount;
+ NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1
+ NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK
};
struct TS3ClusterSettings {