diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-03-17 17:50:23 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-03-17 17:50:23 +0300 |
commit | d76188ee91496f94beb448c82d8be6f07c2708e7 (patch) | |
tree | 7cfd80ddcf2a1d4781259b6c04a422fab88fc0e9 | |
parent | 7aefab771fa67b74c4e5879cdcc631bf390faa57 (diff) | |
download | ydb-d76188ee91496f94beb448c82d8be6f07c2708e7.tar.gz |
another yet fix
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_state.h | 68 |
1 files changed, 37 insertions, 31 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_state.h b/ydb/core/kqp/node_service/kqp_node_state.h index 8e0cc58c715..45a6c4e9d50 100644 --- a/ydb/core/kqp/node_service/kqp_node_state.h +++ b/ydb/core/kqp/node_service/kqp_node_state.h @@ -154,36 +154,7 @@ public: TMaybe<TTasksRequest> RemoveRequest(ui64 txId, const TActorId& requester) { TWriteGuard guard(RWLock); - auto key = std::make_pair(txId, requester); - auto* request = Requests.FindPtr(key); - if (!request) { - return Nothing(); - } - - TMaybe<TTasksRequest> ret = std::move(*request); - Requests.erase(key); - - const auto senders = SenderIdsByTxId.equal_range(txId); - for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) { - if (senderIt->second == requester) { - SenderIdsByTxId.erase(senderIt); - break; - } - } - - YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); - - auto& meta = Meta[txId]; - Y_VERIFY_DEBUG(meta.TotalMemory >= ret->TotalMemory); - Y_VERIFY_DEBUG(meta.TotalComputeActors >= 1); - meta.TotalMemory -= ret->TotalMemory; - meta.TotalComputeActors -= ret->InFlyTasks.size(); - - if (meta.TotalComputeActors == 0) { - Meta.erase(txId); - } - - return ret; + return RemoveRequestImpl(txId, requester); } std::vector<TTasksRequest> RemoveTx(ui64 txId) { @@ -222,7 +193,7 @@ public: auto delIt = it++; ExpiringRequests.erase(delIt); - auto request = RemoveRequest(reqId.TxId, reqId.Requester); + auto request = RemoveRequestImpl(reqId.TxId, reqId.Requester); ret.push_back({reqId, bool(request)}); } return ret; @@ -258,6 +229,41 @@ public: } private: + TMaybe<TTasksRequest> RemoveRequestImpl(ui64 txId, const TActorId& requester) { + auto key = std::make_pair(txId, requester); + auto* request = Requests.FindPtr(key); + if (!request) { + return Nothing(); + } + + TMaybe<TTasksRequest> ret = std::move(*request); + Requests.erase(key); + + const auto senders = SenderIdsByTxId.equal_range(txId); + for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) { + if (senderIt->second == requester) { + SenderIdsByTxId.erase(senderIt); + break; + } + } + + YQL_ENSURE(Requests.size() == SenderIdsByTxId.size()); + + auto& meta = Meta[txId]; + Y_VERIFY_DEBUG(meta.TotalMemory >= ret->TotalMemory); + Y_VERIFY_DEBUG(meta.TotalComputeActors >= 1); + meta.TotalMemory -= ret->TotalMemory; + meta.TotalComputeActors -= ret->InFlyTasks.size(); + + if (meta.TotalComputeActors == 0) { + Meta.erase(txId); + } + + return ret; + } + +private: + TRWMutex RWLock; // Lock for state bucket std::multimap<TInstant, TRequestId> ExpiringRequests; |