diff options
author | hcpp <hcpp@ydb.tech> | 2022-09-07 00:04:50 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2022-09-07 00:04:50 +0300 |
commit | de0b37fee68ce7ed01fa9bd894573880926acd7c (patch) | |
tree | daefa8c0c4911df2572b748fd9f96b23fb31f499 | |
parent | 636ec676a65d8a309a4cf03a90ed3d7c9115f990 (diff) | |
download | ydb-de0b37fee68ce7ed01fa9bd894573880926acd7c.tar.gz |
size limit has been added
6 files changed, 75 insertions, 61 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 7aec0935635..95c1c0207a7 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -48,13 +48,13 @@ public: void Download( TString url, THeaders headers, - std::size_t expectedSize, + std::size_t sizeLimit, TOnResult callback, TString data, IRetryPolicy<long>::TPtr retryPolicy) { - Y_UNUSED(expectedSize); + Y_UNUSED(sizeLimit); Y_UNUSED(retryPolicy); auto key = TKeyType(url, headers, data); @@ -73,6 +73,7 @@ public: TString , THeaders , std::size_t , + std::size_t , TOnDownloadStart , TOnNewDataPart , TOnDownloadFinish ) final { diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 02f701808ad..35054489a4f 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -78,8 +78,8 @@ public: PUT }; - TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig()) - : Headers(headers), Method(method), Offset(offset), BodySize(bodySize), ExpectedSize(expectedSize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url) + TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t bodySize = 0, ui64 sizeLimit = 0, const TCurlInitConfig& config = TCurlInitConfig()) + : Headers(headers), Method(method), Offset(offset), BodySize(bodySize), SizeLimit(sizeLimit), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), Url(url) { InitHandles(); Counter->Inc(); @@ -130,8 +130,8 @@ public: } TStringBuilder byteRange; byteRange << Offset << "-"; - if (ExpectedSize) { - byteRange << Offset + ExpectedSize - 1; + if (SizeLimit) { + byteRange << Offset + SizeLimit - 1; } curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str()); curl_easy_setopt(Handle, EMethod::PUT == Method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); @@ -191,7 +191,7 @@ private: const EMethod Method; const size_t Offset; const size_t BodySize; - const ui64 ExpectedSize; + const ui64 SizeLimit; CURL* Handle = nullptr; curl_slist* CurlHeaders = nullptr; const ::NMonitoring::TDynamicCounters::TCounterPtr Counter; @@ -207,19 +207,19 @@ public: using TPtr = std::shared_ptr<TEasyCurlBuffer>; using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; - TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) + TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, data.size(), sizeLimit, std::move(config)), SizeLimit(sizeLimit), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) { - Output.Reserve(ExpectedSize); + Output.Reserve(SizeLimit); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) { - return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback), std::move(retryState), std::move(config)); + static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) { + return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config)); } - size_t GetExpectedSize() const { - return ExpectedSize; + size_t GetSizeLimit() const { + return SizeLimit; } // return true if callback successfully added to this work @@ -241,7 +241,7 @@ public: void Reset() { Buffer.clear(); TStringOutput(Buffer).Swap(Output); - Output.Reserve(ExpectedSize); + Output.Reserve(SizeLimit); TStringInput(Data).Swap(Input); FreeHandles(); InitHandles(); @@ -280,7 +280,7 @@ private: return Input.Read(buffer, size * nmemb); } - const size_t ExpectedSize; + const size_t SizeLimit; const TString Data; TString Buffer; TStringInput Input; @@ -302,11 +302,14 @@ public: const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, + size_t sizeLimit, IHTTPGateway::TOnDownloadStart onStart, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const TCurlInitConfig& config = TCurlInitConfig()) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, 0, 0, std::move(config)) + , OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + , Offset(offset), SizeLimit(sizeLimit) {} static TPtr Make( @@ -316,12 +319,13 @@ public: TString url, IHTTPGateway::THeaders headers, size_t offset, + size_t sizeLimit, IHTTPGateway::TOnDownloadStart onStart, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const TCurlInitConfig& config = TCurlInitConfig()) { - return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config)); + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config)); } enum class EAction : i8 { @@ -340,6 +344,10 @@ public: } if (Working != Counter->load() < buffersSize) { + if (!Working && SizeLimit && Offset + Position >= SizeLimit) { + OnFinish(TIssues()); + return EAction::Drop; + } if (Working = !Working) SkipTo(Position); return Working ? EAction::Work : EAction::Stop; @@ -391,6 +399,8 @@ private: size_t Position = 0ULL; bool Cancelled = false; bool StreamStarted = false; + size_t Offset = 0; + size_t SizeLimit = 0; }; using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; @@ -430,7 +440,7 @@ public: , OutputMemory(Counters->GetCounter("OutputMemory")) , PerformCycles(Counters->GetCounter("PerformCycles", true)) , AwaitQueue(Counters->GetCounter("AwaitQueue")) - , AwaitQueueTopExpectedSize(Counters->GetCounter("AwaitQueueTopExpectedSize")) + , AwaitQueueTopSizeLimit(Counters->GetCounter("AwaitQueueTopSizeLimit")) , DownloadedBytes(Counters->GetCounter("DownloadedBytes", true)) , UploadedBytes(Counters->GetCounter("UploadedBytes", true)) { @@ -568,10 +578,10 @@ private: Delayed.pop(); } - const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize(); - AwaitQueueTopExpectedSize->Set(topExpectedSize); - while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetExpectedSize() <= MaxSimulatenousDownloadsSize) { - AllocatedSize += Await.front()->GetExpectedSize(); + const ui64 topSizeLimit = Await.empty() ? 0 : Await.front()->GetSizeLimit(); + AwaitQueueTopSizeLimit->Set(topSizeLimit); + while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetSizeLimit() <= MaxSimulatenousDownloadsSize) { + AllocatedSize += Await.front()->GetSizeLimit(); const auto handle = Await.front()->GetHandle(); Allocated.emplace(handle, std::move(Await.front())); Await.pop(); @@ -593,7 +603,7 @@ private: curl_easy_getinfo(easy->GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); if (auto buffer = std::dynamic_pointer_cast<TEasyCurlBuffer>(easy)) { - AllocatedSize -= buffer->GetExpectedSize(); + AllocatedSize -= buffer->GetSizeLimit(); if (const auto& nextRetryDelay = buffer->GetNextRetryDelay(httpResponseCode)) { buffer->Reset(); Delayed.emplace(nextRetryDelay->ToDeadLine(), std::move(buffer)); @@ -646,14 +656,14 @@ private: void Download( TString url, THeaders headers, - size_t expectedSize, + size_t sizeLimit, TOnResult callback, TString data, IRetryPolicy<long>::TPtr retryPolicy) final { Rps->Inc(); - if (expectedSize > MaxSimulatenousDownloadsSize) { - TIssue error(TStringBuilder() << "Too big file for downloading: size " << expectedSize << ", but limit is " << MaxSimulatenousDownloadsSize); + if (sizeLimit > MaxSimulatenousDownloadsSize) { + TIssue error(TStringBuilder() << "Too big file for downloading: size " << sizeLimit << ", but limit is " << MaxSimulatenousDownloadsSize); callback(TIssues{error}); return; } @@ -663,21 +673,22 @@ private: if (easy->AddCallback(callback)) return; - auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, sizeLimit, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig); entry = easy; Await.emplace(std::move(easy)); - Wakeup(expectedSize); + Wakeup(sizeLimit); } TCancelHook Download( TString url, THeaders headers, size_t offset, + size_t sizeLimit, TOnDownloadStart onStart, TOnNewDataPart onNewData, TOnDownloadFinish onFinish) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish)); + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); const auto handle = stream->GetHandle(); TEasyCurlStream::TWeakPtr weak = stream; @@ -692,14 +703,14 @@ private: void OnRetry(TEasyCurlBuffer::TPtr easy) { const std::unique_lock lock(Sync); - const size_t expectedSize = easy->GetExpectedSize(); + const size_t sizeLimit = easy->GetSizeLimit(); Await.emplace(std::move(easy)); - Wakeup(expectedSize); + Wakeup(sizeLimit); } - void Wakeup(size_t expectedSize) { + void Wakeup(size_t sizeLimit) { AwaitQueue->Set(Await.size()); - if (Allocated.size() < MaxHandlers && AllocatedSize + expectedSize + OutputSize.load() <= MaxSimulatenousDownloadsSize) { + if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) { curl_multi_wakeup(Handle); } } @@ -739,7 +750,7 @@ private: const ::NMonitoring::TDynamicCounters::TCounterPtr OutputMemory; const ::NMonitoring::TDynamicCounters::TCounterPtr PerformCycles; const ::NMonitoring::TDynamicCounters::TCounterPtr AwaitQueue; - const ::NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopExpectedSize; + const ::NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopSizeLimit; const ::NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes; const ::NMonitoring::TDynamicCounters::TCounterPtr UploadedBytes; }; diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 8399029aff6..e26861cca33 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -75,7 +75,7 @@ public: virtual void Download( TString url, THeaders headers, - std::size_t expectedSize, + std::size_t sizeLimit, TOnResult callback, TString data = {}, IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy() @@ -103,6 +103,7 @@ public: TString url, THeaders headers, std::size_t offset, + std::size_t sizeLimit, TOnDownloadStart onStart, TOnNewDataPart onNewData, TOnDownloadFinish onFinish) = 0; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 96f1ea5b849..ed6c025b397 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -130,7 +130,7 @@ public: bool addPathIndex, ui64 startPathIndex, const NActors::TActorId& computeActorId, - ui64 expectedSize + ui64 sizeLimit ) : Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) @@ -141,7 +141,7 @@ public: , Paths(std::move(paths)) , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) - , ExpectedSize(expectedSize) + , SizeLimit(sizeLimit) {} void Bootstrap() { @@ -149,7 +149,7 @@ public: for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; Gateway->Download(Url + std::get<TString>(path), - Headers, std::min(std::get<size_t>(path), ExpectedSize), + Headers, std::min(std::get<size_t>(path), SizeLimit), std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex), {}, GetS3RetryPolicy()); }; } @@ -250,7 +250,7 @@ private: const TPathList Paths; const bool AddPathIndex; const ui64 StartPathIndex; - const ui64 ExpectedSize; + const ui64 SizeLimit; std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks; }; @@ -261,7 +261,7 @@ struct TReadSpec { NDB::ColumnsWithTypeAndName Columns; NDB::FormatSettings Settings; TString Format, Compression; - ui64 ExpectedSize = 0; + ui64 SizeLimit = 0; }; struct TRetryStuff { @@ -271,14 +271,14 @@ struct TRetryStuff { IHTTPGateway::TPtr gateway, TString url, const IHTTPGateway::THeaders& headers, - std::size_t expectedSize - ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState()) + std::size_t sizeLimit + ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), SizeLimit(sizeLimit), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState()) {} const IHTTPGateway::TPtr Gateway; const TString Url; const IHTTPGateway::THeaders Headers; - const std::size_t ExpectedSize; + const std::size_t SizeLimit; std::size_t Offset = 0U; const IRetryPolicy<long>::IRetryState::TPtr RetryState; @@ -485,18 +485,19 @@ private: } } - if (retryStuff->NextRetryDelay) + if (retryStuff->NextRetryDelay && retryStuff->Offset < retryStuff->SizeLimit) actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); else actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished)); } static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) { + auto* as = TActivationContext::ActorSystem(); retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url, - retryStuff->Headers, retryStuff->Offset, - std::bind(&TS3ReadCoroActor::OnDownloadStart, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1), - std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1), - std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1)); + retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit, + std::bind(&TS3ReadCoroActor::OnDownloadStart, as, self, parent, std::placeholders::_1), + std::bind(&TS3ReadCoroActor::OnNewData, as, self, parent, std::placeholders::_1), + std::bind(&TS3ReadCoroActor::OnDownloadFinished, as, self, parent, retryStuff, std::placeholders::_1)); } void Registered(TActorSystem* sys, const TActorId& parent) override { @@ -834,12 +835,12 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId); return {actor, actor}; } else { - ui64 expectedSize = std::numeric_limits<ui64>::max(); - if (const auto it = settings.find("expectedSize"); settings.cend() != it) - expectedSize = FromString<ui64>(it->second); + ui64 sizeLimit = std::numeric_limits<ui64>::max(); + if (const auto it = settings.find("sizeLimit"); settings.cend() != it) + sizeLimit = FromString<ui64>(it->second); const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, computeActorId, expectedSize); + std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit); return {actor, actor}; } } diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 52c4f78149f..97242d2f588 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -53,7 +53,7 @@ "Base": "TS3SourceSettingsBase", "Match": {"Type": "Callable", "Name": "S3SourceSettings"}, "Children": [ - {"Index": 2, "Name": "ExpectedSize", "Type": "TCoAtom", "Optional": true} + {"Index": 2, "Name": "SizeLimit", "Type": "TCoAtom", "Optional": true} ] }, { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index b979ff31ac4..c5521aa798c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -190,22 +190,22 @@ public: } auto readSettings = s3ReadObject.Object().Settings().Cast().Ptr(); - int expectedSizeIndex = -1; + int sizeLimitIndex = -1; for (size_t childInd = 0; childInd < readSettings->ChildrenSize(); ++childInd) { if (readSettings->Child(childInd)->Head().Content() == "readmaxbytes") { - expectedSizeIndex = childInd; + sizeLimitIndex = childInd; break; } } - if (expectedSizeIndex != -1) { + if (sizeLimitIndex != -1) { return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TS3SourceSettings>() .Paths(s3ReadObject.Object().Paths()) .Token<TCoSecureParam>() .Name().Build(token) .Build() - .ExpectedSize(readSettings->Child(expectedSizeIndex)->TailPtr()) + .SizeLimit(readSettings->Child(sizeLimitIndex)->TailPtr()) .Build() .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx)) .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>()) @@ -265,9 +265,9 @@ public: } } else if (const auto maySourceSettings = source.Settings().Maybe<TS3SourceSettings>()){ const auto sourceSettings = maySourceSettings.Cast(); - auto expectedSize = sourceSettings.ExpectedSize(); - if (expectedSize.IsValid()) { - srcDesc.MutableSettings()->insert({"expectedSize", expectedSize.Cast().StringValue()}); + auto sizeLimit = sourceSettings.SizeLimit(); + if (sizeLimit.IsValid()) { + srcDesc.MutableSettings()->insert({"sizeLimit", sizeLimit.Cast().StringValue()}); } } |