aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-03-17 17:50:23 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-03-17 17:50:23 +0300
commitd76188ee91496f94beb448c82d8be6f07c2708e7 (patch)
tree7cfd80ddcf2a1d4781259b6c04a422fab88fc0e9
parent7aefab771fa67b74c4e5879cdcc631bf390faa57 (diff)
downloadydb-d76188ee91496f94beb448c82d8be6f07c2708e7.tar.gz
another yet fix
-rw-r--r--ydb/core/kqp/node_service/kqp_node_state.h68
1 files changed, 37 insertions, 31 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_state.h b/ydb/core/kqp/node_service/kqp_node_state.h
index 8e0cc58c715..45a6c4e9d50 100644
--- a/ydb/core/kqp/node_service/kqp_node_state.h
+++ b/ydb/core/kqp/node_service/kqp_node_state.h
@@ -154,36 +154,7 @@ public:
TMaybe<TTasksRequest> RemoveRequest(ui64 txId, const TActorId& requester) {
TWriteGuard guard(RWLock);
- auto key = std::make_pair(txId, requester);
- auto* request = Requests.FindPtr(key);
- if (!request) {
- return Nothing();
- }
-
- TMaybe<TTasksRequest> ret = std::move(*request);
- Requests.erase(key);
-
- const auto senders = SenderIdsByTxId.equal_range(txId);
- for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) {
- if (senderIt->second == requester) {
- SenderIdsByTxId.erase(senderIt);
- break;
- }
- }
-
- YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
-
- auto& meta = Meta[txId];
- Y_VERIFY_DEBUG(meta.TotalMemory >= ret->TotalMemory);
- Y_VERIFY_DEBUG(meta.TotalComputeActors >= 1);
- meta.TotalMemory -= ret->TotalMemory;
- meta.TotalComputeActors -= ret->InFlyTasks.size();
-
- if (meta.TotalComputeActors == 0) {
- Meta.erase(txId);
- }
-
- return ret;
+ return RemoveRequestImpl(txId, requester);
}
std::vector<TTasksRequest> RemoveTx(ui64 txId) {
@@ -222,7 +193,7 @@ public:
auto delIt = it++;
ExpiringRequests.erase(delIt);
- auto request = RemoveRequest(reqId.TxId, reqId.Requester);
+ auto request = RemoveRequestImpl(reqId.TxId, reqId.Requester);
ret.push_back({reqId, bool(request)});
}
return ret;
@@ -258,6 +229,41 @@ public:
}
private:
+ TMaybe<TTasksRequest> RemoveRequestImpl(ui64 txId, const TActorId& requester) {
+ auto key = std::make_pair(txId, requester);
+ auto* request = Requests.FindPtr(key);
+ if (!request) {
+ return Nothing();
+ }
+
+ TMaybe<TTasksRequest> ret = std::move(*request);
+ Requests.erase(key);
+
+ const auto senders = SenderIdsByTxId.equal_range(txId);
+ for (auto senderIt = senders.first; senderIt != senders.second; ++senderIt) {
+ if (senderIt->second == requester) {
+ SenderIdsByTxId.erase(senderIt);
+ break;
+ }
+ }
+
+ YQL_ENSURE(Requests.size() == SenderIdsByTxId.size());
+
+ auto& meta = Meta[txId];
+ Y_VERIFY_DEBUG(meta.TotalMemory >= ret->TotalMemory);
+ Y_VERIFY_DEBUG(meta.TotalComputeActors >= 1);
+ meta.TotalMemory -= ret->TotalMemory;
+ meta.TotalComputeActors -= ret->InFlyTasks.size();
+
+ if (meta.TotalComputeActors == 0) {
+ Meta.erase(txId);
+ }
+
+ return ret;
+ }
+
+private:
+
TRWMutex RWLock; // Lock for state bucket
std::multimap<TInstant, TRequestId> ExpiringRequests;