diff options
author | bbiff <bbiff@yandex-team.com> | 2022-08-24 16:44:16 +0300 |
---|---|---|
committer | bbiff <bbiff@yandex-team.com> | 2022-08-24 16:44:16 +0300 |
commit | 7eeb61811ba1588f20d70abeadc7b9308dd785dd (patch) | |
tree | 71e1f4bcfd0bef1563239e2bea85d8de0e5eb275 | |
parent | 653146b941fae8faaf0cf733e371816e7c69e7d2 (diff) | |
download | ydb-7eeb61811ba1588f20d70abeadc7b9308dd785dd.tar.gz |
added limit parameter
7 files changed, 353 insertions, 22 deletions
diff --git a/util/generic/function_ref.h b/util/generic/function_ref.h new file mode 100644 index 0000000000..c55de8694c --- /dev/null +++ b/util/generic/function_ref.h @@ -0,0 +1,122 @@ +#pragma once + +#include <util/generic/function.h> +#include <util/system/yassert.h> + +#include <functional> + +template <typename Signature> +class TFunctionRef; + +template <typename Ret, typename... Args, bool IsNoexcept> +class TFunctionRef<Ret(Args...) noexcept(IsNoexcept)> { +public: + using TSignature = Ret(Args...) noexcept(IsNoexcept); + +private: + union TErasedCallable { + const void* Functor; + void (*Function)(); + }; + using TProxy = Ret (*)(TErasedCallable callable, Args...); + + // Making this a lambda inside TFunctionRef ctor caused: + // "error: cannot compile this forwarded non-trivially copyable parameter yet" + // on clang-win-i686-release. + // + // Using correct noexcept specifiers here (noexcept(IsNoexcept)) caused miscompilation on clang: + // https://github.com/llvm/llvm-project/issues/55280. + template <typename Functor> + static Ret InvokeErasedFunctor(TErasedCallable callable, Args... args) { + auto& ref = *static_cast<const std::remove_reference_t<Functor>*>(callable.Functor); + return static_cast<Ret>(std::invoke(ref, std::forward<Args>(args)...)); + } + + template <typename Function> + static Ret InvokeErasedFunction(TErasedCallable callable, Args... args) { + auto* function = reinterpret_cast<Function*>(callable.Function); + return static_cast<Ret>(std::invoke(function, std::forward<Args>(args)...)); + } + + template <class F> + static constexpr bool IsInvocableUsing = std::conditional_t< + IsNoexcept, + std::is_nothrow_invocable_r<Ret, F, Args...>, + std::is_invocable_r<Ret, F, Args...>>::value; + + // clang-format off + template <class Callable> + static constexpr bool IsSuitableFunctor = + IsInvocableUsing<Callable> + && !std::is_function_v<Callable> + && !std::is_same_v<std::remove_cvref_t<Callable>, TFunctionRef>; + + template <class Callable> + static constexpr bool IsSuitableFunction = + IsInvocableUsing<Callable> + && std::is_function_v<Callable>; + // clang-format on + +public: + // Function ref should not be default constructible. + // While the function ref can have empty state (for example, Proxy_ == nullptr), + // It does not make sense in common usage cases. + TFunctionRef() = delete; + + // Construct function ref from a functor. + template <typename Functor, typename = std::enable_if_t<IsSuitableFunctor<Functor>>> + TFunctionRef(Functor&& functor) noexcept + : Callable_{ + .Functor = std::addressof(functor), + } + , Proxy_{InvokeErasedFunctor<Functor>} + { + } + + // Construct function ref from a function pointer. + template <typename Function, typename = std::enable_if_t<IsSuitableFunction<Function>>> + TFunctionRef(Function* function) noexcept + : Callable_{ + .Function = reinterpret_cast<void (*)()>(function), + } + , Proxy_{InvokeErasedFunction<Function>} + { + } + + // Copy ctors & assignment. + // Just copy pointers. + TFunctionRef(const TFunctionRef& rhs) noexcept = default; + TFunctionRef& operator=(const TFunctionRef& rhs) noexcept = default; + + Ret operator()(Args... args) const noexcept(IsNoexcept) { + return Proxy_(Callable_, std::forward<Args>(args)...); + } + +private: + TErasedCallable Callable_; + TProxy Proxy_ = nullptr; +}; + +namespace NPrivate { + + template <typename Callable, typename Signature = typename TCallableTraits<Callable>::TSignature> + struct TIsNothrowInvocable; + + template <typename Callable, typename Ret, typename... Args> + struct TIsNothrowInvocable<Callable, Ret(Args...)> { + static constexpr bool IsNoexcept = std::is_nothrow_invocable_r_v<Ret, Callable, Args...>; + using TSignature = Ret(Args...) noexcept(IsNoexcept); + }; + + template <typename Callable> + struct TCallableTraitsWithNoexcept { + using TSignature = typename TIsNothrowInvocable<Callable>::TSignature; + }; + +} // namespace NPrivate + +template <typename Callable> +TFunctionRef(Callable&&) -> TFunctionRef<typename NPrivate::TCallableTraitsWithNoexcept<Callable>::TSignature>; + +template <typename Function> +TFunctionRef(Function*) -> TFunctionRef<Function>; diff --git a/util/generic/function_ref_ut.cpp b/util/generic/function_ref_ut.cpp new file mode 100644 index 0000000000..59f9ae35cd --- /dev/null +++ b/util/generic/function_ref_ut.cpp @@ -0,0 +1,150 @@ +#include "function_ref.h" + +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(TestFunctionRef) { + template <typename Signature> + struct TTestFunction; + + template <typename Ret, typename... Args, bool IsNoexcept> + struct TTestFunction<Ret(Args...) noexcept(IsNoexcept)> { + Ret operator()(Args...) const noexcept(IsNoexcept) { + return {}; + } + }; + + Y_UNIT_TEST(NonDefaultConstructible) { + static_assert(!std::is_default_constructible_v<TFunctionRef<void()>>); + static_assert(!std::is_default_constructible_v<TFunctionRef<void() noexcept>>); + static_assert(!std::is_default_constructible_v<TFunctionRef<int(double, void********* megaptr, TTestFunction<void(int)>)>>); + } + + int F1(bool x) { + if (x) + throw 19; + return 42; + } + + int F2(bool x) noexcept { + return 42 + x; + } + + static const TTestFunction<int(bool)> C1; + static const TTestFunction<int(bool) noexcept> C2; + + Y_UNIT_TEST(Noexcept) { + static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(F1)>); + static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(F2)>); + static_assert(!std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(F1)>); + static_assert(std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(F2)>); + + static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(C1)>); + static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(C2)>); + static_assert(!std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(C1)>); + static_assert(std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(C2)>); + } + + Y_UNIT_TEST(Deduction) { + TFunctionRef ref1(F1); + TFunctionRef ref2(F2); + TFunctionRef ref3(C1); + TFunctionRef ref4(C2); + + static_assert(!std::is_nothrow_invocable_r_v<int, decltype(ref1), bool>); + static_assert(std::is_nothrow_invocable_r_v<int, decltype(ref2), bool>); + static_assert(std::is_same_v<decltype(ref1)::TSignature, int(bool)>); + static_assert(std::is_same_v<decltype(ref2)::TSignature, int(bool) noexcept>); + } + + void WithCallback(TFunctionRef<double(double, int) noexcept>); + + void Iterate(int from, int to, TFunctionRef<void(int)> callback) { + while (from < to) { + callback(from++); + } + } + + void IterateNoexcept(int from, int to, TFunctionRef<void(int) noexcept> callback) { + while (from < to) { + callback(from++); + } + } + + Y_UNIT_TEST(AsArgument) { + int sum = 0; + Iterate(0, 10, [&](int x) { sum += x; }); + UNIT_ASSERT_EQUAL(sum, 45); + + Iterate(0, 10, [&](int x) noexcept { sum += x; }); + UNIT_ASSERT_EQUAL(sum, 90); + + IterateNoexcept(0, 10, [&](int x) noexcept { sum += x; }); + UNIT_ASSERT_EQUAL(sum, 135); + + auto summer = [&](int x) { sum += x; }; + Iterate(0, 10, summer); + Iterate(0, 10, summer); + Iterate(0, 10, summer); + UNIT_ASSERT_EQUAL(sum, 270); + + TFunctionRef ref = summer; + Iterate(0, 10, ref); + UNIT_ASSERT_EQUAL(sum, 315); + } + + int GlobalSum = 0; + void AddToGlobalSum(int x) { + GlobalSum += x; + } + + Y_UNIT_TEST(FunctionPointer) { + GlobalSum = 0; + Iterate(0, 10, AddToGlobalSum); + UNIT_ASSERT_EQUAL(GlobalSum, 45); + + TFunctionRef ref1 = AddToGlobalSum; + Iterate(0, 10, ref1); + UNIT_ASSERT_EQUAL(GlobalSum, 90); + + TFunctionRef ref2{AddToGlobalSum}; + Iterate(0, 10, ref2); + UNIT_ASSERT_EQUAL(GlobalSum, 135); + } + + Y_UNIT_TEST(Reassign) { + TFunctionRef kek = [](double) { return 42; }; + kek = [](double) { return 19; }; + kek = [](int) { return 22.8; }; + } + + const char* Greet() { + return "Hello, world!"; + } + + Y_UNIT_TEST(ImplicitCasts) { + TFunctionRef<void(int)> ref = [](int x) { return x; }; + ref = [](double x) { return x; }; + ref = [](char x) { return x; }; + + TFunctionRef<int()> ref1 = [] { return 0.5; }; + ref1 = [] { return 'a'; }; + ref1 = [] { return 124u; }; + + TFunctionRef<TStringBuf()> ref2{Greet}; + } + + Y_UNIT_TEST(StatelessLambdaLifetime) { + TFunctionRef<int(int, int)> ref{[](int a, int b) { return a + b; }}; + UNIT_ASSERT_EQUAL(ref(5, 5), 10); + } + + Y_UNIT_TEST(ForwardArguments) { + char x = 'x'; + TFunctionRef<void(std::unique_ptr<int>, char&)> ref = [](std::unique_ptr<int> ptr, char& ch) { + UNIT_ASSERT_EQUAL(*ptr, 5); + ch = 'a'; + }; + ref(std::make_unique<int>(5), x); + UNIT_ASSERT_EQUAL(x, 'a'); + } +} diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 2ff2b767c2..ae9ccc3e1a 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -31,7 +31,7 @@ public: PUT }; - TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, const TCurlInitConfig& config = TCurlInitConfig()) + TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig()) : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes) { switch (method) { @@ -44,7 +44,6 @@ public: curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); break; } - curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway"); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); @@ -57,14 +56,14 @@ public: std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2))); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Headers); } - - if (Offset) { - curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str()); + TStringBuilder byteRange; + byteRange << Offset << "-"; + if (expectedSize) { + byteRange << Offset + expectedSize - 1; } - + curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str()); curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this)); - if (withBody) { curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this)); @@ -124,7 +123,7 @@ public: using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) { Output.Reserve(ExpectedSize); Callbacks.emplace(std::move(callback)); @@ -220,7 +219,7 @@ public: IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) {} static TPtr Make( @@ -571,7 +570,6 @@ private: IRetryPolicy<long>::TPtr retryPolicy) final { Rps->Inc(); - if (expectedSize > MaxSimulatenousDownloadsSize) { TIssue error(TStringBuilder() << "Too big file for downloading: size " << expectedSize << ", but limit is " << MaxSimulatenousDownloadsSize); callback(TIssues{error}); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index b66b29bf45..a5adc87876 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -128,7 +128,8 @@ public: TPathList&& paths, bool addPathIndex, ui64 startPathIndex, - const NActors::TActorId& computeActorId + const NActors::TActorId& computeActorId, + ui64 expectedSize ) : Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) @@ -139,6 +140,7 @@ public: , Paths(std::move(paths)) , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) + , ExpectedSize(expectedSize) {} void Bootstrap() { @@ -146,7 +148,7 @@ public: for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; Gateway->Download(Url + std::get<TString>(path), - Headers, std::get<size_t>(path), + Headers, std::min(std::get<size_t>(path), ExpectedSize), std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex), {}, GetS3RetryPolicy()); }; } @@ -247,6 +249,7 @@ private: const TPathList Paths; const bool AddPathIndex; const ui64 StartPathIndex; + const ui64 ExpectedSize; std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks; }; @@ -257,6 +260,7 @@ struct TReadSpec { NDB::ColumnsWithTypeAndName Columns; NDB::FormatSettings Settings; TString Format, Compression; + ui64 ExpectedSize = 0; }; struct TRetryStuff { @@ -778,9 +782,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto readSpec = std::make_shared<TReadSpec>(); readSpec->Columns.resize(structType->GetMembersCount()); for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) { - auto& colsumn = readSpec->Columns[i]; - colsumn.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit); - colsumn.name = structType->GetMemberName(i); + auto& column = readSpec->Columns[i]; + column.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit); + column.name = structType->GetMemberName(i); } readSpec->Format = params.GetFormat(); @@ -808,8 +812,12 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId); return {actor, actor}; } else { + ui64 expectedSize = std::numeric_limits<ui64>::max(); + if (const auto it = settings.find("expectedSize"); settings.cend() != it) + expectedSize = FromString<ui64>(it->second); + const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, computeActorId); + std::move(paths), addPathIndex, startPathIndex, computeActorId, expectedSize); return {actor, actor}; } } diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 5b5c5dde1a..52c4f78149 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -51,7 +51,10 @@ { "Name": "TS3SourceSettings", "Base": "TS3SourceSettingsBase", - "Match": {"Type": "Callable", "Name": "S3SourceSettings"} + "Match": {"Type": "Callable", "Name": "S3SourceSettings"}, + "Children": [ + {"Index": 2, "Name": "ExpectedSize", "Type": "TCoAtom", "Optional": true} + ] }, { "Name": "TS3ParseSettings", diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 4dd25acf55..df10744867 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -95,7 +95,7 @@ public: } TStatus HandleS3SourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 2U, ctx)) { + if (!EnsureMinArgsCount(*input, 2U, ctx)) { return TStatus::Error; } @@ -250,17 +250,20 @@ public: return TStatus::Error; } + auto format = input->Child(TS3Object::idx_Format)->Content(); + if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || - !NCommon::ValidateFormat(input->Child(TS3Object::idx_Format)->Content(), ctx)) + !NCommon::ValidateFormat(format, ctx)) { return TStatus::Error; } + if (input->ChildrenSize() > TS3Object::idx_Settings) { bool haveProjection = false; bool havePartitionedBy = false; auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) { - if ((name == "compression" || name == "projection" || name == "data.interval.unit") && setting.ChildrenSize() != 2) { + if ((name == "compression" || name == "projection" || name == "data.interval.unit" || name == "readmaxbytes") && setting.ChildrenSize() != 2) { ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()), TStringBuilder() << "Expected single value setting for " << name << ", but got " << setting.ChildrenSize() - 1)); return false; @@ -324,6 +327,24 @@ public: return NCommon::ValidateIntervalUnit(unit, ctx); } + if (name == "readmaxbytes") { + auto& value = setting.Tail(); + if (format != "raw") { + ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "read_max_bytes can only be used with raw format")); + return false; + } + if (!value.IsAtom()) { + if (!EnsureStringOrUtf8Type(value, ctx)) { + return false; + } + if (!value.IsCallable({"String", "Utf8"})) { + ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "read_max_bytes must be literal value")); + return false; + } + } + return true; + } + YQL_ENSURE(name == "projection"); haveProjection = true; if (!EnsureAtom(setting.Tail(), ctx)) { @@ -338,7 +359,7 @@ public: return true; }; if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), - { "compression", "partitionedby", "projection", "data.interval.unit" }, validator, ctx)) + { "compression", "partitionedby", "projection", "data.interval.unit", "readmaxbytes" }, validator, ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index a361d0ef6b..b979ff31ac 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -188,18 +188,41 @@ public: .Seal().Build() ); } + auto readSettings = s3ReadObject.Object().Settings().Cast().Ptr(); - return Build<TDqSourceWrap>(ctx, read->Pos()) + int expectedSizeIndex = -1; + for (size_t childInd = 0; childInd < readSettings->ChildrenSize(); ++childInd) { + if (readSettings->Child(childInd)->Head().Content() == "readmaxbytes") { + expectedSizeIndex = childInd; + break; + } + } + + if (expectedSizeIndex != -1) { + return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TS3SourceSettings>() .Paths(s3ReadObject.Object().Paths()) .Token<TCoSecureParam>() .Name().Build(token) .Build() + .ExpectedSize(readSettings->Child(expectedSizeIndex)->TailPtr()) .Build() .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) .Done().Ptr(); + } + return Build<TDqSourceWrap>(ctx, read->Pos()) + .Input<TS3SourceSettings>() + .Paths(s3ReadObject.Object().Paths()) + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Build() + .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) + .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) + .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings))) + .Done().Ptr(); } } return read; @@ -240,6 +263,12 @@ public: srcDesc.MutableSettings()->insert({TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().IsAtom() ? settings.Ref().Child(i)->Tail().Content() : settings.Ref().Child(i)->Tail().Head().Content())}); } } + } else if (const auto maySourceSettings = source.Settings().Maybe<TS3SourceSettings>()){ + const auto sourceSettings = maySourceSettings.Cast(); + auto expectedSize = sourceSettings.ExpectedSize(); + if (expectedSize.IsValid()) { + srcDesc.MutableSettings()->insert({"expectedSize", expectedSize.Cast().StringValue()}); + } } if (extraColumnsType->GetSize()) { |