author     e-zudin <e-zudin@ydb.tech>    2023-12-06 15:54:19 +0300
committer  e-zudin <e-zudin@ydb.tech>    2023-12-06 16:35:59 +0300
commit     1c561659d3a0f1fcc89a3cd615ee731f0e2bcf2c
tree       7c8ead875b4850184b0a1c7cc5b376f0f4fce3d2
parent     94c7732c3802bdb6c527fdcc678e8ee88622530c
Rework limit push-down to support both v2 and v1
9 files changed, 137 insertions, 55 deletions
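In short, this commit removes the v1-only PushDownLimit rewrite from the physical optimizer (yql_s3_phy_opt.cpp), where it matched a TCoTake over TCoRight/TS3ReadObject, and reimplements it in the logical optimizer (yql_s3_logical_opt.cpp) over TDqSourceWrap, handling both TS3SourceSettings and TS3ParseSettings. The RowsLimitHint atom accordingly moves from TS3Object to the DQ source settings (see the expr-nodes JSON and type-annotation changes), so the hint now reaches the S3 source on both the v1 and v2 execution paths. As a rough illustration only (the fruitbucket connection and fruits.csv object are names set up by the new test, not something the patch itself creates), the new handler targets queries of this shape, copying the constant LIMIT into the source's RowsLimitHint so the read actor can stop fetching before the whole object is downloaded:

    -- Illustrative sketch: assumes an S3 connection `fruitbucket` containing
    -- `fruits.csv`, as prepared by test_push_down.py below.
    -- The LIMIT literal is matched as a TCoTake with a Uint64 count over a
    -- TDqSourceWrap and copied into the S3 source settings as RowsLimitHint.
    select *
    from fruitbucket.`fruits.csv`
    with (format=csv_with_names, schema (
        Fruit String not null,
        Price Int not null,
        Weight Int not null
    ))
    limit 1;

The bundled test checks exactly this effect: with the limit pushed down, the IngressBytes statistic reported for the query stays below the size of the 10,000-row CSV object in both v1 and v2.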
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index a8783794bad..e9bb49a6019 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1863,7 +1863,8 @@ public:
                 readers[readyReaderIndex].reset();
             }
             if (isCancelled) {
-                LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION");
+                LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " <<
+                    QueueBufferCounter->DownloadedBytes << " bytes");
                 break;
             }
         }
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index ea5b33c99dc..90df231ec91 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -94,8 +94,7 @@
             "Children": [
                 {"Index": 0, "Name": "Paths", "Type": "TS3Paths"},
                 {"Index": 1, "Name": "Format", "Type": "TCoAtom"},
-                {"Index": 2, "Name": "RowsLimitHint", "Type": "TCoAtom"},
-                {"Index": 3, "Name": "Settings", "Type": "TExprBase", "Optional": true}
+                {"Index": 2, "Name": "Settings", "Type": "TExprBase", "Optional": true}
             ]
         },
         {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 364b058ebe1..bccfdca0e12 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -311,6 +311,10 @@ public:
             return TStatus::Error;
         }
 
+        if (!EnsureAtom(*input->Child(TS3SourceSettings::idx_RowsLimitHint), ctx)) {
+            return TStatus::Error;
+        }
+
         const TTypeAnnotationNode* itemType = ctx.MakeType<TDataExprType>(EDataSlot::String);
         if (extraColumnsType->GetSize()) {
             itemType = ctx.MakeType<TTupleExprType>(
@@ -510,7 +514,7 @@ public:
     }
 
     TStatus HandleObject(const TExprNode::TPtr& input, TExprContext& ctx) {
-        if (!EnsureMinMaxArgsCount(*input, 2U, 4U, ctx)) {
+        if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) {
             return TStatus::Error;
         }
 
@@ -524,10 +528,6 @@ public:
             return TStatus::Error;
         }
 
-        if (!EnsureAtom(*input->Child(TS3SourceSettings::idx_RowsLimitHint), ctx)) {
-            return TStatus::Error;
-        }
-
         if (input->ChildrenSize() > TS3Object::idx_Settings) {
             bool haveProjection = false;
             bool havePartitionedBy = false;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 14cfce344f5..68e5095c6a4 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -230,7 +230,7 @@ public:
                 .Token<TCoSecureParam>()
                     .Name().Build(token)
                     .Build()
-                .RowsLimitHint(s3ReadObject.Object().RowsLimitHint())
+                .RowsLimitHint(ctx.NewAtom(read->Pos(), ""))
                 .Format(s3ReadObject.Object().Format())
                 .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
                 .Settings(s3ReadObject.Object().Settings())
@@ -273,7 +273,7 @@ public:
                 .Token<TCoSecureParam>()
                     .Name().Build(token)
                     .Build()
-                .RowsLimitHint(s3ReadObject.Object().RowsLimitHint())
+                .RowsLimitHint(ctx.NewAtom(read->Pos(), ""))
                 .SizeLimit(
                     sizeLimitIndex != -1 ?
                        readSettings->Child(sizeLimitIndex)->TailPtr() : emptyNode)
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index f0f06e5fd6d..f5b85cd31c5 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -496,7 +496,6 @@ private:
         s3Object = Build<TS3Object>(ctx, object.Pos())
             .Paths(ctx.NewList(object.Pos(), std::move(pathNodes)))
             .Format(std::move(format))
-            .RowsLimitHint(ctx.NewAtom(object.Pos(), ""))
             .Settings(ctx.NewList(object.Pos(), std::move(settings)))
             .Done().Ptr();
 
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index fef7d446f82..fb9d3956068 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -171,6 +171,7 @@ public:
         AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqSource));
         AddHandler(0, &TDqSourceWrap::Match, HNDL(MergeS3Paths));
         AddHandler(0, &TDqSourceWrap::Match, HNDL(CleanupExtraColumns));
+        AddHandler(0, &TCoTake::Match, HNDL(PushDownLimit));
 #undef HNDL
     }
 
@@ -661,6 +662,56 @@ public:
         }
 
+    template <class TSettings>
+    TMaybeNode<TExprBase> ConstructNodeForPushDownLimit(TCoTake take, TDqSourceWrap source, TSettings settings, TCoUint64 count, TExprContext& ctx) const {
+        return Build<TCoTake>(ctx, take.Pos())
+            .InitFrom(take)
+            .Input<TDqSourceWrap>()
+                .InitFrom(source)
+                .Input<TSettings>()
+                    .InitFrom(settings)
+                    .RowsLimitHint(count.Literal())
+                    .Build()
+                .Build()
+            .Done();
+    }
+
+    TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {
+        auto take = node.Cast<TCoTake>();
+
+        auto maybeSource = take.Input().Maybe<TDqSourceWrap>();
+        auto maybeSettings = maybeSource.Input().Maybe<TS3SourceSettingsBase>();
+        if (!maybeSettings) {
+            return take;
+        }
+        YQL_CLOG(TRACE, ProviderS3) << "Trying to push down limit for S3";
+
+        auto maybeCount = take.Count().Maybe<TCoUint64>();
+        if (!maybeCount) {
+            return take;
+        }
+        auto count = maybeCount.Cast();
+        auto countNum = FromString<ui64>(count.Literal().Ref().Content());
+        YQL_ENSURE(countNum > 0, "Got invalid limit " << countNum << " to push down");
+
+        auto settings = maybeSettings.Cast();
+        if (auto rowsLimitHintStr = settings.RowsLimitHint().Ref().Content(); !rowsLimitHintStr.empty()) {
+            // LimitHint is already pushed down
+            auto rowsLimitHint = FromString<ui64>(rowsLimitHintStr);
+            if (countNum >= rowsLimitHint) {
+                // Already propagated
+                return node;
+            }
+        }
+
+        if (auto sourceSettings = maybeSettings.Maybe<TS3SourceSettings>()) {
+            return ConstructNodeForPushDownLimit(take, maybeSource.Cast(), sourceSettings.Cast(), count, ctx);
+        }
+        auto parseSettings = maybeSettings.Maybe<TS3ParseSettings>();
+        YQL_ENSURE(parseSettings, "unsupported type derived from TS3SourceSettingsBase");
+        return ConstructNodeForPushDownLimit(take, maybeSource.Cast(), parseSettings.Cast(), count, ctx);
+    }
+
 private:
     const TS3State::TPtr State_;
 };
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 6c7176fa0e6..f3eefa4f676 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -7,6 +7,7 @@
 #include <ydb/library/yql/dq/opt/dq_opt_phy.h>
 #include <ydb/library/yql/providers/common/transform/yql_optimize.h>
 #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
 
 #include <util/generic/size_literals.h>
 
@@ -97,7 +98,6 @@ public:
             AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject));
         }
         AddHandler(0, &TS3Insert::Match, HNDL(S3Insert));
-        AddHandler(0, &TCoTake::Match, HNDL(PushDownLimit));
 #undef HNDL
     }
 
@@ -112,49 +112,6 @@ public:
         return TExprBase(maybeRead.Cast().World().Ptr());
     }
 
-    TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {
-        auto take = node.Cast<TCoTake>();
-
-        auto maybeReadObject = take
-            .Input().Maybe<TCoRight>()
-            .Input().Maybe<TS3ReadObject>();
-        if (!maybeReadObject) {
-            return node;
-        }
-
-        auto maybeCount = take.Count().Maybe<TCoUint64>();
-        if (!maybeCount) {
-            return node;
-        }
-        auto count = maybeCount.Cast();
-        auto countNum = FromString<ui64>(count.Literal().Ref().Content());
-        YQL_ENSURE(countNum > 0, "Got invalid limit " << countNum << " to push down");
-
-        auto object = maybeReadObject.Cast().Object();
-        if (auto rowsLimitHintStr = object.RowsLimitHint().Ref().Content(); !rowsLimitHintStr.empty()) {
-            // LimitHint is already pushed down
-            auto rowsLimitHint = FromString<ui64>(rowsLimitHintStr);
-            if (countNum >= rowsLimitHint) {
-                // Already propagated
-                return node;
-            }
-        }
-
-        return Build<TCoTake>(ctx, take.Pos())
-            .InitFrom(take)
-            .Input<TCoRight>()
-                .Input<TS3ReadObject>()
-                    .InitFrom(maybeReadObject.Cast())
-                    .Object()
-                        .InitFrom(object)
-                        .RowsLimitHint(count.Literal())
-                        .Build()
-                    .Build()
-                .Build()
-            .Count(count.Ptr())
-            .Done();
-    }
-
     TMaybe<TDqStage> BuildSinkStage(TPositionHandle writePos, TS3DataSink dataSink, TS3Target target, TExprBase input, TExprContext& ctx, const TGetParents& getParents) const {
         const auto& cluster = dataSink.Cluster().StringValue();
         const auto token = "cluster:default_" + cluster;
diff --git a/ydb/tests/fq/s3/test_push_down.py b/ydb/tests/fq/s3/test_push_down.py
new file mode 100644
index 00000000000..9d90e232705
--- /dev/null
+++ b/ydb/tests/fq/s3/test_push_down.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import boto3
+import json
+import pytest
+
+import ydb.public.api.protos.draft.fq_pb2 as fq
+
+from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
+
+BUCKET_NAME = 'fbucket'
+
+
+class TestS3PushDown:
+    @staticmethod
+    def create_s3_client(s3):
+        resource = boto3.resource(
+            's3',
+            endpoint_url=s3.s3_url,
+            aws_access_key_id='key',
+            aws_secret_access_key='secret_key'
+        )
+
+        bucket = resource.Bucket(BUCKET_NAME)
+        bucket.create(ACL='public-read')
+        bucket.objects.all().delete()
+
+        return boto3.client(
+            's3',
+            endpoint_url=s3.s3_url,
+            aws_access_key_id='key',
+            aws_secret_access_key='secret_key',
+        )
+
+    @yq_all
+    @pytest.mark.parametrize('client', [{'folder_id': 'my_folder'}], indirect=True)
+    def test_simple_case(self, kikimr, s3, client, yq_version):
+        file_head = 'Fruit,Price,Weight\n'
+        file_rows = [file_head]
+        for i in range(0, 10_000):
+            file_rows.append(f'Fruit#{i + 1},{i},{i + 3}\n')
+        file_content = ''.join(file_rows)
+        file_size = len(file_content)
+
+        s3_client = TestS3PushDown.create_s3_client(s3)
+        s3_client.put_object(Body=file_content, Bucket=BUCKET_NAME, Key='fruits.csv')
+
+        kikimr.control_plane.wait_bootstrap(1)
+        client.create_storage_connection('fruitbucket', 'fbucket')
+
+        sql = '''
+            select *
+            from fruitbucket.`fruits.csv`
+            with (format=csv_with_names, schema (
+                Fruit String not null,
+                Price Int not null,
+                Weight Int not null
+            ))
+            limit 1;
+        '''
+
+        query_id = client.create_query('test', sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        stat = json.loads(client.describe_query(query_id).result.query.statistics.json)
+        ingress_bytes = None
+        if yq_version == 'v1':
+            ingress_bytes = stat['Graph=0']['IngressBytes']
+        else:
+            assert yq_version == 'v2'
+            ingress_bytes = stat['ResultSet']['IngressBytes']
+
+        assert ingress_bytes['sum'] < file_size, f'loaded too much for a pushed down query: {json.dumps(stat)}'
diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make
index 3c679fd81ea..4b1eca093ea 100644
--- a/ydb/tests/fq/s3/ya.make
+++ b/ydb/tests/fq/s3/ya.make
@@ -28,6 +28,7 @@ TEST_SRCS(
     test_formats.py
     test_inflight.py
     test_insert.py
+    test_push_down.py
     test_s3.py
     test_size_limit.py
     test_statistics.py