aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author hor911 <hor911@ydb.tech> 2022-11-21 14:39:53 +0300
committer hor911 <hor911@ydb.tech> 2022-11-21 14:39:53 +0300
commit d8296c2cef1913f2580fefe89ef24ec4c84f5c6c (patch)
tree d943c3dd7241ccabde37540e15c28517a0741399
parent 6b13e9638c8fa795d4c1f1e8036eee2beb8517d6 (diff)
download ydb-d8296c2cef1913f2580fefe89ef24ec4c84f5c6c.tar.gz
Limit S3 Read Inflight
-rw-r--r-- ydb/core/yq/libs/config/protos/read_actors_factory.proto 1
-rw-r--r-- ydb/core/yq/libs/init/init.cpp 3
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 73
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h 1
4 files changed, 49 insertions, 29 deletions
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
index 337b639a6ce..ab5de69f82d 100644
--- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto
+++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
@@ -11,6 +11,7 @@ import "ydb/library/yql/providers/s3/proto/retry_config.proto";
message TS3ReadActorFactoryConfig {
NYql.NS3.TRetryConfig RetryConfig = 1;
uint64 RowsInBatch = 2; // Default = 1000
+ uint64 MaxInflight = 3; // Default = 1000
}
message TPqReadActorFactoryConfig {
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index bfc26130148..dd6b1bd0968 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -167,6 +167,9 @@ void Init(
if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
readActorFactoryCfg.RowsInBatch = rowsInBatch;
}
+ if (const ui64 maxInflight = s3readConfig.GetMaxInflight()) {
+ readActorFactoryCfg.MaxInflight = maxInflight;
+ }
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 0235992e459..8a655ffa300 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -157,14 +157,15 @@ struct TEvPrivate {
};
struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
- TEvReadFinished(ui64 ingressBytes = 0)
- : IngressBytes(ingressBytes) {
+ TEvReadFinished(size_t pathIndex, ui64 ingressBytes = 0)
+ : PathIndex(pathIndex), IngressBytes(ingressBytes) {
}
- TEvReadFinished(TIssues&& issues, ui64 ingressBytes = 0)
- : Issues(std::move(issues)), IngressBytes(ingressBytes) {
+ TEvReadFinished(size_t pathIndex, TIssues&& issues, ui64 ingressBytes = 0)
+ : PathIndex(pathIndex), Issues(std::move(issues)), IngressBytes(ingressBytes) {
}
+ const size_t PathIndex;
TIssues Issues;
ui64 IngressBytes;
};
@@ -454,16 +455,16 @@ void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId&
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));
}
-void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, TIssues issues) {
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(std::move(issues))));
+void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex, TIssues issues) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(pathIndex, std::move(issues))));
}
-void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent) {
+void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, size_t pathIndex) {
retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url,
retryStuff->Headers, retryStuff->Offset, retryStuff->SizeLimit,
std::bind(&OnDownloadStart, actorSystem, self, parent, std::placeholders::_1),
std::bind(&OnNewData, actorSystem, self, parent, std::placeholders::_1),
- std::bind(&OnDownloadFinished, actorSystem, self, parent, std::placeholders::_1));
+ std::bind(&OnDownloadFinished, actorSystem, self, parent, pathIndex, std::placeholders::_1));
}
template <typename T>
@@ -594,7 +595,7 @@ public:
if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) {
LOG_CORO_D("TS3ReadCoroImpl", "TS3ReadCoroActor" << ": " << SelfActorId << ", TxId: " << RetryStuff->TxId << ". Retry Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]");
- GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId))));
+ GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex))));
} else {
LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]");
InputFinished = true;
@@ -738,7 +739,7 @@ private:
if (issues)
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
else
- Send(ParentActorId, new TEvPrivate::TEvReadFinished(IngressBytes));
+ Send(ParentActorId, new TEvPrivate::TEvReadFinished(PathIndex, IngressBytes));
} catch (const TDtorException&) {
return RetryStuff->Cancel();
} catch (const std::exception& err) {
@@ -849,20 +850,22 @@ private:
class TS3ReadCoroActor : public TActorCoro {
public:
- TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff)
+ TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex)
: TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
, RetryStuff(std::move(retryStuff))
+ , PathIndex(pathIndex)
{}
private:
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
if (RetryStuff->Url.substr(0, 6) != "file://") {
LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]");
- DownloadStart(RetryStuff, actorSystem, SelfId(), parent);
+ DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex);
}
}
const TRetryStuff::TPtr RetryStuff;
+ const size_t PathIndex;
};
ui64 GetSizeOfData(const arrow::ArrayData& data) {
@@ -931,16 +934,21 @@ public:
void Bootstrap() {
LOG_D("TS3StreamReadActor", "Bootstrap");
Become(&TS3StreamReadActor::StateFunc);
- for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
- const TPath& path = Paths[pathInd];
- const TString requestId = CreateGuidAsString();
- auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), std::get<std::size_t>(path), TxId, requestId, RetryPolicy);
- RetryStuffForFile.push_back(stuff);
- auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg);
- RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release());
+ while (CurrentPathIndex < std::min(ReadActorFactoryCfg.MaxInflight, Paths.size())) {
+ RegisterCoro(CurrentPathIndex++);
}
}
+ void RegisterCoro(size_t index) {
+ const TPath& path = Paths[index];
+ const TString requestId = CreateGuidAsString();
+ auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), MakeHeaders(Token, requestId), std::get<std::size_t>(path), TxId, requestId, RetryPolicy);
+ auto pathIndex = index + StartPathIndex;
+ RetryStuffForFile.emplace(pathIndex, stuff);
+ auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg);
+ RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex).release());
+ }
+
static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR";
private:
@@ -1031,8 +1039,8 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
LOG_D("TS3StreamReadActor", "PassAway");
- for (auto stuff: RetryStuffForFile) {
- stuff->Cancel();
+ for (auto pair: RetryStuffForFile) {
+ pair.second->Cancel();
}
ContainerCache.Clear();
TActorBootstrapped<TS3StreamReadActor>::PassAway();
@@ -1073,21 +1081,27 @@ private:
void HandleReadFinished(TEvPrivate::TEvReadFinished::TPtr& ev) {
IngressBytes = ev->Get()->IngressBytes;
+ RetryStuffForFile.erase(ev->Get()->PathIndex);
Y_VERIFY(Count);
--Count;
- /*
- If an empty range is being downloaded on the last file,
- then we need to pass the information to Compute Actor that
- the download of all data is finished in this place
- */
- if (Blocks.empty() && Count == 0) {
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+
+ if (CurrentPathIndex < Paths.size()) {
+ RegisterCoro(CurrentPathIndex++);
+ } else {
+ /*
+ If an empty range is being downloaded on the last file,
+ then we need to pass the information to Compute Actor that
+ the download of all data is finished in this place
+ */
+ if (Blocks.empty() && Count == 0) {
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ }
}
}
const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const IHTTPGateway::TPtr Gateway;
- std::vector<TRetryStuff::TPtr> RetryStuffForFile;
+ THashMap<size_t, TRetryStuff::TPtr> RetryStuffForFile;
const THolderFactory& HolderFactory;
TPlainContainerCache ContainerCache;
TPlainContainerCache ArrowTupleContainerCache;
@@ -1108,6 +1122,7 @@ private:
ui32 Count;
const std::size_t MaxBlocksInFly;
ui64 IngressBytes = 0;
+ size_t CurrentPathIndex = 0;
};
using namespace NKikimr::NMiniKQL;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 7ee8b1d4c59..bf6c6c7eab5 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -13,6 +13,7 @@ namespace NYql::NDq {
struct TS3ReadActorFactoryConfig {
ui64 RowsInBatch = 1000;
+ ui64 MaxInflight = 1000;
};
void RegisterS3ReadActorFactory(