aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-08-28 19:27:29 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-08-28 20:52:25 +0300
commit6303c55c9b257865e486fb1106ddd368b7420931 (patch)
treebcfb190dc2415da194faf6d0f44bad1408e31f74
parent15ab95546529b917e592563f8081c972c08d301c (diff)
downloadydb-6303c55c9b257865e486fb1106ddd368b7420931.tar.gz
Add statistics for s3 provider
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp15
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp30
2 files changed, 39 insertions, 6 deletions
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp
index e2aa1f4ba6..61c899e125 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_statistics.cpp
@@ -29,21 +29,24 @@ public:
auto ret = OptimizeExpr(input, output, [*this](const TExprNode::TPtr& input, TExprContext& ctx) {
Y_UNUSED(ctx);
auto output = input;
+ bool hasDqSource = false;
if (TCoFlatMap::Match(input.Get())){
NDq::InferStatisticsForFlatMap(input, State->TypeCtx);
} else if(TCoSkipNullMembers::Match(input.Get())){
NDq::InferStatisticsForSkipNullMembers(input, State->TypeCtx);
- } else if (TDqReadWrapBase::Match(input.Get())) {
- auto read = input->Child(TDqReadWrapBase::idx_Input);
+ } else if (TDqReadWrapBase::Match(input.Get()) || (hasDqSource = TDqSourceWrapBase::Match(input.Get()))) {
+ auto node = hasDqSource
+ ? input
+ : input->Child(TDqReadWrapBase::idx_Input);
auto dataSourceChildIndex = 1;
- YQL_ENSURE(read->ChildrenSize() > 1);
- YQL_ENSURE(read->Child(dataSourceChildIndex)->IsCallable("DataSource"));
- auto dataSourceName = read->Child(dataSourceChildIndex)->Child(0)->Content();
+ YQL_ENSURE(node->ChildrenSize() > 1);
+ YQL_ENSURE(node->Child(dataSourceChildIndex)->IsCallable("DataSource"));
+ auto dataSourceName = node->Child(dataSourceChildIndex)->Child(0)->Content();
auto datasource = State->TypeCtx->DataSourceMap.FindPtr(dataSourceName);
YQL_ENSURE(datasource);
if (auto dqIntegration = (*datasource)->GetDqIntegration()) {
- auto stat = dqIntegration->ReadStatistics(read, ctx);
+ auto stat = dqIntegration->ReadStatistics(node, ctx);
if (stat) {
State->TypeCtx->SetStats(input.Get(), std::move(std::make_shared<TOptimizerStatistics>(*stat)));
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index fe2e309de9..112999eaa3 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -130,6 +130,36 @@ public:
return Nothing();
}
+ TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& sourceWrap, TExprContext& ctx) override {
+ Y_UNUSED(ctx);
+ double size = 0;
+ double cols = 0;
+ double rows = 0;
+ if (const auto& maybeArrowSettings = TMaybeNode<TS3ArrowSettings>(sourceWrap->Child(0))) {
+ const auto& arrowSettings = maybeArrowSettings.Cast();
+ for (size_t i = 0; i < arrowSettings.Paths().Size(); ++i) {
+ auto batch = arrowSettings.Paths().Item(i);
+ TStringBuf packed = batch.Data().Literal().Value();
+ bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value());
+ TPathList paths;
+ UnpackPathsList(packed, isTextEncoded, paths);
+
+ for (const auto& path : paths) {
+ size += path.Size;
+ }
+ }
+
+ if (arrowSettings.RowType().Maybe<TCoStructType>()) {
+ cols = arrowSettings.RowType().Ptr()->ChildrenSize();
+ }
+
+ rows = size / 1024; // magic estimate
+ return TOptimizerStatistics(rows, cols);
+ } else {
+ return Nothing();
+ }
+ }
+
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (const auto& maybeS3ReadObject = TMaybeNode<TS3ReadObject>(read)) {
const auto& s3ReadObject = maybeS3ReadObject.Cast();