author     e-zudin <e-zudin@ydb.tech>  2023-12-06 15:54:19 +0300
committer  e-zudin <e-zudin@ydb.tech>  2023-12-06 16:35:59 +0300
commit     1c561659d3a0f1fcc89a3cd615ee731f0e2bcf2c (patch)
tree       7c8ead875b4850184b0a1c7cc5b376f0f4fce3d2
parent     94c7732c3802bdb6c527fdcc678e8ee88622530c (diff)
download   ydb-1c561659d3a0f1fcc89a3cd615ee731f0e2bcf2c.tar.gz
Rework push down limit to support both v2 and v1
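
Note (editor's summary): the patch moves the LIMIT push-down rule from the v1-only physical optimizer (which matched TCoTake over TCoRight/TS3ReadObject) into the logical optimizer, where it matches TCoTake over TDqSourceWrap with any TS3SourceSettingsBase, so the same rewrite covers both the v1 and v2 execution paths; the RowsLimitHint child accordingly moves from TS3Object to the source settings. The intended effect is that a trailing LIMIT N reaches the S3 read path as a row-count hint, so the read can stop early instead of scanning the whole object. A minimal, self-contained sketch of that idea in Python — read_rows and rows_limit_hint are illustrative names, not part of the patch:

    # Illustrative only: a reader that honors a pushed-down rows limit hint
    # stops consuming its input early instead of scanning the whole object.
    def read_rows(lines, rows_limit_hint=None):
        produced = 0
        for line in lines:
            yield line.rstrip('\n').split(',')
            produced += 1
            if rows_limit_hint is not None and produced >= rows_limit_hint:
                break  # stop early: no need to download or parse the rest

    data = ['Fruit,Price,Weight'] + [f'Fruit#{i},{i},{i + 3}' for i in range(10_000)]
    remaining = iter(data)
    rows = list(read_rows(remaining, rows_limit_hint=2))
    assert len(rows) == 2
    assert len(list(remaining)) > 0  # most of the input was never touched
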
-rw-r--r--  ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp            |  3
-rw-r--r--  ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json       |  3
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp | 10
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp      |  4
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp        |  1
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp         | 51
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp             | 45
-rw-r--r--  ydb/tests/fq/s3/test_push_down.py                                     | 74
-rw-r--r--  ydb/tests/fq/s3/ya.make                                               |  1
9 files changed, 137 insertions, 55 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index a8783794bad..e9bb49a6019 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1863,7 +1863,8 @@ public:
readers[readyReaderIndex].reset();
}
if (isCancelled) {
- LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION");
+ LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " <<
+ QueueBufferCounter->DownloadedBytes << " bytes");
break;
}
}
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index ea5b33c99dc..90df231ec91 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -94,8 +94,7 @@
"Children": [
{"Index": 0, "Name": "Paths", "Type": "TS3Paths"},
{"Index": 1, "Name": "Format", "Type": "TCoAtom"},
- {"Index": 2, "Name": "RowsLimitHint", "Type": "TCoAtom"},
- {"Index": 3, "Name": "Settings", "Type": "TExprBase", "Optional": true}
+ {"Index": 2, "Name": "Settings", "Type": "TExprBase", "Optional": true}
]
},
{
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 364b058ebe1..bccfdca0e12 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -311,6 +311,10 @@ public:
return TStatus::Error;
}
+ if (!EnsureAtom(*input->Child(TS3SourceSettings::idx_RowsLimitHint), ctx)) {
+ return TStatus::Error;
+ }
+
const TTypeAnnotationNode* itemType = ctx.MakeType<TDataExprType>(EDataSlot::String);
if (extraColumnsType->GetSize()) {
itemType = ctx.MakeType<TTupleExprType>(
@@ -510,7 +514,7 @@ public:
}
TStatus HandleObject(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureMinMaxArgsCount(*input, 2U, 4U, ctx)) {
+ if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) {
return TStatus::Error;
}
@@ -524,10 +528,6 @@ public:
return TStatus::Error;
}
- if (!EnsureAtom(*input->Child(TS3SourceSettings::idx_RowsLimitHint), ctx)) {
- return TStatus::Error;
- }
-
if (input->ChildrenSize() > TS3Object::idx_Settings) {
bool haveProjection = false;
bool havePartitionedBy = false;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 14cfce344f5..68e5095c6a4 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -230,7 +230,7 @@ public:
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
- .RowsLimitHint(s3ReadObject.Object().RowsLimitHint())
+ .RowsLimitHint(ctx.NewAtom(read->Pos(), ""))
.Format(s3ReadObject.Object().Format())
.RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
.Settings(s3ReadObject.Object().Settings())
@@ -273,7 +273,7 @@ public:
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
- .RowsLimitHint(s3ReadObject.Object().RowsLimitHint())
+ .RowsLimitHint(ctx.NewAtom(read->Pos(), ""))
.SizeLimit(
sizeLimitIndex != -1 ? readSettings->Child(sizeLimitIndex)->TailPtr()
: emptyNode)
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index f0f06e5fd6d..f5b85cd31c5 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -496,7 +496,6 @@ private:
s3Object = Build<TS3Object>(ctx, object.Pos())
.Paths(ctx.NewList(object.Pos(), std::move(pathNodes)))
.Format(std::move(format))
- .RowsLimitHint(ctx.NewAtom(object.Pos(), ""))
.Settings(ctx.NewList(object.Pos(), std::move(settings)))
.Done().Ptr();
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index fef7d446f82..fb9d3956068 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -171,6 +171,7 @@ public:
AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqSource));
AddHandler(0, &TDqSourceWrap::Match, HNDL(MergeS3Paths));
AddHandler(0, &TDqSourceWrap::Match, HNDL(CleanupExtraColumns));
+ AddHandler(0, &TCoTake::Match, HNDL(PushDownLimit));
#undef HNDL
}
@@ -661,6 +662,56 @@ public:
}
+ template <class TSettings>
+ TMaybeNode<TExprBase> ConstructNodeForPushDownLimit(TCoTake take, TDqSourceWrap source, TSettings settings, TCoUint64 count, TExprContext& ctx) const {
+ return Build<TCoTake>(ctx, take.Pos())
+ .InitFrom(take)
+ .Input<TDqSourceWrap>()
+ .InitFrom(source)
+ .Input<TSettings>()
+ .InitFrom(settings)
+ .RowsLimitHint(count.Literal())
+ .Build()
+ .Build()
+ .Done();
+ }
+
+ TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {
+ auto take = node.Cast<TCoTake>();
+
+ auto maybeSource = take.Input().Maybe<TDqSourceWrap>();
+ auto maybeSettings = maybeSource.Input().Maybe<TS3SourceSettingsBase>();
+ if (!maybeSettings) {
+ return take;
+ }
+ YQL_CLOG(TRACE, ProviderS3) << "Trying to push down limit for S3";
+
+ auto maybeCount = take.Count().Maybe<TCoUint64>();
+ if (!maybeCount) {
+ return take;
+ }
+ auto count = maybeCount.Cast();
+ auto countNum = FromString<ui64>(count.Literal().Ref().Content());
+ YQL_ENSURE(countNum > 0, "Got invalid limit " << countNum << " to push down");
+
+ auto settings = maybeSettings.Cast();
+ if (auto rowsLimitHintStr = settings.RowsLimitHint().Ref().Content(); !rowsLimitHintStr.empty()) {
+ // LimitHint is already pushed down
+ auto rowsLimitHint = FromString<ui64>(rowsLimitHintStr);
+ if (countNum >= rowsLimitHint) {
+ // Already propagated
+ return node;
+ }
+ }
+
+ if (auto sourceSettings = maybeSettings.Maybe<TS3SourceSettings>()) {
+ return ConstructNodeForPushDownLimit(take, maybeSource.Cast(), sourceSettings.Cast(), count, ctx);
+ }
+ auto parseSettings = maybeSettings.Maybe<TS3ParseSettings>();
+ YQL_ENSURE(parseSettings, "unsupported type derived from TS3SourceSettingsBase");
+ return ConstructNodeForPushDownLimit(take, maybeSource.Cast(), parseSettings.Cast(), count, ctx);
+ }
+
private:
const TS3State::TPtr State_;
};
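
For reference, the check the new PushDownLimit rule performs when a RowsLimitHint is already present boils down to keeping the tighter of the two limits: a non-empty existing hint that is already less than or equal to the new Take count leaves the node untouched, otherwise the new count is written into the source settings. A standalone sketch of just that decision, using the hypothetical helper name effective_limit_hint (not part of the patch):

    # Illustrative only: mirrors the "already pushed down" branch of PushDownLimit above.
    def effective_limit_hint(existing_hint, new_count):
        # existing_hint: RowsLimitHint already stored on the source settings ('' if none)
        # new_count:     Uint64 literal taken from the matched TCoTake node
        if existing_hint and new_count >= int(existing_hint):
            return None           # already propagated, leave the expression unchanged
        return new_count          # push the tighter limit into RowsLimitHint

    assert effective_limit_hint('', 5) == 5      # first push-down
    assert effective_limit_hint('10', 5) == 5    # tighter limit replaces the old hint
    assert effective_limit_hint('3', 5) is None  # existing hint is already tighter
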
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 6c7176fa0e6..f3eefa4f676 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <util/generic/size_literals.h>
@@ -97,7 +98,6 @@ public:
AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject));
}
AddHandler(0, &TS3Insert::Match, HNDL(S3Insert));
- AddHandler(0, &TCoTake::Match, HNDL(PushDownLimit));
#undef HNDL
}
@@ -112,49 +112,6 @@ public:
return TExprBase(maybeRead.Cast().World().Ptr());
}
- TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {
- auto take = node.Cast<TCoTake>();
-
- auto maybeReadObject = take
- .Input().Maybe<TCoRight>()
- .Input().Maybe<TS3ReadObject>();
- if (!maybeReadObject) {
- return node;
- }
-
- auto maybeCount = take.Count().Maybe<TCoUint64>();
- if (!maybeCount) {
- return node;
- }
- auto count = maybeCount.Cast();
- auto countNum = FromString<ui64>(count.Literal().Ref().Content());
- YQL_ENSURE(countNum > 0, "Got invalid limit " << countNum << " to push down");
-
- auto object = maybeReadObject.Cast().Object();
- if (auto rowsLimitHintStr = object.RowsLimitHint().Ref().Content(); !rowsLimitHintStr.empty()) {
- // LimitHint is already pushed down
- auto rowsLimitHint = FromString<ui64>(rowsLimitHintStr);
- if (countNum >= rowsLimitHint) {
- // Already propagated
- return node;
- }
- }
-
- return Build<TCoTake>(ctx, take.Pos())
- .InitFrom(take)
- .Input<TCoRight>()
- .Input<TS3ReadObject>()
- .InitFrom(maybeReadObject.Cast())
- .Object()
- .InitFrom(object)
- .RowsLimitHint(count.Literal())
- .Build()
- .Build()
- .Build()
- .Count(count.Ptr())
- .Done();
- }
-
TMaybe<TDqStage> BuildSinkStage(TPositionHandle writePos, TS3DataSink dataSink, TS3Target target, TExprBase input, TExprContext& ctx, const TGetParents& getParents) const {
const auto& cluster = dataSink.Cluster().StringValue();
const auto token = "cluster:default_" + cluster;
diff --git a/ydb/tests/fq/s3/test_push_down.py b/ydb/tests/fq/s3/test_push_down.py
new file mode 100644
index 00000000000..9d90e232705
--- /dev/null
+++ b/ydb/tests/fq/s3/test_push_down.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import boto3
+import json
+import pytest
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
+
+BUCKET_NAME = 'fbucket'
+
+
+class TestS3PushDown:
+ @staticmethod
+ def create_s3_client(s3):
+ resource = boto3.resource(
+ 's3',
+ endpoint_url=s3.s3_url,
+ aws_access_key_id='key',
+ aws_secret_access_key='secret_key'
+ )
+
+ bucket = resource.Bucket(BUCKET_NAME)
+ bucket.create(ACL='public-read')
+ bucket.objects.all().delete()
+
+ return boto3.client(
+ 's3',
+ endpoint_url=s3.s3_url,
+ aws_access_key_id='key',
+ aws_secret_access_key='secret_key',
+ )
+
+ @yq_all
+ @pytest.mark.parametrize('client', [{'folder_id': 'my_folder'}], indirect=True)
+ def test_simple_case(self, kikimr, s3, client, yq_version):
+ file_head = 'Fruit,Price,Weight\n'
+ file_rows = [file_head]
+ for i in range(0, 10_000):
+ file_rows.append(f'Fruit#{i + 1},{i},{i + 3}\n')
+ file_content = ''.join(file_rows)
+ file_size = len(file_content)
+
+ s3_client = TestS3PushDown.create_s3_client(s3)
+ s3_client.put_object(Body=file_content, Bucket=BUCKET_NAME, Key='fruits.csv')
+
+ kikimr.control_plane.wait_bootstrap(1)
+ client.create_storage_connection('fruitbucket', 'fbucket')
+
+ sql = '''
+ select *
+ from fruitbucket.`fruits.csv`
+ with (format=csv_with_names, schema (
+ Fruit String not null,
+ Price Int not null,
+ Weight Int not null
+ ))
+ limit 1;
+ '''
+
+ query_id = client.create_query('test', sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+ stat = json.loads(client.describe_query(query_id).result.query.statistics.json)
+ ingress_bytes = None
+ if yq_version == 'v1':
+ ingress_bytes = stat['Graph=0']['IngressBytes']
+ else:
+ assert yq_version == 'v2'
+ ingress_bytes = stat['ResultSet']['IngressBytes']
+
+ assert ingress_bytes['sum'] < file_size, f'loaded too much for a pushed down query: {json.dumps(stat)}'
diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make
index 3c679fd81ea..4b1eca093ea 100644
--- a/ydb/tests/fq/s3/ya.make
+++ b/ydb/tests/fq/s3/ya.make
@@ -28,6 +28,7 @@ TEST_SRCS(
test_formats.py
test_inflight.py
test_insert.py
+ test_push_down.py
test_s3.py
test_size_limit.py
test_statistics.py