aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2022-09-07 00:04:50 +0300
committerhcpp <hcpp@ydb.tech>2022-09-07 00:04:50 +0300
commitde0b37fee68ce7ed01fa9bd894573880926acd7c (patch)
treedaefa8c0c4911df2572b748fd9f96b23fb31f499
parent636ec676a65d8a309a4cf03a90ed3d7c9115f990 (diff)
downloadydb-de0b37fee68ce7ed01fa9bd894573880926acd7c.tar.gz
size limit has been added
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp5
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp77
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp35
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp14
6 files changed, 75 insertions, 61 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 7aec0935635..95c1c0207a7 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -48,13 +48,13 @@ public:
void Download(
TString url,
THeaders headers,
- std::size_t expectedSize,
+ std::size_t sizeLimit,
TOnResult callback,
TString data,
IRetryPolicy<long>::TPtr retryPolicy)
{
- Y_UNUSED(expectedSize);
+ Y_UNUSED(sizeLimit);
Y_UNUSED(retryPolicy);
auto key = TKeyType(url, headers, data);
@@ -73,6 +73,7 @@ public:
TString ,
THeaders ,
std::size_t ,
+ std::size_t ,
TOnDownloadStart ,
TOnNewDataPart ,
TOnDownloadFinish ) final {
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 02f701808ad..35054489a4f 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -78,8 +78,8 @@ public:
PUT
};
- TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig())
- : Headers(headers), Method(method), Offset(offset), BodySize(bodySize), ExpectedSize(expectedSize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url)
+ TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 sizeLimit = 0, const TCurlInitConfig& config = TCurlInitConfig())
+ : Headers(headers), Method(method), Offset(offset), BodySize(bodySize), SizeLimit(sizeLimit), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url)
{
InitHandles();
Counter->Inc();
@@ -130,8 +130,8 @@ public:
}
TStringBuilder byteRange;
byteRange << Offset << "-";
- if (ExpectedSize) {
- byteRange << Offset + ExpectedSize - 1;
+ if (SizeLimit) {
+ byteRange << Offset + SizeLimit - 1;
}
curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
curl_easy_setopt(Handle, EMethod::PUT == Method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
@@ -191,7 +191,7 @@ private:
const EMethod Method;
const size_t Offset;
const size_t BodySize;
- const ui64 ExpectedSize;
+ const ui64 SizeLimit;
CURL* Handle = nullptr;
curl_slist* CurlHeaders = nullptr;
const ::NMonitoring::TDynamicCounters::TCounterPtr Counter;
@@ -207,19 +207,19 @@ public:
using TPtr = std::shared_ptr<TEasyCurlBuffer>;
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
- TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
+ TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
{
- Output.Reserve(ExpectedSize);
+ Output.Reserve(SizeLimit);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) {
- return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback), std::move(retryState), std::move(config));
+ static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) {
+ return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config));
}
- size_t GetExpectedSize() const {
- return ExpectedSize;
+ size_t GetSizeLimit() const {
+ return SizeLimit;
}
// return true if callback successfully added to this work
@@ -241,7 +241,7 @@ public:
void Reset() {
Buffer.clear();
TStringOutput(Buffer).Swap(Output);
- Output.Reserve(ExpectedSize);
+ Output.Reserve(SizeLimit);
TStringInput(Data).Swap(Input);
FreeHandles();
InitHandles();
@@ -280,7 +280,7 @@ private:
return Input.Read(buffer, size * nmemb);
}
- const size_t ExpectedSize;
+ const size_t SizeLimit;
const TString Data;
TString Buffer;
TStringInput Input;
@@ -302,11 +302,14 @@ public:
const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes,
TString url, IHTTPGateway::THeaders headers,
size_t offset,
+ size_t sizeLimit,
IHTTPGateway::TOnDownloadStart onStart,
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const TCurlInitConfig& config = TCurlInitConfig())
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config))
+ , OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ , Offset(offset), SizeLimit(sizeLimit)
{}
static TPtr Make(
@@ -316,12 +319,13 @@ public:
TString url,
IHTTPGateway::THeaders headers,
size_t offset,
+ size_t sizeLimit,
IHTTPGateway::TOnDownloadStart onStart,
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const TCurlInitConfig& config = TCurlInitConfig())
{
- return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config));
+ return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config));
}
enum class EAction : i8 {
@@ -340,6 +344,10 @@ public:
}
if (Working != Counter->load() < buffersSize) {
+ if (!Working && SizeLimit && Offset + Position >= SizeLimit) {
+ OnFinish(TIssues());
+ return EAction::Drop;
+ }
if (Working = !Working)
SkipTo(Position);
return Working ? EAction::Work : EAction::Stop;
@@ -391,6 +399,8 @@ private:
size_t Position = 0ULL;
bool Cancelled = false;
bool StreamStarted = false;
+ size_t Offset = 0;
+ size_t SizeLimit = 0;
};
using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
@@ -430,7 +440,7 @@ public:
, OutputMemory(Counters->GetCounter("OutputMemory"))
, PerformCycles(Counters->GetCounter("PerformCycles", true))
, AwaitQueue(Counters->GetCounter("AwaitQueue"))
- , AwaitQueueTopExpectedSize(Counters->GetCounter("AwaitQueueTopExpectedSize"))
+ , AwaitQueueTopSizeLimit(Counters->GetCounter("AwaitQueueTopSizeLimit"))
, DownloadedBytes(Counters->GetCounter("DownloadedBytes", true))
, UploadedBytes(Counters->GetCounter("UploadedBytes", true))
{
@@ -568,10 +578,10 @@ private:
Delayed.pop();
}
- const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize();
- AwaitQueueTopExpectedSize->Set(topExpectedSize);
- while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetExpectedSize() <= MaxSimulatenousDownloadsSize) {
- AllocatedSize += Await.front()->GetExpectedSize();
+ const ui64 topSizeLimit = Await.empty() ? 0 : Await.front()->GetSizeLimit();
+ AwaitQueueTopSizeLimit->Set(topSizeLimit);
+ while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetSizeLimit() <= MaxSimulatenousDownloadsSize) {
+ AllocatedSize += Await.front()->GetSizeLimit();
const auto handle = Await.front()->GetHandle();
Allocated.emplace(handle, std::move(Await.front()));
Await.pop();
@@ -593,7 +603,7 @@ private:
curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
if (auto buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) {
- AllocatedSize -= buffer->GetExpectedSize();
+ AllocatedSize -= buffer->GetSizeLimit();
if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(httpResponseCode)) {
buffer->Reset();
Delayed.emplace(nextRetryDelay->ToDeadLine(), std::move(buffer));
@@ -646,14 +656,14 @@ private:
void Download(
TString url,
THeaders headers,
- size_t expectedSize,
+ size_t sizeLimit,
TOnResult callback,
TString data,
IRetryPolicy<long>::TPtr retryPolicy) final
{
Rps->Inc();
- if (expectedSize > MaxSimulatenousDownloadsSize) {
- TIssue error(TStringBuilder() << "Too big file for downloading: size " << expectedSize << ", but limit is " << MaxSimulatenousDownloadsSize);
+ if (sizeLimit > MaxSimulatenousDownloadsSize) {
+ TIssue error(TStringBuilder() << "Too big file for downloading: size " << sizeLimit << ", but limit is " << MaxSimulatenousDownloadsSize);
callback(TIssues{error});
return;
}
@@ -663,21 +673,22 @@ private:
if (easy->AddCallback(callback))
return;
- auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig);
+ auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig);
entry = easy;
Await.emplace(std::move(easy));
- Wakeup(expectedSize);
+ Wakeup(sizeLimit);
}
TCancelHook Download(
TString url,
THeaders headers,
size_t offset,
+ size_t sizeLimit,
TOnDownloadStart onStart,
TOnNewDataPart onNewData,
TOnDownloadFinish onFinish) final
{
- auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish));
+ auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
const auto handle = stream->GetHandle();
TEasyCurlStream::TWeakPtr weak = stream;
@@ -692,14 +703,14 @@ private:
void OnRetry(TEasyCurlBuffer::TPtr easy) {
const std::unique_lock lock(Sync);
- const size_t expectedSize = easy->GetExpectedSize();
+ const size_t sizeLimit = easy->GetSizeLimit();
Await.emplace(std::move(easy));
- Wakeup(expectedSize);
+ Wakeup(sizeLimit);
}
- void Wakeup(size_t expectedSize) {
+ void Wakeup(size_t sizeLimit) {
AwaitQueue->Set(Await.size());
- if (Allocated.size() < MaxHandlers && AllocatedSize + expectedSize + OutputSize.load() <= MaxSimulatenousDownloadsSize) {
+ if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) {
curl_multi_wakeup(Handle);
}
}
@@ -739,7 +750,7 @@ private:
const ::NMonitoring::TDynamicCounters::TCounterPtr OutputMemory;
const ::NMonitoring::TDynamicCounters::TCounterPtr PerformCycles;
const ::NMonitoring::TDynamicCounters::TCounterPtr AwaitQueue;
- const ::NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopExpectedSize;
+ const ::NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopSizeLimit;
const ::NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes;
const ::NMonitoring::TDynamicCounters::TCounterPtr UploadedBytes;
};
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 8399029aff6..e26861cca33 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -75,7 +75,7 @@ public:
virtual void Download(
TString url,
THeaders headers,
- std::size_t expectedSize,
+ std::size_t sizeLimit,
TOnResult callback,
TString data = {},
IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()
@@ -103,6 +103,7 @@ public:
TString url,
THeaders headers,
std::size_t offset,
+ std::size_t sizeLimit,
TOnDownloadStart onStart,
TOnNewDataPart onNewData,
TOnDownloadFinish onFinish) = 0;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 96f1ea5b849..ed6c025b397 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -130,7 +130,7 @@ public:
bool addPathIndex,
ui64 startPathIndex,
const NActors::TActorId& computeActorId,
- ui64 expectedSize
+ ui64 sizeLimit
) : Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
@@ -141,7 +141,7 @@ public:
, Paths(std::move(paths))
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
- , ExpectedSize(expectedSize)
+ , SizeLimit(sizeLimit)
{}
void Bootstrap() {
@@ -149,7 +149,7 @@ public:
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
Gateway->Download(Url + std::get<TString>(path),
- Headers, std::min(std::get<size_t>(path), ExpectedSize),
+ Headers, std::min(std::get<size_t>(path), SizeLimit),
std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex), {}, GetS3RetryPolicy());
};
}
@@ -250,7 +250,7 @@ private:
const TPathList Paths;
const bool AddPathIndex;
const ui64 StartPathIndex;
- const ui64 ExpectedSize;
+ const ui64 SizeLimit;
std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
};
@@ -261,7 +261,7 @@ struct TReadSpec {
NDB::ColumnsWithTypeAndName Columns;
NDB::FormatSettings Settings;
TString Format, Compression;
- ui64 ExpectedSize = 0;
+ ui64 SizeLimit = 0;
};
struct TRetryStuff {
@@ -271,14 +271,14 @@ struct TRetryStuff {
IHTTPGateway::TPtr gateway,
TString url,
const IHTTPGateway::THeaders& headers,
- std::size_t expectedSize
- ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState())
+ std::size_t sizeLimit
+ ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), SizeLimit(sizeLimit), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState())
{}
const IHTTPGateway::TPtr Gateway;
const TString Url;
const IHTTPGateway::THeaders Headers;
- const std::size_t ExpectedSize;
+ const std::size_t SizeLimit;
std::size_t Offset = 0U;
const IRetryPolicy<long>::IRetryState::TPtr RetryState;
@@ -485,18 +485,19 @@ private:
}
}
- if (retryStuff->NextRetryDelay)
+ if (retryStuff->NextRetryDelay && retryStuff->Offset < retryStuff->SizeLimit)
actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
else
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished));
}
static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) {
+ auto* as = TActivationContext::ActorSystem();
retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url,
- retryStuff->Headers, retryStuff->Offset,
- std::bind(&TS3ReadCoroActor::OnDownloadStart, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1),
- std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1),
- std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1));
+ retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit,
+ std::bind(&TS3ReadCoroActor::OnDownloadStart, as, self, parent, std::placeholders::_1),
+ std::bind(&TS3ReadCoroActor::OnNewData, as, self, parent, std::placeholders::_1),
+ std::bind(&TS3ReadCoroActor::OnDownloadFinished, as, self, parent, retryStuff, std::placeholders::_1));
}
void Registered(TActorSystem* sys, const TActorId& parent) override {
@@ -834,12 +835,12 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId);
return {actor, actor};
} else {
- ui64 expectedSize = std::numeric_limits<ui64>::max();
- if (const auto it = settings.find("expectedSize"); settings.cend() != it)
- expectedSize = FromString<ui64>(it->second);
+ ui64 sizeLimit = std::numeric_limits<ui64>::max();
+ if (const auto it = settings.find("sizeLimit"); settings.cend() != it)
+ sizeLimit = FromString<ui64>(it->second);
const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, computeActorId, expectedSize);
+ std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit);
return {actor, actor};
}
}
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 52c4f78149f..97242d2f588 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -53,7 +53,7 @@
"Base": "TS3SourceSettingsBase",
"Match": {"Type": "Callable", "Name": "S3SourceSettings"},
"Children": [
- {"Index": 2, "Name": "ExpectedSize", "Type": "TCoAtom", "Optional": true}
+ {"Index": 2, "Name": "SizeLimit", "Type": "TCoAtom", "Optional": true}
]
},
{
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index b979ff31ac4..c5521aa798c 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -190,22 +190,22 @@ public:
}
auto readSettings = s3ReadObject.Object().Settings().Cast().Ptr();
- int expectedSizeIndex = -1;
+ int sizeLimitIndex = -1;
for (size_t childInd = 0; childInd < readSettings->ChildrenSize(); ++childInd) {
if (readSettings->Child(childInd)->Head().Content() == "readmaxbytes") {
- expectedSizeIndex = childInd;
+ sizeLimitIndex = childInd;
break;
}
}
- if (expectedSizeIndex != -1) {
+ if (sizeLimitIndex != -1) {
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3SourceSettings>()
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
- .ExpectedSize(readSettings->Child(expectedSizeIndex)->TailPtr())
+ .SizeLimit(readSettings->Child(sizeLimitIndex)->TailPtr())
.Build()
.RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
.DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
@@ -265,9 +265,9 @@ public:
}
} else if (const auto maySourceSettings = source.Settings().Maybe<TS3SourceSettings>()){
const auto sourceSettings = maySourceSettings.Cast();
- auto expectedSize = sourceSettings.ExpectedSize();
- if (expectedSize.IsValid()) {
- srcDesc.MutableSettings()->insert({"expectedSize", expectedSize.Cast().StringValue()});
+ auto sizeLimit = sourceSettings.SizeLimit();
+ if (sizeLimit.IsValid()) {
+ srcDesc.MutableSettings()->insert({"sizeLimit", sizeLimit.Cast().StringValue()});
}
}