diff options
author | hor911 <hor911@ydb.tech> | 2023-08-25 13:23:09 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-08-25 14:07:28 +0300 |
commit | b135460aae0ab7483ff22ea433e91d59e5599077 (patch) | |
tree | 441c5e42c7b17ba30670b43d71ae10b8ee9d5c44 | |
parent | 03b54fe9cee65c8b7f2a74c9a5524f35980da866 (diff) | |
download | ydb-b135460aae0ab7483ff22ea433e91d59e5599077.tar.gz |
Resource Snapshot in Data Executer
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 52 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 113 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 49 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.h | 1 |
5 files changed, 166 insertions, 57 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index b7b6373ccc..6a73fbac76 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -294,6 +294,7 @@ public: hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData); + hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); default: UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); } @@ -1642,23 +1643,46 @@ private: YQL_ENSURE(result.second); } - void ExecuteAfterFetchingSecrets() { - bool waitSecretsSnapshot = false; + void HandleResolve(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { + if (ev->Get()->Snapshot.empty()) { + LOG_E("Can not find default state storage group for database " << Database); + } + ResourceSnapshot = std::move(ev->Get()->Snapshot); + ResourceSnapshotRequired = false; + if (!SecretSnapshotRequired) { + Execute(); + } + } + + void DoExecute() { for (const auto& transaction : Request.Transactions) { - if (!transaction.Body->GetSecretNames().empty()) { - FetchSecrets(); - waitSecretsSnapshot = true; - break; + if (!transaction.Body->GetSecretNames().empty()) { + SecretSnapshotRequired = true; + } + for (const auto& stage : transaction.Body->GetStages()) { + if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) { + ResourceSnapshotRequired = true; + HasExternalSources = true; + } } } - if (!waitSecretsSnapshot) { - Execute(); + if (!SecretSnapshotRequired && !ResourceSnapshotRequired) { + return Execute(); + } + if (SecretSnapshotRequired) { + FetchSecrets(); + } + if (ResourceSnapshotRequired) { + GetResourcesSnapshot(); } } void OnSecretsFetched() override { - Execute(); + SecretSnapshotRequired = false; + if (!ResourceSnapshotRequired) { + Execute(); + } } void Execute() { @@ -1713,8 +1737,7 @@ private: } break; case NKqpProto::TKqpSource::kExternalSource: - BuildReadTasksFromSource(stageInfo, secureParams); - HasExternalSources = true; + BuildReadTasksFromSource(stageInfo, secureParams, ResourceSnapshot); break; default: YQL_ENSURE(false, "unknown source type"); @@ -1861,7 +1884,7 @@ private: void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; - ExecuteAfterFetchingSecrets(); + DoExecute(); } void HandleResolve(TEvKqpExecuter::TEvShardsResolveStatus::TPtr& ev) { @@ -2147,7 +2170,7 @@ private: Planner = CreateKqpPlanner(TasksGraph, TxId, SelfId(), GetSnapshot(), Database, UserToken, Deadline.GetOrElse(TInstant::Zero()), Request.StatsMode, false, Nothing(), - ExecuterSpan, {}, ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks); + ExecuterSpan, std::move(ResourceSnapshot), ExecuterRetriesConfig, dataQueryPool /* isDataQuery */, Request.MkqlMemoryLimit, AsyncIoFactory, enableOptForTasks); auto err = Planner->PlanExecution(); if (err) { @@ -2353,6 +2376,9 @@ private: bool UnknownAffectedShardCount = false; bool HasExternalSources = false; + bool SecretSnapshotRequired = false; + bool ResourceSnapshotRequired = false; + TVector<NKikimrKqp::TKqpNodeResources> ResourceSnapshot; ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 6e0bd07d1a..49810f3b8f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -76,40 +76,39 @@ inline bool IsDebugLogEnabled() { TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, const NKikimrKqp::TRlPath& path); -template <class TDerived, EExecType ExecType> -class TKqpExecuterBase : public TActorBootstrapped<TDerived> { -protected: - struct TEvPrivate { - enum EEv { - EvRetry = EventSpaceBegin(TEvents::ES_PRIVATE), - EvResourcesSnapshot, - EvReattachToShard, - }; +struct TEvPrivate { + enum EEv { + EvRetry = EventSpaceBegin(TEvents::ES_PRIVATE), + EvResourcesSnapshot, + EvReattachToShard, + }; - struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> { - ui32 RequestId; - TActorId Target; + struct TEvRetry : public TEventLocal<TEvRetry, EEv::EvRetry> { + ui32 RequestId; + TActorId Target; - TEvRetry(ui64 requestId, const TActorId& target) - : RequestId(requestId) - , Target(target) {} - }; + TEvRetry(ui64 requestId, const TActorId& target) + : RequestId(requestId) + , Target(target) {} + }; - struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> { - TVector<NKikimrKqp::TKqpNodeResources> Snapshot; + struct TEvResourcesSnapshot : public TEventLocal<TEvResourcesSnapshot, EEv::EvResourcesSnapshot> { + TVector<NKikimrKqp::TKqpNodeResources> Snapshot; - TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) - : Snapshot(std::move(snapshot)) {} - }; + TEvResourcesSnapshot(TVector<NKikimrKqp::TKqpNodeResources>&& snapshot) + : Snapshot(std::move(snapshot)) {} + }; - struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> { - const ui64 TabletId; + struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> { + const ui64 TabletId; - explicit TEvReattachToShard(ui64 tabletId) - : TabletId(tabletId) {} - }; + explicit TEvReattachToShard(ui64 tabletId) + : TabletId(tabletId) {} }; +}; +template <class TDerived, EExecType ExecType> +class TKqpExecuterBase : public TActorBootstrapped<TDerived> { public: TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, @@ -410,6 +409,14 @@ protected: return secureParams; } + void GetResourcesSnapshot() { + GetKqpResourceManager()->RequestClusterResourcesInfo( + [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { + TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); + as->Send(eh); + }); + } + protected: bool CheckExecutionComplete() { if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) { @@ -764,7 +771,7 @@ protected: } } - void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { + void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) { const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasExternalSource()); @@ -772,7 +779,26 @@ protected: const auto& stageSource = stage.GetSources(0); const auto& externalSource = stageSource.GetExternalSource(); - for (const TString& partitionParam : externalSource.GetPartitionedTaskParams()) { + + ui32 taskCount = externalSource.GetPartitionedTaskParams().size(); + + if (!resourceSnapshot.empty()) { + ui32 maxTaskcount = resourceSnapshot.size() * 2; + if (taskCount > maxTaskcount) { + taskCount = maxTaskcount; + } + } + + auto sourceName = externalSource.GetSourceName(); + TString structuredToken; + if (sourceName) { + structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); + } + + TVector<ui64> tasksIds; + + // generate all tasks + for (ui32 i = 0; i < taskCount; i++) { auto& task = TasksGraph.AddTask(stageInfo); auto& input = task.Inputs[stageSource.GetInputIndex()]; @@ -780,17 +806,32 @@ protected: input.SourceSettings = externalSource.GetSettings(); input.SourceType = externalSource.GetType(); - task.Meta.ReadRanges.push_back(partitionParam); - - auto sourceName = externalSource.GetSourceName(); - if (sourceName) { - auto structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); + if (structuredToken) { task.Meta.SecureParams.emplace(sourceName, structuredToken); } - task.Meta.Type = TTaskMeta::TTaskType::Compute; + if (resourceSnapshot.empty()) { + task.Meta.Type = TTaskMeta::TTaskType::Compute; + } else { + task.Meta.NodeId = resourceSnapshot[i % resourceSnapshot.size()].GetNodeId(); + task.Meta.Type = TTaskMeta::TTaskType::Scan; + } - BuildSinks(stage, task, secureParams); + tasksIds.push_back(task.Id); + } + + // distribute read ranges between them + ui32 currentTaskIndex = 0; + for (const TString& partitionParam : externalSource.GetPartitionedTaskParams()) { + TasksGraph.GetTask(tasksIds[currentTaskIndex]).Meta.ReadRanges.push_back(partitionParam); + if (++currentTaskIndex >= tasksIds.size()) { + currentTaskIndex = 0; + } + } + + // finish building + for (auto taskId : tasksIds) { + BuildSinks(stage, TasksGraph.GetTask(taskId), secureParams); } } @@ -950,7 +991,7 @@ protected: protected: void UnexpectedEvent(const TString& state, ui32 eventType) { LOG_C("TKqpExecuter, unexpected event: " << eventType << ", at state:" << state << ", selfID: " << this->SelfId()); - InternalError(TStringBuilder() << "Unexpected event at TKqpScanExecuter, state: " << state + InternalError(TStringBuilder() << "Unexpected event at TKqpExecuter, state: " << state << ", event: " << eventType); } diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 4c79c54dd1..b640dc214a 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -475,14 +475,6 @@ private: } } - void GetResourcesSnapshot() { - GetKqpResourceManager()->RequestClusterResourcesInfo( - [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { - TAutoPtr<IEventHandle> eh = new IEventHandle(self, self, new TEvPrivate::TEvResourcesSnapshot(std::move(resources))); - as->Send(eh); - }); - } - void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; TSet<ui64> shardIds; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 874a4b1721..9d46d332fe 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -317,6 +317,11 @@ public: void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override { + if (resources.MemoryPool == EKqpMemoryPool::DataQuery) { + NotifyExternalResourcesFreed(txId, taskId, resources); + return; + } + auto& txBucket = TxBucket(txId); { @@ -492,6 +497,50 @@ public: FireResourcesPublishing(); } + void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override { + LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". External free: " << resources.ToString()); + + YQL_ENSURE(resources.MemoryPool == EKqpMemoryPool::DataQuery); + + ui64 releaseMemory = 0; + + auto& txBucket = TxBucket(txId); + with_lock (txBucket.Lock) { + auto txIt = txBucket.Txs.find(txId); + if (txIt == txBucket.Txs.end()) { + return; + } + + auto taskIt = txIt->second.Tasks.find(taskId); + if (taskIt == txIt->second.Tasks.end()) { + return; + } + + if (taskIt->second.ExternalDataQueryMemory <= resources.Memory) { + releaseMemory = taskIt->second.ExternalDataQueryMemory; + if (txIt->second.Tasks.size() == 1) { + txBucket.Txs.erase(txId); + } else { + txIt->second.Tasks.erase(taskIt); + txIt->second.TxExternalDataQueryMemory -= releaseMemory; + } + } else { + releaseMemory = resources.Memory; + taskIt->second.ExternalDataQueryMemory -= resources.Memory; + } + } // with_lock (txBucket.Lock) + + with_lock (Lock) { + Y_VERIFY_DEBUG(ExternalDataQueryMemory >= releaseMemory); + ExternalDataQueryMemory -= releaseMemory; + } // with_lock (Lock) + + Counters->RmExternalMemory->Sub(releaseMemory); + Y_VERIFY_DEBUG(Counters->RmExternalMemory->Val() >= 0); + + FireResourcesPublishing(); + } + void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId) override { LOG_AS_D("TxId: " << txId << ", taskId: " << taskId << ". External free."); diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index fbcff7151c..fcb768ae04 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -85,6 +85,7 @@ public: virtual void FreeResources(ui64 txId) = 0; virtual void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; + virtual void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; virtual void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId) = 0; virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0; |