aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-08-25 13:23:09 +0300
committerhor911 <hor911@ydb.tech>2023-08-25 14:07:28 +0300
commitb135460aae0ab7483ff22ea433e91d59e5599077 (patch)
tree441c5e42c7b17ba30670b43d71ae10b8ee9d5c44
parent03b54fe9cee65c8b7f2a74c9a5524f35980da866 (diff)
downloadydb-b135460aae0ab7483ff22ea433e91d59e5599077.tar.gz
Resource Snapshot in Data Executer
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp52
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h113
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp8
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.cpp49
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.h1
5 files changed, 166 insertions, 57 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index b7b6373ccc..6a73fbac76 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -294,6 +294,7 @@ public:
hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData);
+ hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
default:
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
}
@@ -1642,23 +1643,46 @@ private:
YQL_ENSURE(result.second);
}
- void ExecuteAfterFetchingSecrets() {
- bool waitSecretsSnapshot = false;
+ void HandleResolve(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) {
+ if (ev->Get()->Snapshot.empty()) {
+ LOG_E("Can not find default state storage group for database " << Database);
+ }
+ ResourceSnapshot = std::move(ev->Get()->Snapshot);
+ ResourceSnapshotRequired = false;
+ if (!SecretSnapshotRequired) {
+ Execute();
+ }
+ }
+
+ void DoExecute() {
for (const auto& transaction : Request.Transactions) {
- if (!transaction.Body->GetSecretNames().empty()) {
- FetchSecrets();
- waitSecretsSnapshot = true;
- break;
+ if (!transaction.Body->GetSecretNames().empty()) {
+ SecretSnapshotRequired = true;
+ }
+ for (const auto& stage : transaction.Body->GetStages()) {
+ if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
+ ResourceSnapshotRequired = true;
+ HasExternalSources = true;
+ }
}
}
- if (!waitSecretsSnapshot) {
- Execute();
+ if (!SecretSnapshotRequired && !ResourceSnapshotRequired) {
+ return Execute();
+ }
+ if (SecretSnapshotRequired) {
+ FetchSecrets();
+ }
+ if (ResourceSnapshotRequired) {
+ GetResourcesSnapshot();
}
}
void OnSecretsFetched() override {
- Execute();
+ SecretSnapshotRequired = false;
+ if (!ResourceSnapshotRequired) {
+ Execute();
+ }
}
void Execute() {
@@ -1713,8 +1737,7 @@ private:
}
break;
case NKqpProto::TKqpSource::kExternalSource:
- BuildReadTasksFromSource(stageInfo, secureParams);
- HasExternalSources = true;
+ BuildReadTasksFromSource(stageInfo, secureParams, ResourceSnapshot);
break;
default:
YQL_ENSURE(false, "unknown source type");
@@ -1861,7 +1884,7 @@ private:
void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) return;
- ExecuteAfterFetchingSecrets();
+ DoExecute();
}
void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) {
@@ -2147,7 +2170,7 @@ private:
Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(),
Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(),
- ExecuterSpan, {}, ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks);
+ ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks);
auto err = Planner->PlanExecution();
if (err) {
@@ -2353,6 +2376,9 @@ private:
bool UnknownAffectedShardCount = false;
bool HasExternalSources = false;
+ bool SecretSnapshotRequired = false;
+ bool ResourceSnapshotRequired = false;
+ TVector<NKikimrKqp::TKqpNodeResources> ResourceSnapshot;
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 6e0bd07d1a..49810f3b8f 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -76,40 +76,39 @@ inline bool IsDebugLogEnabled() {
TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
const NKikimrKqp::TRlPath& path);
-template <class TDerived, EExecType ExecType>
-class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
-protected:
- struct TEvPrivate {
- enum EEv {
- EvRetry = EventSpaceBegin(TEvents::ES_PRIVATE),
- EvResourcesSnapshot,
- EvReattachToShard,
- };
+struct TEvPrivate {
+ enum EEv {
+ EvRetry = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvResourcesSnapshot,
+ EvReattachToShard,
+ };
- struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> {
- ui32 RequestId;
- TActorId Target;
+ struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> {
+ ui32 RequestId;
+ TActorId Target;
- TEvRetry(ui64 requestId, const TActorId& target)
- : RequestId(requestId)
- , Target(target) {}
- };
+ TEvRetry(ui64 requestId, const TActorId& target)
+ : RequestId(requestId)
+ , Target(target) {}
+ };
- struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> {
- TVector<NKikimrKqp::TKqpNodeResources> Snapshot;
+ struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> {
+ TVector<NKikimrKqp::TKqpNodeResources> Snapshot;
- TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
- : Snapshot(std::move(snapshot)) {}
- };
+ TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot)
+ : Snapshot(std::move(snapshot)) {}
+ };
- struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> {
- const ui64 TabletId;
+ struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> {
+ const ui64 TabletId;
- explicit TEvReattachToShard(ui64 tabletId)
- : TabletId(tabletId) {}
- };
+ explicit TEvReattachToShard(ui64 tabletId)
+ : TabletId(tabletId) {}
};
+};
+template <class TDerived, EExecType ExecType>
+class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
public:
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
@@ -410,6 +409,14 @@ protected:
return secureParams;
}
+ void GetResourcesSnapshot() {
+ GetKqpResourceManager()->RequestClusterResourcesInfo(
+ [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
+ TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
+ as->Send(eh);
+ });
+ }
+
protected:
bool CheckExecutionComplete() {
if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) {
@@ -764,7 +771,7 @@ protected:
}
}
- void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
+ void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasExternalSource());
@@ -772,7 +779,26 @@ protected:
const auto& stageSource = stage.GetSources(0);
const auto& externalSource = stageSource.GetExternalSource();
- for (const TString& partitionParam : externalSource.GetPartitionedTaskParams()) {
+
+ ui32 taskCount = externalSource.GetPartitionedTaskParams().size();
+
+ if (!resourceSnapshot.empty()) {
+ ui32 maxTaskcount = resourceSnapshot.size() * 2;
+ if (taskCount > maxTaskcount) {
+ taskCount = maxTaskcount;
+ }
+ }
+
+ auto sourceName = externalSource.GetSourceName();
+ TString structuredToken;
+ if (sourceName) {
+ structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
+ }
+
+ TVector<ui64> tasksIds;
+
+ // generate all tasks
+ for (ui32 i = 0; i < taskCount; i++) {
auto& task = TasksGraph.AddTask(stageInfo);
auto& input = task.Inputs[stageSource.GetInputIndex()];
@@ -780,17 +806,32 @@ protected:
input.SourceSettings = externalSource.GetSettings();
input.SourceType = externalSource.GetType();
- task.Meta.ReadRanges.push_back(partitionParam);
-
- auto sourceName = externalSource.GetSourceName();
- if (sourceName) {
- auto structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
+ if (structuredToken) {
task.Meta.SecureParams.emplace(sourceName, structuredToken);
}
- task.Meta.Type = TTaskMeta::TTaskType::Compute;
+ if (resourceSnapshot.empty()) {
+ task.Meta.Type = TTaskMeta::TTaskType::Compute;
+ } else {
+ task.Meta.NodeId = resourceSnapshot[i % resourceSnapshot.size()].GetNodeId();
+ task.Meta.Type = TTaskMeta::TTaskType::Scan;
+ }
- BuildSinks(stage, task, secureParams);
+ tasksIds.push_back(task.Id);
+ }
+
+ // distribute read ranges between them
+ ui32 currentTaskIndex = 0;
+ for (const TString& partitionParam : externalSource.GetPartitionedTaskParams()) {
+ TasksGraph.GetTask(tasksIds[currentTaskIndex]).Meta.ReadRanges.push_back(partitionParam);
+ if (++currentTaskIndex >= tasksIds.size()) {
+ currentTaskIndex = 0;
+ }
+ }
+
+ // finish building
+ for (auto taskId : tasksIds) {
+ BuildSinks(stage, TasksGraph.GetTask(taskId), secureParams);
}
}
@@ -950,7 +991,7 @@ protected:
protected:
void UnexpectedEvent(const TString& state, ui32 eventType) {
LOG_C("TKqpExecuter, unexpected event: " << eventType << ", at state:" << state << ", selfID: " << this->SelfId());
- InternalError(TStringBuilder() << "Unexpected event at TKqpScanExecuter, state: " << state
+ InternalError(TStringBuilder() << "Unexpected event at TKqpExecuter, state: " << state
<< ", event: " << eventType);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 4c79c54dd1..b640dc214a 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -475,14 +475,6 @@ private:
}
}
- void GetResourcesSnapshot() {
- GetKqpResourceManager()->RequestClusterResourcesInfo(
- [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
- TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources)));
- as->Send(eh);
- });
- }
-
void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) {
if (!TBase::HandleResolve(ev)) return;
TSet<ui64> shardIds;
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 874a4b1721..9d46d332fe 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -317,6 +317,11 @@ public:
void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override {
+ if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
+ NotifyExternalResourcesFreed(txId, taskId, resources);
+ return;
+ }
+
auto& txBucket = TxBucket(txId);
{
@@ -492,6 +497,50 @@ public:
FireResourcesPublishing();
}
+ void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override {
+ LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". External free: " << resources.ToString());
+
+ YQL_ENSURE(resources.MemoryPool == EKqpMemoryPool::DataQuery);
+
+ ui64 releaseMemory = 0;
+
+ auto& txBucket = TxBucket(txId);
+ with_lock (txBucket.Lock) {
+ auto txIt = txBucket.Txs.find(txId);
+ if (txIt == txBucket.Txs.end()) {
+ return;
+ }
+
+ auto taskIt = txIt->second.Tasks.find(taskId);
+ if (taskIt == txIt->second.Tasks.end()) {
+ return;
+ }
+
+ if (taskIt->second.ExternalDataQueryMemory <= resources.Memory) {
+ releaseMemory = taskIt->second.ExternalDataQueryMemory;
+ if (txIt->second.Tasks.size() == 1) {
+ txBucket.Txs.erase(txId);
+ } else {
+ txIt->second.Tasks.erase(taskIt);
+ txIt->second.TxExternalDataQueryMemory -= releaseMemory;
+ }
+ } else {
+ releaseMemory = resources.Memory;
+ taskIt->second.ExternalDataQueryMemory -= resources.Memory;
+ }
+ } // with_lock (txBucket.Lock)
+
+ with_lock (Lock) {
+ Y_VERIFY_DEBUG(ExternalDataQueryMemory >= releaseMemory);
+ ExternalDataQueryMemory -= releaseMemory;
+ } // with_lock (Lock)
+
+ Counters->RmExternalMemory->Sub(releaseMemory);
+ Y_VERIFY_DEBUG(Counters->RmExternalMemory->Val() >= 0);
+
+ FireResourcesPublishing();
+ }
+
void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId) override {
LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". External free.");
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h
index fbcff7151c..fcb768ae04 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.h
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.h
@@ -85,6 +85,7 @@ public:
virtual void FreeResources(ui64 txId) = 0;
virtual void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
+ virtual void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
virtual void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId) = 0;
virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;