aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-03-06 23:31:20 +0300
committerhor911 <hor911@ydb.tech>2023-03-06 23:31:20 +0300
commite61ae9ba657c9b199ec24061043a5426aed28ecc (patch)
tree983e283e67a47714ad51a7f3a8e80f12b3e08ea4
parentfccb745feb0ac047fb767a1d9829c7b48d8d8f48 (diff)
downloadydb-e61ae9ba657c9b199ec24061043a5426aed28ecc.tar.gz
Refactor s3.ArrowThreadPool and introduce s3.ArrowReadAheadRowGroupCount options
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json5
-rw-r--r--ydb/library/yql/providers/s3/proto/source.proto1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp3
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp8
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h1
7 files changed, 9 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 7a8e65a476..06b69e4d1e 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -77,11 +77,6 @@
"Match": {"Type": "Callable", "Name": "S3ArrowSettings"}
},
{
- "Name": "TS3CoroArrowSettings",
- "Base": "TS3ParseSettingsBase",
- "Match": {"Type": "Callable", "Name": "S3CoroArrowSettings"}
- },
- {
"Name": "TS3Read",
"Base": "TFreeArgCallable",
"Match": {"Type": "Callable", "Name": "Read!"},
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index c174ce2cd9..bf379c1b99 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -17,4 +17,5 @@ message TSource {
map<string, string> Settings = 6;
bool Arrow = 7;
bool ThreadPool = 8;
+ uint64 ReadAheadRowGroupCount = 9;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 00aeccd4cc..0e1c9a218c 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -111,7 +111,6 @@ public:
AddHandler({TS3SourceSettings::CallableName()}, Hndl(&TSelf::HandleS3SourceSettings));
AddHandler({TS3ParseSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase));
AddHandler({TS3ArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase));
- AddHandler({TS3CoroArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase));
AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig));
}
@@ -178,7 +177,7 @@ public:
}
const TTypeAnnotationNode* itemType = nullptr;
- if (input->Content() == TS3ArrowSettings::CallableName() || input->Content() == TS3CoroArrowSettings::CallableName()) {
+ if (input->Content() == TS3ArrowSettings::CallableName()) {
TVector<const TItemExprType*> blockRowTypeItems;
for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType())));
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 3a21c6929a..42c2b77e60 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -194,8 +194,7 @@ public:
}
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3ParseSettingsBase>()
- .CallableName((supportedArrowTypes && format == "parquet") ?
- (State_->Configuration->ArrowThreadPool.Get().GetOrElse(true) ? TS3ArrowSettings::CallableName() : TS3CoroArrowSettings::CallableName() ):
+ .CallableName((supportedArrowTypes && format == "parquet") ? TS3ArrowSettings::CallableName():
TS3ParseSettings::CallableName())
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
@@ -277,8 +276,9 @@ public:
if (const auto mayParseSettings = settings.Maybe<TS3ParseSettingsBase>()) {
const auto parseSettings = mayParseSettings.Cast();
srcDesc.SetFormat(parseSettings.Format().StringValue().c_str());
- srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()) || bool(parseSettings.Maybe<TS3CoroArrowSettings>()));
- srcDesc.SetThreadPool(bool(parseSettings.Maybe<TS3ArrowSettings>()));
+ srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()));
+ srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true));
+ srcDesc.SetReadAheadRowGroupCount(State_->Configuration->ArrowReadAheadRowGroupCount.Get().GetOrElse(0));
const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
// exclude extra columns to get actual row type we need to read from input
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index 997090570f..bd3e92d89a 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -514,7 +514,7 @@ public:
}
}
- if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>() && !parseSettingsBase.Maybe<TS3CoroArrowSettings>()) {
+ if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>()) {
const TStructExprType* readRowDataType = ctx.MakeType<TStructExprType>(readRowDataItems);
auto item = GetLightColumn(*readRowDataType);
YQL_ENSURE(item);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index a60dfa7639..fe48fc0be0 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -16,6 +16,7 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, InFlightMemoryLimit);
REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000);
REGISTER_SETTING(*this, ArrowThreadPool);
+ REGISTER_SETTING(*this, ArrowReadAheadRowGroupCount);
}
TS3Settings::TConstPtr TS3Configuration::Snapshot() const {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index e2f5caa3cd..53e8c3a7e4 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -18,6 +18,7 @@ struct TS3Settings {
NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink.
NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000.
NCommon::TConfSetting<bool, false> ArrowThreadPool;
+ NCommon::TConfSetting<ui64, false> ArrowReadAheadRowGroupCount;
};
struct TS3ClusterSettings {