aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-03-16 12:00:41 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-03-16 12:00:41 +0300
commit08655020c8e2770e2462b48a54106ef30564ea69 (patch)
tree25f14c1762ea49d692558817d8dd6b21dd19d7b0
parentd1c25d0356fc7efced0d8da5091bff22b862aa50 (diff)
downloadydb-08655020c8e2770e2462b48a54106ef30564ea69.tar.gz
Add sharded state to kqp_node_service
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp180
-rw-r--r--ydb/core/kqp/node_service/kqp_node_state.h144
2 files changed, 180 insertions, 144 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index e24d9df17f..ef60b69503 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -49,6 +49,8 @@ TString TasksIdsStr(const TTasksCollection& tasks) {
return TStringBuilder() << "[" << JoinSeq(", ", ids) << "]";
}
+constexpr ui64 BucketsCount = 64;
+
class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
using TBase = TActorBootstrapped<TKqpNodeService>;
@@ -57,6 +59,34 @@ public:
return NKikimrServices::TActivity::KQP_NODE_SERVICE;
}
+ static void FinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues,
+ NKqpNode::TState& bucket) {
+ LOG_D("TxId: " << txId << ", finish compute task: " << taskId << ", success: " << success
+ << ", message: " << issues.ToOneLineString());
+
+ auto ctx = bucket.RemoveTask(txId, taskId, success);
+
+ if (!ctx) {
+ LOG_E("TxId: " << txId << ", task: " << taskId << " unknown task");
+ return;
+ }
+
+ if (ctx->ComputeActorsNumber == 0) {
+ LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed");
+ GetKqpResourceManager()->FreeResources(txId);
+ } else {
+ LOG_D("TxId: " << txId << ", finish compute task: " << taskId
+ << (success ? "" : " (cancelled)")
+ << ", remains " << ctx->ComputeActorsNumber << " compute actors and "
+ << ctx->TotalMemory << " bytes in the current request");
+ GetKqpResourceManager()->FreeResources(txId, taskId);
+ }
+
+ if (ctx->FinixTx) {
+ LOG_D("TxId: " << txId << ", requester: " << ctx->Requester << " completed");
+ }
+ }
+
TKqpNodeService(const NKikimrConfig::TTableServiceConfig& config, const TIntrusivePtr<TKqpCounters>& counters,
IKqpNodeComputeActorFactory* caFactory)
: Config(config.GetResourceManager())
@@ -87,7 +117,7 @@ private:
STATEFN(WorkState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqpNode::TEvStartKqpTasksRequest, HandleWork);
- hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork);
+ hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork); // used only for unit tests
hFunc(TEvKqpNode::TEvCancelKqpTasksRequest, HandleWork);
hFunc(TEvents::TEvWakeup, HandleWork);
// misc
@@ -119,7 +149,9 @@ private:
NKqpNode::TTasksRequest request;
request.Executer = ActorIdFromProto(msg.GetExecuterActorId());
- if (State.Exists(txId, requester)) {
+ auto& bucket = GetStateBucketByTx(txId);
+
+ if (bucket.Exists(txId, requester)) {
LOG_E("TxId: " << txId << ", requester: " << requester << ", request already exists");
return ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR);
}
@@ -156,7 +188,7 @@ private:
LOG_D("TxId: " << txId << ", channels: " << requestChannels
<< ", computeActors: " << msg.GetTasks().size() << ", memory: " << request.TotalMemory);
- ui64 txMemory = State.GetTxMemory(txId, memoryPool) + request.TotalMemory;
+ auto txMemory = bucket.GetTxMemory(txId, memoryPool) + request.TotalMemory;
if (txMemory > Config.GetQueryMemoryLimit()) {
LOG_N("TxId: " << txId << ", requested too many memory: " << request.TotalMemory
<< "(" << txMemory << " for this Tx), limit: " << Config.GetQueryMemoryLimit());
@@ -243,8 +275,7 @@ private:
// compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer
auto timeout = TDuration::MilliSeconds(msgRtSettings.GetTimeoutMs());
request.Deadline = TAppData::TimeProvider->Now() + timeout + /* gap */ TDuration::Seconds(5);
-
- ExpiringRequests.emplace(request.Deadline, TRequestId{txId, requester});
+ bucket.InsertExpiringRequest(request.Deadline, txId, requester);
}
runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool;
@@ -261,7 +292,6 @@ private:
runtimeSettingsBase.ReportStatsSettings = NYql::NDq::TReportStatsSettings{MinStatInterval, MaxStatInterval};
TShardsScanningPolicy scanPolicy(Config.GetShardsScanningPolicy());
- auto actorSystem = TlsActivationContext->ActorSystem();
// start compute actors
for (int i = 0; i < msg.GetTasks().size(); ++i) {
@@ -274,9 +304,9 @@ private:
"actual size: %ld, min: %ld", memoryLimits.ChannelBufferSize, Config.GetMinChannelBufferSize());
auto runtimeSettings = runtimeSettingsBase;
- runtimeSettings.TerminateHandler = [actorSystem, rm = SelfId(), txId, taskId = dqTask.GetId()]
+ runtimeSettings.TerminateHandler = [txId, taskId = dqTask.GetId(), &bucket]
(bool success, const NYql::TIssues& issues) {
- actorSystem->Send(rm, new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, issues));
+ FinishKqpTask(txId, taskId, success, issues, bucket);
};
ETableKind tableKind = ETableKind::Unknown;
@@ -318,51 +348,13 @@ private:
Send(request.Executer, reply.Release(), IEventHandle::FlagTrackDelivery, txId);
- State.NewRequest(txId, requester, std::move(request), memoryPool);
+ bucket.NewRequest(txId, requester, std::move(request), memoryPool);
}
+ // used only for unit tests
void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) {
auto& msg = *ev->Get();
-
- LOG_D("TxId: " << msg.TxId << ", finish compute task: " << msg.TaskId << ", success: " << msg.Success
- << ", message: " << msg.Issues.ToOneLineString());
-
- auto task = State.RemoveTask(msg.TxId, msg.TaskId, msg.Success, [this, &msg]
- (const TActorId& requester, const NKqpNode::TTasksRequest& request, const NKqpNode::TTaskContext&, bool finishTx) {
- THolder<IEventBase> ev;
-
- if (request.InFlyTasks.empty()) {
- LOG_D("TxId: " << msg.TxId << ", requester: " << requester << " completed");
-
- auto bounds = ExpiringRequests.equal_range(request.Deadline);
- for (auto it = bounds.first; it != bounds.second; ) {
- if (it->second.TxId == msg.TxId && it->second.Requester == requester) {
- auto delIt = it++;
- ExpiringRequests.erase(delIt);
- } else {
- ++it;
- }
- }
-
- ResourceManager()->FreeResources(msg.TxId);
- } else {
- LOG_D("TxId: " << msg.TxId << ", finish compute task: " << msg.TaskId
- << (msg.Success ? "" : " (cancelled)")
- << ", remains " << request.InFlyTasks.size() << " compute actors and " << request.TotalMemory
- << " bytes in the current request");
-
- ResourceManager()->FreeResources(msg.TxId, msg.TaskId);
- }
-
- if (finishTx) {
- LOG_D("TxId: " << msg.TxId << ", requester: " << requester << " completed");
- }
- });
-
- if (!task) {
- LOG_E("TxId: " << msg.TxId << ", task: " << msg.TaskId << " unknown task");
- return;
- }
+ FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, msg.Issues, GetStateBucketByTx(msg.TxId));
}
void HandleWork(TEvKqpNode::TEvCancelKqpTasksRequest::TPtr& ev) {
@@ -374,39 +366,41 @@ private:
}
void TerminateTx(ui64 txId, const TString& reason) {
- State.RemoveTx(txId, [this, &txId, &reason](const NKqpNode::TTasksRequest& request) {
- LOG_D("TxId: " << txId << ", cancel granted resources");
+ auto& bucket = GetStateBucketByTx(txId);
+ auto tasksToAbort = bucket.RemoveTx(txId);
+ if (!tasksToAbort.empty()) {
+ LOG_D("TxId: " << txId << ", cancel granted resources");
ResourceManager()->FreeResources(txId);
- for (auto& [taskId, task] : request.InFlyTasks) {
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED, reason);
- Send(task.ComputeActorId, abortEv.Release());
+ for (const auto& tasksRequest: tasksToAbort) {
+ for (const auto& [taskId, task] : tasksRequest.InFlyTasks) {
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED,
+ reason);
+ Send(task.ComputeActorId, abortEv.Release());
+ }
}
- });
+ }
}
void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
Schedule(TDuration::Seconds(1), ev->Release().Release());
-
- auto it = ExpiringRequests.begin();
- auto now = TAppData::TimeProvider->Now();
- while (it != ExpiringRequests.end() && it->first < now) {
- auto reqId = it->second;
- auto delIt = it++;
- ExpiringRequests.erase(delIt);
-
- auto request = State.RemoveRequest(reqId.TxId, reqId.Requester);
- LOG_D("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", execution timeout, request: " << bool(request));
- if (!request) {
- // it is ok since in most cases requests is finished by exlicit TEvAbortExecution from their Executer
- LOG_I("txId: " << reqId.TxId << ", requester: " << reqId.Requester << ", unknown request");
- continue;
+ std::vector<ui64> txIdsToFree;
+ for (auto& bucket : Buckets) {
+ auto expiredRequests = bucket.ClearExpiredRequests();
+ for (auto& cxt : expiredRequests) {
+ LOG_D("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester
+ << ", execution timeout, request: " << cxt.Exists);
+ if (!cxt.Exists) {
+ // it is ok since in most cases requests are finished by explicit TEvAbortExecution from their Executer
+ LOG_I("txId: " << cxt.RequestId.TxId << ", requester: " << cxt.RequestId.Requester
+ << ", unknown request");
+ continue;
+ }
+ // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order:
+ // KqpProxy -> SessionActor -> Executer -> ComputeActor
+ ResourceManager()->FreeResources(cxt.RequestId.TxId);
}
-
- ResourceManager()->FreeResources(reqId.TxId);
- // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order:
- // KqpProxy -> SessionActor -> Executer -> ComputeActor
}
}
@@ -469,11 +463,6 @@ private:
}
void HandleWork(NMon::TEvHttpInfo::TPtr& ev) {
- THashMap<ui64, TVector<std::pair<const TActorId, const NKqpNode::TTasksRequest*>>> byTx;
- for (auto& [key, request] : State.Requests) {
- byTx[key.first].emplace_back(key.second, &request);
- }
-
TStringStream str;
HTML(str) {
PRE() {
@@ -482,26 +471,8 @@ private:
str << Endl;
str << Endl << "Transactions:" << Endl;
- for (auto& [txId, requests] : byTx) {
- auto& meta = State.Meta[txId];
- str << " TxId: " << txId << Endl;
- str << " Memory: " << meta.TotalMemory << Endl;
- str << " MemoryPool: " << (ui32) meta.MemoryPool << Endl;
- str << " Compute actors: " << meta.TotalComputeActors << Endl;
- str << " Start time: " << meta.StartTime << Endl;
- str << " Requests:" << Endl;
- for (auto& [requester, request] : requests) {
- str << " Requester: " << requester << Endl;
- str << " Deadline: " << request->Deadline << Endl;
- str << " Memory: " << request->TotalMemory << Endl;
- str << " In-fly tasks:" << Endl;
- for (auto& [taskId, task] : request->InFlyTasks) {
- str << " Task: " << taskId << Endl;
- str << " Memory: " << task.Memory << Endl;
- str << " Channels: " << task.Channels << Endl;
- str << " Compute actor: " << task.ComputeActorId << Endl;
- }
- }
+ for (auto& bucket : Buckets) {
+ bucket.GetInfo(str);
}
}
}
@@ -532,17 +503,18 @@ private:
return ResourceManager_;
}
+ NKqpNode::TState& GetStateBucketByTx(ui64 txId) {
+ return Buckets[txId % Buckets.size()];
+ }
+
private:
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
TIntrusivePtr<TKqpCounters> Counters;
IKqpNodeComputeActorFactory* CaFactory;
NRm::IKqpResourceManager* ResourceManager_ = nullptr;
- NKqpNode::TState State;
- struct TRequestId {
- ui64 TxId;
- TActorId Requester;
- };
- std::multimap<TInstant, TRequestId> ExpiringRequests;
+
+ //state sharded by TxId
+ std::array<NKqpNode::TState, BucketsCount> Buckets;
};
diff --git a/ydb/core/kqp/node_service/kqp_node_state.h b/ydb/core/kqp/node_service/kqp_node_state.h
index ec49b30d59..b1eb6e8fd3 100644
--- a/ydb/core/kqp/node_service/kqp_node_state.h
+++ b/ydb/core/kqp/node_service/kqp_node_state.h
@@ -43,20 +43,38 @@ struct TTxMeta {
TInstant StartTime;
};
-struct TState {
- THashMap<std::pair<ui64, const TActorId>, TTasksRequest> Requests;
- THashMultiMap<ui64, const TActorId> SenderIdsByTxId;
- THashMap<ui64, TTxMeta> Meta;
+struct TRemoveTaskContext {
+ ui64 TotalMemory = 0;
+ ui64 ComputeActorsNumber = 0;
+ bool FinixTx = false;
+};
+
+class TState {
+public:
+ struct TRequestId {
+ ui64 TxId;
+ TActorId Requester;
+ };
+
+ struct TRemoveTaskContext {
+ ui64 TotalMemory = 0;
+ ui64 ComputeActorsNumber = 0;
+ bool FinixTx = false;
+ TActorId Requester;
+ };
+
+ struct ExpiredRequestContext {
+ TRequestId RequestId;
+ bool Exists;
+ };
bool Exists(ui64 txId, const TActorId& requester) const {
+ TReadGuard guard(RWLock);
return Requests.contains(std::make_pair(txId, requester));
}
- TTasksRequest* GetRequest(ui64 txId, const TActorId& requester) {
- return Requests.FindPtr(std::make_pair(txId, requester));
- }
-
ui64 GetTxMemory(ui64 txId, NRm::EKqpMemoryPool memoryPool) const {
+ TReadGuard guard(RWLock);
if (auto* meta = Meta.FindPtr(txId)) {
return meta->MemoryPool == memoryPool ? meta->TotalMemory : 0;
}
@@ -64,6 +82,7 @@ struct TState {
}
void NewRequest(ui64 txId, const TActorId& requester, TTasksRequest&& request, NRm::EKqpMemoryPool memoryPool) {
+ TWriteGuard guard(RWLock);
auto& meta = Meta[txId];
meta.TotalMemory += request.TotalMemory;
meta.TotalComputeActors += request.InFlyTasks.size();
@@ -79,26 +98,9 @@ struct TState {
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
}
- std::tuple<TTaskContext*, TActorId, TTasksRequest*, TTxMeta*> GetTask(ui64 txId, ui64 taskId) {
- YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
- const auto senders = SenderIdsByTxId.equal_range(txId);
-
- for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) {
- auto requestIt = Requests.find(*senderIt);
- YQL_ENSURE(requestIt != Requests.end());
-
- auto taskIt = requestIt->second.InFlyTasks.find(taskId);
- if (taskIt != requestIt->second.InFlyTasks.end()) {
- return std::make_tuple(&taskIt->second, senderIt->second, &requestIt->second, Meta.FindPtr(txId));
- }
- }
-
- return std::make_tuple(nullptr, TActorId(), nullptr, nullptr);
- }
-
- TMaybe<TTaskContext> RemoveTask(ui64 txId, ui64 taskId, bool success,
- std::function<void(const TActorId&, const TTasksRequest&, const TTaskContext&, bool)>&& cb)
+ TMaybe<TRemoveTaskContext> RemoveTask(ui64 txId, ui64 taskId, bool success)
{
+ TWriteGuard guard(RWLock);
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
const auto senders = SenderIdsByTxId.equal_range(txId);
for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) {
@@ -120,18 +122,28 @@ struct TState {
meta.TotalMemory -= task.Memory;
meta.TotalComputeActors--;
- cb(senderIt->second, requestIt->second, task, meta.TotalComputeActors == 0);
-
if (requestIt->second.InFlyTasks.empty()) {
+ auto bounds = ExpiringRequests.equal_range(requestIt->second.Deadline);
+ for (auto it = bounds.first; it != bounds.second; ) {
+ if (it->second.TxId == txId && it->second.Requester == senderIt->second) {
+ auto delIt = it++;
+ ExpiringRequests.erase(delIt);
+ } else {
+ ++it;
+ }
+ }
Requests.erase(*senderIt);
SenderIdsByTxId.erase(senderIt);
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
}
+
if (meta.TotalComputeActors == 0) {
Meta.erase(txId);
}
- return std::move(task);
+ return TRemoveTaskContext{
+ requestIt->second.TotalMemory, requestIt->second.InFlyTasks.size(), meta.TotalComputeActors == 0, senderIt->second
+ };
}
}
@@ -139,6 +151,7 @@ struct TState {
}
TMaybe<TTasksRequest> RemoveRequest(ui64 txId, const TActorId& requester) {
+ TWriteGuard guard(RWLock);
auto key = std::make_pair(txId, requester);
auto* request = Requests.FindPtr(key);
if (!request) {
@@ -171,34 +184,85 @@ struct TState {
return ret;
}
- bool RemoveTx(ui64 txId, std::function<void(const TTasksRequest&)>&& cb) {
+ std::vector<TTasksRequest> RemoveTx(ui64 txId) {
+ TWriteGuard guard(RWLock);
Meta.erase(txId);
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
const auto senders = SenderIdsByTxId.equal_range(txId);
+ std::vector<TTasksRequest> ret;
for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) {
auto requestIt = Requests.find(*senderIt);
YQL_ENSURE(requestIt != Requests.end());
- cb(requestIt->second);
+ ret.push_back(std::move(requestIt->second));
Requests.erase(requestIt);
}
- auto erased = SenderIdsByTxId.erase(txId);
+ SenderIdsByTxId.erase(txId);
YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
- return erased > 0;
+ return ret;
+ }
+
+ void InsertExpiringRequest(TInstant deadline, ui64 txId, TActorId requester) {
+ TWriteGuard guard(RWLock);
+ ExpiringRequests.emplace(deadline, TRequestId{txId, requester});
}
- ui64 UsedMemory(NRm::EKqpMemoryPool memoryPool) const {
- ui64 mem = 0;
- for (auto& [_, meta] : Meta) {
- if (meta.MemoryPool == memoryPool) {
- mem += meta.TotalMemory;
+ std::vector<ExpiredRequestContext> ClearExpiredRequests() {
+ TWriteGuard guard(RWLock);
+ std::vector<ExpiredRequestContext> ret;
+ auto it = ExpiringRequests.begin();
+ auto now = TAppData::TimeProvider->Now();
+ while (it != ExpiringRequests.end() && it->first < now) {
+ auto reqId = it->second;
+ auto delIt = it++;
+ ExpiringRequests.erase(delIt);
+
+ auto request = RemoveRequest(reqId.TxId, reqId.Requester);
+ ret.push_back({reqId, bool(request)});
+ }
+ return ret;
+ }
+
+ void GetInfo(TStringStream& str) {
+ TReadGuard guard(RWLock);
+ TMap<ui64, TVector<std::pair<const TActorId, const NKqpNode::TTasksRequest*>>> byTx;
+ for (auto& [key, request] : Requests) {
+ byTx[key.first].emplace_back(key.second, &request);
+ }
+ for (auto& [txId, requests] : byTx) {
+ auto& meta = Meta[txId];
+ str << " TxId: " << txId << Endl;
+ str << " Memory: " << meta.TotalMemory << Endl;
+ str << " MemoryPool: " << (ui32) meta.MemoryPool << Endl;
+ str << " Compute actors: " << meta.TotalComputeActors << Endl;
+ str << " Start time: " << meta.StartTime << Endl;
+ str << " Requests:" << Endl;
+ for (auto& [requester, request] : requests) {
+ str << " Requester: " << requester << Endl;
+ str << " Deadline: " << request->Deadline << Endl;
+ str << " Memory: " << request->TotalMemory << Endl;
+ str << " In-fly tasks:" << Endl;
+ for (auto& [taskId, task] : request->InFlyTasks) {
+ str << " Task: " << taskId << Endl;
+ str << " Memory: " << task.Memory << Endl;
+ str << " Channels: " << task.Channels << Endl;
+ str << " Compute actor: " << task.ComputeActorId << Endl;
+ }
}
}
- return mem;
}
+private:
+
+ TRWMutex RWLock; // Lock for state bucket
+
+ std::multimap<TInstant, TRequestId> ExpiringRequests;
+
+ THashMap<std::pair<ui64, const TActorId>, TTasksRequest> Requests;
+ THashMultiMap<ui64, const TActorId> SenderIdsByTxId;
+ THashMap<ui64, TTxMeta> Meta;
};
} // namespace NKqpNode