diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-03-16 12:00:41 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-03-16 12:00:41 +0300 |
commit | 08655020c8e2770e2462b48a54106ef30564ea69 (patch) | |
tree | 25f14c1762ea49d692558817d8dd6b21dd19d7b0 | |
parent | d1c25d0356fc7efced0d8da5091bff22b862aa50 (diff) | |
download | ydb-08655020c8e2770e2462b48a54106ef30564ea69.tar.gz |
Add sharded state to kqp_node_service
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 180 | ||||
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_state.h | 144 |
2 files changed, 180 insertions, 144 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index e24d9df17f..ef60b69503 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -49,6 +49,8 @@ TString TasksIdsStr(const TTasksCollection& tasks) { return TStringBuilder() << "[" << JoinSeq(", ", ids) << "]"; } +constexpr ui64 BucketsCount = 64; + class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> { using TBase = TActorBootstrapped<TKqpNodeService>; @@ -57,6 +59,34 @@ public: return NKikimrServices::TActivity::KQP_NODE_SERVICE; } + static void FinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues, + NKqpNode::TState& bucket) { + LOG_D("TxId: " << txId << ", finish compute task: " << taskId << ", success: " << success + << ", message: " << issues.ToOneLineString()); + + auto ctx = bucket.RemoveTask(txId, taskId, success); + + if (!ctx) { + LOG_E("TxId: " << txId << ", task: " << taskId << " unknown task"); + return; + } + + if (ctx->ComputeActorsNumber == 0) { + LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed"); + GetKqpResourceManager()->FreeResources(txId); + } else { + LOG_D("TxId: " << txId << ", finish compute task: " << taskId + << (success ? "" : " (cancelled)") + << ", remains " << ctx->ComputeActorsNumber << " compute actors and " + << ctx->TotalMemory << " bytes in the current request"); + GetKqpResourceManager()->FreeResources(txId, taskId); + } + + if (ctx->FinixTx) { + LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed"); + } + } + TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters, IKqpNodeComputeActorFactory* caFactory) : Config(config.GetResourceManager()) @@ -87,7 +117,7 @@ private: STATEFN(WorkState) { switch (ev->GetTypeRewrite()) { hFunc(TEvKqpNode::TEvStartKqpTasksRequest, HandleWork); - hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork); + hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork); // used only for unit tests hFunc(TEvKqpNode::TEvCancelKqpTasksRequest, HandleWork); hFunc(TEvents::TEvWakeup, HandleWork); // misc @@ -119,7 +149,9 @@ private: NKqpNode::TTasksRequest request; request.Executer = ActorIdFromProto(msg.GetExecuterActorId()); - if (State.Exists(txId, requester)) { + auto& bucket = GetStateBucketByTx(txId); + + if (bucket.Exists(txId, requester)) { LOG_E("TxId: " << txId << ", requester: " << requester << ", request already exists"); return ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR); } @@ -156,7 +188,7 @@ private: LOG_D("TxId: " << txId << ", channels: " << requestChannels << ", computeActors: " << msg.GetTasks().size() << ", memory: " << request.TotalMemory); - ui64 txMemory = State.GetTxMemory(txId, memoryPool) + request.TotalMemory; + auto txMemory = bucket.GetTxMemory(txId, memoryPool) + request.TotalMemory; if (txMemory > Config.GetQueryMemoryLimit()) { LOG_N("TxId: " << txId << ", requested too many memory: " << request.TotalMemory << "(" << txMemory << " for this Tx), limit: " << Config.GetQueryMemoryLimit()); @@ -243,8 +275,7 @@ private: // compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer auto timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs()); request.Deadline = TAppData::TimeProvider->Now() + timeout + /* gap */ TDuration::Seconds(5); - - ExpiringRequests.emplace(request.Deadline, TRequestId{txId, requester}); + bucket.InsertExpiringRequest(request.Deadline, txId, requester); } runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool; @@ -261,7 +292,6 @@ private: runtimeSettingsBase.ReportStatsSettings = NYql::NDq::TReportStatsSettings{MinStatInterval, MaxStatInterval}; TShardsScanningPolicy scanPolicy(Config.GetShardsScanningPolicy()); - auto actorSystem = TlsActivationContext->ActorSystem(); // start compute actors for (int i = 0; i < msg.GetTasks().size(); ++i) { @@ -274,9 +304,9 @@ private: "actual size: %ld, min: %ld", memoryLimits.ChannelBufferSize, Config.GetMinChannelBufferSize()); auto runtimeSettings = runtimeSettingsBase; - runtimeSettings.TerminateHandler = [actorSystem, rm = SelfId(), txId, taskId = dqTask.GetId()] + runtimeSettings.TerminateHandler = [txId, taskId = dqTask.GetId(), &bucket] (bool success, const NYql::TIssues& issues) { - actorSystem->Send(rm, new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, issues)); + FinishKqpTask(txId, taskId, success, issues, bucket); }; ETableKind tableKind = ETableKind::Unknown; @@ -318,51 +348,13 @@ private: Send(request.Executer, reply.Release(), IEventHandle::FlagTrackDelivery, txId); - State.NewRequest(txId, requester, std::move(request), memoryPool); + bucket.NewRequest(txId, requester, std::move(request), memoryPool); } + // used only for unit tests void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) { auto& msg = *ev->Get(); - - LOG_D("TxId: " << msg.TxId << ", finish compute task: " << msg.TaskId << ", success: " << msg.Success - << ", message: " << msg.Issues.ToOneLineString()); - - auto task = State.RemoveTask(msg.TxId, msg.TaskId, msg.Success, [this, &msg] - (const TActorId& requester, const NKqpNode::TTasksRequest& request, const NKqpNode::TTaskContext&, bool finishTx) { - THolder<IEventBase> ev; - - if (request.InFlyTasks.empty()) { - LOG_D("TxId: " << msg.TxId << ", requester: " << requester << " completed"); - - auto bounds = ExpiringRequests.equal_range(request.Deadline); - for (auto it = bounds.first; it != bounds.second; ) { - if (it->second.TxId == msg.TxId && it->second.Requester == requester) { - auto delIt = it++; - ExpiringRequests.erase(delIt); - } else { - ++it; - } - } - - ResourceManager()->FreeResources(msg.TxId); - } else { - LOG_D("TxId: " << msg.TxId << ", finish compute task: " << msg.TaskId - << (msg.Success ? "" : " (cancelled)") - << ", remains " << request.InFlyTasks.size() << " compute actors and " << request.TotalMemory - << " bytes in the current request"); - - ResourceManager()->FreeResources(msg.TxId, msg.TaskId); - } - - if (finishTx) { - LOG_D("TxId: " << msg.TxId << ", requester: " << requester << " completed"); - } - }); - - if (!task) { - LOG_E("TxId: " << msg.TxId << ", task: " << msg.TaskId << " unknown task"); - return; - } + FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, msg.Issues, GetStateBucketByTx(msg.TxId)); } void HandleWork(TEvKqpNode::TEvCancelKqpTasksRequest::TPtr& ev) { @@ -374,39 +366,41 @@ private: } void TerminateTx(ui64 txId, const TString& reason) { - State.RemoveTx(txId, [this, &txId, &reason](const NKqpNode::TTasksRequest& request) { - LOG_D("TxId: " << txId << ", cancel granted resources"); + auto& bucket = GetStateBucketByTx(txId); + auto tasksToAbort = bucket.RemoveTx(txId); + if (!tasksToAbort.empty()) { + LOG_D("TxId: " << txId << ", cancel granted resources"); ResourceManager()->FreeResources(txId); - for (auto& [taskId, task] : request.InFlyTasks) { - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED, reason); - Send(task.ComputeActorId, abortEv.Release()); + for (const auto& tasksRequest: tasksToAbort) { + for (const auto& [taskId, task] : tasksRequest.InFlyTasks) { + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED, + reason); + Send(task.ComputeActorId, abortEv.Release()); + } } - }); + } } void HandleWork(TEvents::TEvWakeup::TPtr& ev) { Schedule(TDuration::Seconds(1), ev->Release().Release()); - - auto it = ExpiringRequests.begin(); - auto now = TAppData::TimeProvider->Now(); - while (it != ExpiringRequests.end() && it->first < now) { - auto reqId = it->second; - auto delIt = it++; - ExpiringRequests.erase(delIt); - - auto request = State.RemoveRequest(reqId.TxId, reqId.Requester); - LOG_D("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", execution timeout, request: " << bool(request)); - if (!request) { - // it is ok since in most cases requests is finished by exlicit TEvAbortExecution from their Executer - LOG_I("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", unknown request"); - continue; + std::vector<ui64> txIdsToFree; + for (auto& bucket : Buckets) { + auto expiredRequests = bucket.ClearExpiredRequests(); + for (auto& cxt : expiredRequests) { + LOG_D("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester + << ", execution timeout, request: " << cxt.Exists); + if (!cxt.Exists) { + // it is ok since in most cases requests is finished by exlicit TEvAbortExecution from their Executer + LOG_I("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester + << ", unknown request"); + continue; + } + // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order: + // KqpProxy -> SessionActor -> Executer -> ComputeActor + ResourceManager()->FreeResources(cxt.RequestId.TxId); } - - ResourceManager()->FreeResources(reqId.TxId); - // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order: - // KqpProxy -> SessionActor -> Executer -> ComputeActor } } @@ -469,11 +463,6 @@ private: } void HandleWork(NMon::TEvHttpInfo::TPtr& ev) { - THashMap<ui64, TVector<std::pair<const TActorId, const NKqpNode::TTasksRequest*>>> byTx; - for (auto& [key, request] : State.Requests) { - byTx[key.first].emplace_back(key.second, &request); - } - TStringStream str; HTML(str) { PRE() { @@ -482,26 +471,8 @@ private: str << Endl; str << Endl << "Transactions:" << Endl; - for (auto& [txId, requests] : byTx) { - auto& meta = State.Meta[txId]; - str << " TxId: " << txId << Endl; - str << " Memory: " << meta.TotalMemory << Endl; - str << " MemoryPool: " << (ui32) meta.MemoryPool << Endl; - str << " Compute actors: " << meta.TotalComputeActors << Endl; - str << " Start time: " << meta.StartTime << Endl; - str << " Requests:" << Endl; - for (auto& [requester, request] : requests) { - str << " Requester: " << requester << Endl; - str << " Deadline: " << request->Deadline << Endl; - str << " Memory: " << request->TotalMemory << Endl; - str << " In-fly tasks:" << Endl; - for (auto& [taskId, task] : request->InFlyTasks) { - str << " Task: " << taskId << Endl; - str << " Memory: " << task.Memory << Endl; - str << " Channels: " << task.Channels << Endl; - str << " Compute actor: " << task.ComputeActorId << Endl; - } - } + for (auto& bucket : Buckets) { + bucket.GetInfo(str); } } } @@ -532,17 +503,18 @@ private: return ResourceManager_; } + NKqpNode::TState& GetStateBucketByTx(ui64 txId) { + return Buckets[txId % Buckets.size()]; + } + private: NKikimrConfig::TTableServiceConfig::TResourceManager Config; TIntrusivePtr<TKqpCounters> Counters; IKqpNodeComputeActorFactory* CaFactory; NRm::IKqpResourceManager* ResourceManager_ = nullptr; - NKqpNode::TState State; - struct TRequestId { - ui64 TxId; - TActorId Requester; - }; - std::multimap<TInstant, TRequestId> ExpiringRequests; + + //state sharded by TxId + std::array<NKqpNode::TState, BucketsCount> Buckets; }; diff --git a/ydb/core/kqp/node_service/kqp_node_state.h b/ydb/core/kqp/node_service/kqp_node_state.h index ec49b30d59..b1eb6e8fd3 100644 --- a/ydb/core/kqp/node_service/kqp_node_state.h +++ b/ydb/core/kqp/node_service/kqp_node_state.h @@ -43,20 +43,38 @@ struct TTxMeta { TInstant StartTime; }; -struct TState { - THashMap<std::pair<ui64, const TActorId>, TTasksRequest> Requests; - THashMultiMap<ui64, const TActorId> SenderIdsByTxId; - THashMap<ui64, TTxMeta> Meta; +struct TRemoveTaskContext { + ui64 TotalMemory = 0; + ui64 ComputeActorsNumber = 0; + bool FinixTx = false; +}; + +class TState { +public: + struct TRequestId { + ui64 TxId; + TActorId Requester; + }; + + struct TRemoveTaskContext { + ui64 TotalMemory = 0; + ui64 ComputeActorsNumber = 0; + bool FinixTx = false; + TActorId Requester; + }; + + struct ExpiredRequestContext { + TRequestId RequestId; + bool Exists; + }; bool Exists(ui64 txId, const TActorId& requester) const { + TReadGuard guard(RWLock); return Requests.contains(std::make_pair(txId, requester)); } - TTasksRequest* GetRequest(ui64 txId, const TActorId& requester) { - return Requests.FindPtr(std::make_pair(txId, requester)); - } - ui64 GetTxMemory(ui64 txId, NRm::EKqpMemoryPool memoryPool) const { + TReadGuard guard(RWLock); if (auto* meta = Meta.FindPtr(txId)) { return meta->MemoryPool == memoryPool ? meta->TotalMemory : 0; } @@ -64,6 +82,7 @@ struct TState { } void NewRequest(ui64 txId, const TActorId& requester, TTasksRequest&& request, NRm::EKqpMemoryPool memoryPool) { + TWriteGuard guard(RWLock); auto& meta = Meta[txId]; meta.TotalMemory += request.TotalMemory; meta.TotalComputeActors += request.InFlyTasks.size(); @@ -79,26 +98,9 @@ struct TState { YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); } - std::tuple<TTaskContext*, TActorId, TTasksRequest*, TTxMeta*> GetTask(ui64 txId, ui64 taskId) { - YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); - const auto senders = SenderIdsByTxId.equal_range(txId); - - for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) { - auto requestIt = Requests.find(*senderIt); - YQL_ENSURE(requestIt != Requests.end()); - - auto taskIt = requestIt->second.InFlyTasks.find(taskId); - if (taskIt != requestIt->second.InFlyTasks.end()) { - return std::make_tuple(&taskIt->second, senderIt->second, &requestIt->second, Meta.FindPtr(txId)); - } - } - - return std::make_tuple(nullptr, TActorId(), nullptr, nullptr); - } - - TMaybe<TTaskContext> RemoveTask(ui64 txId, ui64 taskId, bool success, - std::function<void(const TActorId&, const TTasksRequest&, const TTaskContext&, bool)>&& cb) + TMaybe<TRemoveTaskContext> RemoveTask(ui64 txId, ui64 taskId, bool success) { + TWriteGuard guard(RWLock); YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); const auto senders = SenderIdsByTxId.equal_range(txId); for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) { @@ -120,18 +122,28 @@ struct TState { meta.TotalMemory -= task.Memory; meta.TotalComputeActors--; - cb(senderIt->second, requestIt->second, task, meta.TotalComputeActors == 0); - if (requestIt->second.InFlyTasks.empty()) { + auto bounds = ExpiringRequests.equal_range(requestIt->second.Deadline); + for (auto it = bounds.first; it != bounds.second; ) { + if (it->second.TxId == txId && it->second.Requester == senderIt->second) { + auto delIt = it++; + ExpiringRequests.erase(delIt); + } else { + ++it; + } + } Requests.erase(*senderIt); SenderIdsByTxId.erase(senderIt); YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); } + if (meta.TotalComputeActors == 0) { Meta.erase(txId); } - return std::move(task); + return TRemoveTaskContext{ + requestIt->second.TotalMemory, requestIt->second.InFlyTasks.size(), meta.TotalComputeActors == 0, senderIt->second + }; } } @@ -139,6 +151,7 @@ struct TState { } TMaybe<TTasksRequest> RemoveRequest(ui64 txId, const TActorId& requester) { + TWriteGuard guard(RWLock); auto key = std::make_pair(txId, requester); auto* request = Requests.FindPtr(key); if (!request) { @@ -171,34 +184,85 @@ struct TState { return ret; } - bool RemoveTx(ui64 txId, std::function<void(const TTasksRequest&)>&& cb) { + std::vector<TTasksRequest> RemoveTx(ui64 txId) { + TWriteGuard guard(RWLock); Meta.erase(txId); YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); const auto senders = SenderIdsByTxId.equal_range(txId); + std::vector<TTasksRequest> ret; for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) { auto requestIt = Requests.find(*senderIt); YQL_ENSURE(requestIt != Requests.end()); - cb(requestIt->second); + ret.push_back(std::move(requestIt->second)); Requests.erase(requestIt); } - auto erased = SenderIdsByTxId.erase(txId); + SenderIdsByTxId.erase(txId); YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); - return erased > 0; + return ret; + } + + void InsertExpiringRequest(TInstant deadline, ui64 txId, TActorId requester) { + TWriteGuard guard(RWLock); + ExpiringRequests.emplace(deadline, TRequestId{txId, requester}); } - ui64 UsedMemory(NRm::EKqpMemoryPool memoryPool) const { - ui64 mem = 0; - for (auto& [_, meta] : Meta) { - if (meta.MemoryPool == memoryPool) { - mem += meta.TotalMemory; + std::vector<ExpiredRequestContext> ClearExpiredRequests() { + TWriteGuard guard(RWLock); + std::vector<ExpiredRequestContext> ret; + auto it = ExpiringRequests.begin(); + auto now = TAppData::TimeProvider->Now(); + while (it != ExpiringRequests.end() && it->first < now) { + auto reqId = it->second; + auto delIt = it++; + ExpiringRequests.erase(delIt); + + auto request = RemoveRequest(reqId.TxId, reqId.Requester); + ret.push_back({reqId, bool(request)}); + } + return ret; + } + + void GetInfo(TStringStream& str) { + TReadGuard guard(RWLock); + TMap<ui64, TVector<std::pair<const TActorId, const NKqpNode::TTasksRequest*>>> byTx; + for (auto& [key, request] : Requests) { + byTx[key.first].emplace_back(key.second, &request); + } + for (auto& [txId, requests] : byTx) { + auto& meta = Meta[txId]; + str << " TxId: " << txId << Endl; + str << " Memory: " << meta.TotalMemory << Endl; + str << " MemoryPool: " << (ui32) meta.MemoryPool << Endl; + str << " Compute actors: " << meta.TotalComputeActors << Endl; + str << " Start time: " << meta.StartTime << Endl; + str << " Requests:" << Endl; + for (auto& [requester, request] : requests) { + str << " Requester: " << requester << Endl; + str << " Deadline: " << request->Deadline << Endl; + str << " Memory: " << request->TotalMemory << Endl; + str << " In-fly tasks:" << Endl; + for (auto& [taskId, task] : request->InFlyTasks) { + str << " Task: " << taskId << Endl; + str << " Memory: " << task.Memory << Endl; + str << " Channels: " << task.Channels << Endl; + str << " Compute actor: " << task.ComputeActorId << Endl; + } } } - return mem; } +private: + + TRWMutex RWLock; // Lock for state bucket + + std::multimap<TInstant, TRequestId> ExpiringRequests; + + THashMap<std::pair<ui64, const TActorId>, TTasksRequest> Requests; + THashMultiMap<ui64, const TActorId> SenderIdsByTxId; + THashMap<ui64, TTxMeta> Meta; }; } // namespace NKqpNode |