aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-05-15 18:37:14 +0300
committerhor911 <hor911@ydb.tech>2023-05-15 18:37:14 +0300
commitaaa02aae62421ae89b7cecb311570b477496dd0e (patch)
tree6011ef1b9f16e7c062935dfdafc24d4c4c3942ed
parent515a7f2286e7559ea89864f64a1ed6f50a552d0b (diff)
downloadydb-aaa02aae62421ae89b7cecb311570b477496dd0e.tar.gz
FreeResources (memory) w/o FinishTask
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.cpp40
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_service.h13
-rw-r--r--ydb/core/kqp/rm_service/kqp_rm_ut.cpp39
3 files changed, 86 insertions, 6 deletions
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 23667d6ba17..3b11a03ab7a 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -294,6 +294,46 @@ public:
return false;
}
+ void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override {
+
+ auto& txBucket = TxBucket(txId);
+
+ {
+ TGuard<TMutex> guard(txBucket.Lock);
+
+ auto txIt = txBucket.Txs.find(txId);
+ if (txIt == txBucket.Txs.end()) {
+ return;
+ }
+
+ auto taskIt = txIt->second.Tasks.find(taskId);
+ if (taskIt == txIt->second.Tasks.end()) {
+ return;
+ }
+
+ taskIt->second.ScanQueryMemory -= resources.Memory;
+ taskIt->second.ExecutionUnits -= resources.ExecutionUnits;
+
+ bool reduced = ResourceBroker->ReduceTaskResourcesInstant(
+ taskIt->second.ResourceBrokerTaskId, {0, resources.Memory}, SelfId);
+ Y_VERIFY_DEBUG(reduced);
+
+ txIt->second.TxScanQueryMemory -= resources.Memory;
+ txIt->second.TxExecutionUnits -= resources.ExecutionUnits;
+
+ ScanQueryMemoryResource.Release(resources.Memory);
+ ExecutionUnitsResource.Release(resources.ExecutionUnits);
+ }
+
+ Counters->RmComputeActors->Sub(resources.ExecutionUnits);
+ Counters->RmMemory->Sub(resources.Memory);
+
+ Y_VERIFY_DEBUG(Counters->RmComputeActors->Val() >= 0);
+ Y_VERIFY_DEBUG(Counters->RmMemory->Val() >= 0);
+
+ FireResourcesPublishing();
+ }
+
void FreeResources(ui64 txId, ui64 taskId) override {
ui64 releaseScanQueryMemory = 0;
ui32 releaseExecutionUnits = 0;
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h
index d77e9072526..5b505e01b1c 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.h
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.h
@@ -70,20 +70,21 @@ class IKqpResourceManager : private TNonCopyable {
public:
virtual ~IKqpResourceManager() = default;
- virtual bool AllocateResources(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources,
+ virtual bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
TKqpNotEnoughResources* details = nullptr) = 0;
using TResourcesAllocatedCallback = std::function<void(NActors::TActorSystem* as)>;
using TNotEnoughtResourcesCallback = std::function<void(NActors::TActorSystem* as, const TString& reason, bool byTimeout)>;
- virtual bool AllocateResources(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources,
+ virtual bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
TResourcesAllocatedCallback&& onSuccess, TNotEnoughtResourcesCallback&& onFail, TDuration timeout = {}) = 0;
- virtual void FreeResources(ui64 queryId, ui64 taskId) = 0;
- virtual void FreeResources(ui64 queryId) = 0;
+ virtual void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
+ virtual void FreeResources(ui64 txId, ui64 taskId) = 0;
+ virtual void FreeResources(ui64 txId) = 0;
- virtual void NotifyExternalResourcesAllocated(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
- virtual void NotifyExternalResourcesFreed(ui64 queryId, ui64 taskId) = 0;
+ virtual void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0;
+ virtual void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId) = 0;
virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0;
diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
index 7f77cc9d67a..30eb41d7ca4 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp
@@ -161,6 +161,7 @@ public:
UNIT_TEST(NotEnoughExecutionUnits);
UNIT_TEST(ResourceBrokerNotEnoughResources);
UNIT_TEST(Snapshot);
+ UNIT_TEST(Reduce);
UNIT_TEST_SUITE_END();
void SingleTask();
@@ -169,6 +170,7 @@ public:
void NotEnoughExecutionUnits();
void ResourceBrokerNotEnoughResources();
void Snapshot();
+ void Reduce();
private:
THolder<TTestBasicRuntime> Runtime;
@@ -369,5 +371,42 @@ void KqpRm::Snapshot() {
}
}
+void KqpRm::Reduce() {
+ CreateKqpResourceManager(MakeKqpResourceManagerConfig());
+ NKikimr::TActorSystemStub stub;
+
+ auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId());
+
+ NRm::TKqpResourcesRequest request;
+ request.ExecutionUnits = 10;
+ request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery;
+ request.Memory = 100;
+
+ bool allocated = rm->AllocateResources(1, 1, request);
+ UNIT_ASSERT(allocated);
+
+ AssertResourceManagerStats(rm, 1000 - 100, 100 - 10);
+ AssertResourceBrokerSensors(0, 100, 0, 0, 1);
+
+ NRm::TKqpResourcesRequest reduceRequest;
+ reduceRequest.ExecutionUnits = 3;
+ reduceRequest.MemoryPool = NRm::EKqpMemoryPool::ScanQuery;
+ reduceRequest.Memory = 70;
+
+ // invalid taskId
+ rm->FreeResources(1, 0);
+ AssertResourceManagerStats(rm, 1000 - 100, 100 - 10);
+ AssertResourceBrokerSensors(0, 100, 0, 0, 1);
+
+ // invalid txId
+ rm->FreeResources(10, 1);
+ AssertResourceManagerStats(rm, 1000 - 100, 100 - 10);
+ AssertResourceBrokerSensors(0, 100, 0, 0, 1);
+
+ rm->FreeResources(1, 1, reduceRequest);
+ AssertResourceManagerStats(rm, 1000 - 30, 100 - 7);
+ AssertResourceBrokerSensors(0, 30, 0, 0, 1);
+}
+
} // namespace NKqp
} // namespace NKikimr