aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-08-24 16:44:16 +0300
committerbbiff <bbiff@yandex-team.com>2022-08-24 16:44:16 +0300
commit7eeb61811ba1588f20d70abeadc7b9308dd785dd (patch)
tree71e1f4bcfd0bef1563239e2bea85d8de0e5eb275
parent653146b941fae8faaf0cf733e371816e7c69e7d2 (diff)
downloadydb-7eeb61811ba1588f20d70abeadc7b9308dd785dd.tar.gz
added limit parameter
-rw-r--r--util/generic/function_ref.h122
-rw-r--r--util/generic/function_ref_ut.cpp150
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp18
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp20
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json5
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp29
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp31
7 files changed, 353 insertions, 22 deletions
diff --git a/util/generic/function_ref.h b/util/generic/function_ref.h
new file mode 100644
index 0000000000..c55de8694c
--- /dev/null
+++ b/util/generic/function_ref.h
@@ -0,0 +1,122 @@
+#pragma once
+
+#include <util/generic/function.h>
+#include <util/system/yassert.h>
+
+#include <functional>
+
+template <typename Signature>
+class TFunctionRef;
+
+template <typename Ret, typename... Args, bool IsNoexcept>
+class TFunctionRef<Ret(Args...) noexcept(IsNoexcept)> {
+public:
+ using TSignature = Ret(Args...) noexcept(IsNoexcept);
+
+private:
+ union TErasedCallable {
+ const void* Functor;
+ void (*Function)();
+ };
+ using TProxy = Ret (*)(TErasedCallable callable, Args...);
+
+ // Making this a lambda inside TFunctionRef ctor caused:
+ // "error: cannot compile this forwarded non-trivially copyable parameter yet"
+ // on clang-win-i686-release.
+ //
+ // Using correct noexcept specifiers here (noexcept(IsNoexcept)) caused miscompilation on clang:
+ // https://github.com/llvm/llvm-project/issues/55280.
+ template <typename Functor>
+ static Ret InvokeErasedFunctor(TErasedCallable callable, Args... args) {
+ auto& ref = *static_cast<const std::remove_reference_t<Functor>*>(callable.Functor);
+ return static_cast<Ret>(std::invoke(ref, std::forward<Args>(args)...));
+ }
+
+ template <typename Function>
+ static Ret InvokeErasedFunction(TErasedCallable callable, Args... args) {
+ auto* function = reinterpret_cast<Function*>(callable.Function);
+ return static_cast<Ret>(std::invoke(function, std::forward<Args>(args)...));
+ }
+
+ template <class F>
+ static constexpr bool IsInvocableUsing = std::conditional_t<
+ IsNoexcept,
+ std::is_nothrow_invocable_r<Ret, F, Args...>,
+ std::is_invocable_r<Ret, F, Args...>>::value;
+
+ // clang-format off
+ template <class Callable>
+ static constexpr bool IsSuitableFunctor =
+ IsInvocableUsing<Callable>
+ && !std::is_function_v<Callable>
+ && !std::is_same_v<std::remove_cvref_t<Callable>, TFunctionRef>;
+
+ template <class Callable>
+ static constexpr bool IsSuitableFunction =
+ IsInvocableUsing<Callable>
+ && std::is_function_v<Callable>;
+ // clang-format on
+
+public:
+ // Function ref should not be default constructible.
+ // While the function ref can have empty state (for example, Proxy_ == nullptr),
+ // It does not make sense in common usage cases.
+ TFunctionRef() = delete;
+
+ // Construct function ref from a functor.
+ template <typename Functor, typename = std::enable_if_t<IsSuitableFunctor<Functor>>>
+ TFunctionRef(Functor&& functor) noexcept
+ : Callable_{
+ .Functor = std::addressof(functor),
+ }
+ , Proxy_{InvokeErasedFunctor<Functor>}
+ {
+ }
+
+ // Construct function ref from a function pointer.
+ template <typename Function, typename = std::enable_if_t<IsSuitableFunction<Function>>>
+ TFunctionRef(Function* function) noexcept
+ : Callable_{
+ .Function = reinterpret_cast<void (*)()>(function),
+ }
+ , Proxy_{InvokeErasedFunction<Function>}
+ {
+ }
+
+ // Copy ctors & assignment.
+ // Just copy pointers.
+ TFunctionRef(const TFunctionRef& rhs) noexcept = default;
+ TFunctionRef& operator=(const TFunctionRef& rhs) noexcept = default;
+
+ Ret operator()(Args... args) const noexcept(IsNoexcept) {
+ return Proxy_(Callable_, std::forward<Args>(args)...);
+ }
+
+private:
+ TErasedCallable Callable_;
+ TProxy Proxy_ = nullptr;
+};
+
+namespace NPrivate {
+
+ template <typename Callable, typename Signature = typename TCallableTraits<Callable>::TSignature>
+ struct TIsNothrowInvocable;
+
+ template <typename Callable, typename Ret, typename... Args>
+ struct TIsNothrowInvocable<Callable, Ret(Args...)> {
+ static constexpr bool IsNoexcept = std::is_nothrow_invocable_r_v<Ret, Callable, Args...>;
+ using TSignature = Ret(Args...) noexcept(IsNoexcept);
+ };
+
+ template <typename Callable>
+ struct TCallableTraitsWithNoexcept {
+ using TSignature = typename TIsNothrowInvocable<Callable>::TSignature;
+ };
+
+} // namespace NPrivate
+
+template <typename Callable>
+TFunctionRef(Callable&&) -> TFunctionRef<typename NPrivate::TCallableTraitsWithNoexcept<Callable>::TSignature>;
+
+template <typename Function>
+TFunctionRef(Function*) -> TFunctionRef<Function>;
diff --git a/util/generic/function_ref_ut.cpp b/util/generic/function_ref_ut.cpp
new file mode 100644
index 0000000000..59f9ae35cd
--- /dev/null
+++ b/util/generic/function_ref_ut.cpp
@@ -0,0 +1,150 @@
+#include "function_ref.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TestFunctionRef) {
+ template <typename Signature>
+ struct TTestFunction;
+
+ template <typename Ret, typename... Args, bool IsNoexcept>
+ struct TTestFunction<Ret(Args...) noexcept(IsNoexcept)> {
+ Ret operator()(Args...) const noexcept(IsNoexcept) {
+ return {};
+ }
+ };
+
+ Y_UNIT_TEST(NonDefaultConstructible) {
+ static_assert(!std::is_default_constructible_v<TFunctionRef<void()>>);
+ static_assert(!std::is_default_constructible_v<TFunctionRef<void() noexcept>>);
+ static_assert(!std::is_default_constructible_v<TFunctionRef<int(double, void********* megaptr, TTestFunction<void(int)>)>>);
+ }
+
+ int F1(bool x) {
+ if (x)
+ throw 19;
+ return 42;
+ }
+
+ int F2(bool x) noexcept {
+ return 42 + x;
+ }
+
+ static const TTestFunction<int(bool)> C1;
+ static const TTestFunction<int(bool) noexcept> C2;
+
+ Y_UNIT_TEST(Noexcept) {
+ static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(F1)>);
+ static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(F2)>);
+ static_assert(!std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(F1)>);
+ static_assert(std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(F2)>);
+
+ static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(C1)>);
+ static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(C2)>);
+ static_assert(!std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(C1)>);
+ static_assert(std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(C2)>);
+ }
+
+ Y_UNIT_TEST(Deduction) {
+ TFunctionRef ref1(F1);
+ TFunctionRef ref2(F2);
+ TFunctionRef ref3(C1);
+ TFunctionRef ref4(C2);
+
+ static_assert(!std::is_nothrow_invocable_r_v<int, decltype(ref1), bool>);
+ static_assert(std::is_nothrow_invocable_r_v<int, decltype(ref2), bool>);
+ static_assert(std::is_same_v<decltype(ref1)::TSignature, int(bool)>);
+ static_assert(std::is_same_v<decltype(ref2)::TSignature, int(bool) noexcept>);
+ }
+
+ void WithCallback(TFunctionRef<double(double, int) noexcept>);
+
+ void Iterate(int from, int to, TFunctionRef<void(int)> callback) {
+ while (from < to) {
+ callback(from++);
+ }
+ }
+
+ void IterateNoexcept(int from, int to, TFunctionRef<void(int) noexcept> callback) {
+ while (from < to) {
+ callback(from++);
+ }
+ }
+
+ Y_UNIT_TEST(AsArgument) {
+ int sum = 0;
+ Iterate(0, 10, [&](int x) { sum += x; });
+ UNIT_ASSERT_EQUAL(sum, 45);
+
+ Iterate(0, 10, [&](int x) noexcept { sum += x; });
+ UNIT_ASSERT_EQUAL(sum, 90);
+
+ IterateNoexcept(0, 10, [&](int x) noexcept { sum += x; });
+ UNIT_ASSERT_EQUAL(sum, 135);
+
+ auto summer = [&](int x) { sum += x; };
+ Iterate(0, 10, summer);
+ Iterate(0, 10, summer);
+ Iterate(0, 10, summer);
+ UNIT_ASSERT_EQUAL(sum, 270);
+
+ TFunctionRef ref = summer;
+ Iterate(0, 10, ref);
+ UNIT_ASSERT_EQUAL(sum, 315);
+ }
+
+ int GlobalSum = 0;
+ void AddToGlobalSum(int x) {
+ GlobalSum += x;
+ }
+
+ Y_UNIT_TEST(FunctionPointer) {
+ GlobalSum = 0;
+ Iterate(0, 10, AddToGlobalSum);
+ UNIT_ASSERT_EQUAL(GlobalSum, 45);
+
+ TFunctionRef ref1 = AddToGlobalSum;
+ Iterate(0, 10, ref1);
+ UNIT_ASSERT_EQUAL(GlobalSum, 90);
+
+ TFunctionRef ref2{AddToGlobalSum};
+ Iterate(0, 10, ref2);
+ UNIT_ASSERT_EQUAL(GlobalSum, 135);
+ }
+
+ Y_UNIT_TEST(Reassign) {
+ TFunctionRef kek = [](double) { return 42; };
+ kek = [](double) { return 19; };
+ kek = [](int) { return 22.8; };
+ }
+
+ const char* Greet() {
+ return "Hello, world!";
+ }
+
+ Y_UNIT_TEST(ImplicitCasts) {
+ TFunctionRef<void(int)> ref = [](int x) { return x; };
+ ref = [](double x) { return x; };
+ ref = [](char x) { return x; };
+
+ TFunctionRef<int()> ref1 = [] { return 0.5; };
+ ref1 = [] { return 'a'; };
+ ref1 = [] { return 124u; };
+
+ TFunctionRef<TStringBuf()> ref2{Greet};
+ }
+
+ Y_UNIT_TEST(StatelessLambdaLifetime) {
+ TFunctionRef<int(int, int)> ref{[](int a, int b) { return a + b; }};
+ UNIT_ASSERT_EQUAL(ref(5, 5), 10);
+ }
+
+ Y_UNIT_TEST(ForwardArguments) {
+ char x = 'x';
+ TFunctionRef<void(std::unique_ptr<int>, char&)> ref = [](std::unique_ptr<int> ptr, char& ch) {
+ UNIT_ASSERT_EQUAL(*ptr, 5);
+ ch = 'a';
+ };
+ ref(std::make_unique<int>(5), x);
+ UNIT_ASSERT_EQUAL(x, 'a');
+ }
+}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 2ff2b767c2..ae9ccc3e1a 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -31,7 +31,7 @@ public:
PUT
};
- TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, const TCurlInitConfig& config = TCurlInitConfig())
+ TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig())
: Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes)
{
switch (method) {
@@ -44,7 +44,6 @@ public:
curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
break;
}
-
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway");
curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
@@ -57,14 +56,14 @@ public:
std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2)));
curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Headers);
}
-
- if (Offset) {
- curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str());
+ TStringBuilder byteRange;
+ byteRange << Offset << "-";
+ if (expectedSize) {
+ byteRange << Offset + expectedSize - 1;
}
-
+ curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this));
-
if (withBody) {
curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this));
@@ -124,7 +123,7 @@ public:
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
{
Output.Reserve(ExpectedSize);
Callbacks.emplace(std::move(callback));
@@ -220,7 +219,7 @@ public:
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
{}
static TPtr Make(
@@ -571,7 +570,6 @@ private:
IRetryPolicy<long>::TPtr retryPolicy) final
{
Rps->Inc();
-
if (expectedSize > MaxSimulatenousDownloadsSize) {
TIssue error(TStringBuilder() << "Too big file for downloading: size " << expectedSize << ", but limit is " << MaxSimulatenousDownloadsSize);
callback(TIssues{error});
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index b66b29bf45..a5adc87876 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -128,7 +128,8 @@ public:
TPathList&& paths,
bool addPathIndex,
ui64 startPathIndex,
- const NActors::TActorId& computeActorId
+ const NActors::TActorId& computeActorId,
+ ui64 expectedSize
) : Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
@@ -139,6 +140,7 @@ public:
, Paths(std::move(paths))
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
+ , ExpectedSize(expectedSize)
{}
void Bootstrap() {
@@ -146,7 +148,7 @@ public:
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
Gateway->Download(Url + std::get<TString>(path),
- Headers, std::get<size_t>(path),
+ Headers, std::min(std::get<size_t>(path), ExpectedSize),
std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex), {}, GetS3RetryPolicy());
};
}
@@ -247,6 +249,7 @@ private:
const TPathList Paths;
const bool AddPathIndex;
const ui64 StartPathIndex;
+ const ui64 ExpectedSize;
std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
};
@@ -257,6 +260,7 @@ struct TReadSpec {
NDB::ColumnsWithTypeAndName Columns;
NDB::FormatSettings Settings;
TString Format, Compression;
+ ui64 ExpectedSize = 0;
};
struct TRetryStuff {
@@ -778,9 +782,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto readSpec = std::make_shared<TReadSpec>();
readSpec->Columns.resize(structType->GetMembersCount());
for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) {
- auto& colsumn = readSpec->Columns[i];
- colsumn.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit);
- colsumn.name = structType->GetMemberName(i);
+ auto& column = readSpec->Columns[i];
+ column.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit);
+ column.name = structType->GetMemberName(i);
}
readSpec->Format = params.GetFormat();
@@ -808,8 +812,12 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId);
return {actor, actor};
} else {
+ ui64 expectedSize = std::numeric_limits<ui64>::max();
+ if (const auto it = settings.find("expectedSize"); settings.cend() != it)
+ expectedSize = FromString<ui64>(it->second);
+
const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, computeActorId);
+ std::move(paths), addPathIndex, startPathIndex, computeActorId, expectedSize);
return {actor, actor};
}
}
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 5b5c5dde1a..52c4f78149 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -51,7 +51,10 @@
{
"Name": "TS3SourceSettings",
"Base": "TS3SourceSettingsBase",
- "Match": {"Type": "Callable", "Name": "S3SourceSettings"}
+ "Match": {"Type": "Callable", "Name": "S3SourceSettings"},
+ "Children": [
+ {"Index": 2, "Name": "ExpectedSize", "Type": "TCoAtom", "Optional": true}
+ ]
},
{
"Name": "TS3ParseSettings",
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 4dd25acf55..df10744867 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -95,7 +95,7 @@ public:
}
TStatus HandleS3SourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureArgsCount(*input, 2U, ctx)) {
+ if (!EnsureMinArgsCount(*input, 2U, ctx)) {
return TStatus::Error;
}
@@ -250,17 +250,20 @@ public:
return TStatus::Error;
}
+ auto format = input->Child(TS3Object::idx_Format)->Content();
+
if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) ||
- !NCommon::ValidateFormat(input->Child(TS3Object::idx_Format)->Content(), ctx))
+ !NCommon::ValidateFormat(format, ctx))
{
return TStatus::Error;
}
+
if (input->ChildrenSize() > TS3Object::idx_Settings) {
bool haveProjection = false;
bool havePartitionedBy = false;
auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) {
- if ((name == "compression" || name == "projection" || name == "data.interval.unit") && setting.ChildrenSize() != 2) {
+ if ((name == "compression" || name == "projection" || name == "data.interval.unit" || name == "readmaxbytes") && setting.ChildrenSize() != 2) {
ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()),
TStringBuilder() << "Expected single value setting for " << name << ", but got " << setting.ChildrenSize() - 1));
return false;
@@ -324,6 +327,24 @@ public:
return NCommon::ValidateIntervalUnit(unit, ctx);
}
+ if (name == "readmaxbytes") {
+ auto& value = setting.Tail();
+ if (format != "raw") {
+ ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "read_max_bytes can only be used with raw format"));
+ return false;
+ }
+ if (!value.IsAtom()) {
+ if (!EnsureStringOrUtf8Type(value, ctx)) {
+ return false;
+ }
+ if (!value.IsCallable({"String", "Utf8"})) {
+ ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "read_max_bytes must be literal value"));
+ return false;
+ }
+ }
+ return true;
+ }
+
YQL_ENSURE(name == "projection");
haveProjection = true;
if (!EnsureAtom(setting.Tail(), ctx)) {
@@ -338,7 +359,7 @@ public:
return true;
};
if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings),
- { "compression", "partitionedby", "projection", "data.interval.unit" }, validator, ctx))
+ { "compression", "partitionedby", "projection", "data.interval.unit", "readmaxbytes" }, validator, ctx))
{
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index a361d0ef6b..b979ff31ac 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -188,18 +188,41 @@ public:
.Seal().Build()
);
}
+ auto readSettings = s3ReadObject.Object().Settings().Cast().Ptr();
- return Build<TDqSourceWrap>(ctx, read->Pos())
+ int expectedSizeIndex = -1;
+ for (size_t childInd = 0; childInd < readSettings->ChildrenSize(); ++childInd) {
+ if (readSettings->Child(childInd)->Head().Content() == "readmaxbytes") {
+ expectedSizeIndex = childInd;
+ break;
+ }
+ }
+
+ if (expectedSizeIndex != -1) {
+ return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3SourceSettings>()
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
+ .ExpectedSize(readSettings->Child(expectedSizeIndex)->TailPtr())
.Build()
.RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
.DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
.Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
.Done().Ptr();
+ }
+ return Build<TDqSourceWrap>(ctx, read->Pos())
+ .Input<TS3SourceSettings>()
+ .Paths(s3ReadObject.Object().Paths())
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Build()
+ .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
+ .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
+ .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
+ .Done().Ptr();
}
}
return read;
@@ -240,6 +263,12 @@ public:
srcDesc.MutableSettings()->insert({TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().IsAtom() ? settings.Ref().Child(i)->Tail().Content() : settings.Ref().Child(i)->Tail().Head().Content())});
}
}
+ } else if (const auto maySourceSettings = source.Settings().Maybe<TS3SourceSettings>()){
+ const auto sourceSettings = maySourceSettings.Cast();
+ auto expectedSize = sourceSettings.ExpectedSize();
+ if (expectedSize.IsValid()) {
+ srcDesc.MutableSettings()->insert({"expectedSize", expectedSize.Cast().StringValue()});
+ }
}
if (extraColumnsType->GetSize()) {