diff options
author | hor911 <hor911@ydb.tech> | 2023-05-15 18:37:14 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-05-15 18:37:14 +0300 |
commit | aaa02aae62421ae89b7cecb311570b477496dd0e (patch) | |
tree | 6011ef1b9f16e7c062935dfdafc24d4c4c3942ed | |
parent | 515a7f2286e7559ea89864f64a1ed6f50a552d0b (diff) | |
download | ydb-aaa02aae62421ae89b7cecb311570b477496dd0e.tar.gz |
FreeResources (memory) w/o FinishTask
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 40 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 39 |
3 files changed, 86 insertions, 6 deletions
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 23667d6ba17..3b11a03ab7a 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -294,6 +294,46 @@ public: return false; } + void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override { + + auto& txBucket = TxBucket(txId); + + { + TGuard<TMutex> guard(txBucket.Lock); + + auto txIt = txBucket.Txs.find(txId); + if (txIt == txBucket.Txs.end()) { + return; + } + + auto taskIt = txIt->second.Tasks.find(taskId); + if (taskIt == txIt->second.Tasks.end()) { + return; + } + + taskIt->second.ScanQueryMemory -= resources.Memory; + taskIt->second.ExecutionUnits -= resources.ExecutionUnits; + + bool reduced = ResourceBroker->ReduceTaskResourcesInstant( + taskIt->second.ResourceBrokerTaskId, {0, resources.Memory}, SelfId); + Y_VERIFY_DEBUG(reduced); + + txIt->second.TxScanQueryMemory -= resources.Memory; + txIt->second.TxExecutionUnits -= resources.ExecutionUnits; + + ScanQueryMemoryResource.Release(resources.Memory); + ExecutionUnitsResource.Release(resources.ExecutionUnits); + } + + Counters->RmComputeActors->Sub(resources.ExecutionUnits); + Counters->RmMemory->Sub(resources.Memory); + + Y_VERIFY_DEBUG(Counters->RmComputeActors->Val() >= 0); + Y_VERIFY_DEBUG(Counters->RmMemory->Val() >= 0); + + FireResourcesPublishing(); + } + void FreeResources(ui64 txId, ui64 taskId) override { ui64 releaseScanQueryMemory = 0; ui32 releaseExecutionUnits = 0; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index d77e9072526..5b505e01b1c 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -70,20 +70,21 @@ class IKqpResourceManager : private TNonCopyable { public: virtual ~IKqpResourceManager() = default; - virtual bool AllocateResources(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources, + virtual bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources, TKqpNotEnoughResources* details = nullptr) = 0; using TResourcesAllocatedCallback = std::function<void(NActors::TActorSystem* as)>; using TNotEnoughtResourcesCallback = std::function<void(NActors::TActorSystem* as, const TString& reason, bool byTimeout)>; - virtual bool AllocateResources(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources, + virtual bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources, TResourcesAllocatedCallback&& onSuccess, TNotEnoughtResourcesCallback&& onFail, TDuration timeout = {}) = 0; - virtual void FreeResources(ui64 queryId, ui64 taskId) = 0; - virtual void FreeResources(ui64 queryId) = 0; + virtual void FreeResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; + virtual void FreeResources(ui64 txId, ui64 taskId) = 0; + virtual void FreeResources(ui64 txId) = 0; - virtual void NotifyExternalResourcesAllocated(ui64 queryId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; - virtual void NotifyExternalResourcesFreed(ui64 queryId, ui64 taskId) = 0; + virtual void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) = 0; + virtual void NotifyExternalResourcesFreed(ui64 txId, ui64 taskId) = 0; virtual void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) = 0; diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index 7f77cc9d67a..30eb41d7ca4 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -161,6 +161,7 @@ public: UNIT_TEST(NotEnoughExecutionUnits); UNIT_TEST(ResourceBrokerNotEnoughResources); UNIT_TEST(Snapshot); + UNIT_TEST(Reduce); UNIT_TEST_SUITE_END(); void SingleTask(); @@ -169,6 +170,7 @@ public: void NotEnoughExecutionUnits(); void ResourceBrokerNotEnoughResources(); void Snapshot(); + void Reduce(); private: THolder<TTestBasicRuntime> Runtime; @@ -369,5 +371,42 @@ void KqpRm::Snapshot() { } } +void KqpRm::Reduce() { + CreateKqpResourceManager(MakeKqpResourceManagerConfig()); + NKikimr::TActorSystemStub stub; + + auto rm = GetKqpResourceManager(ResourceManagerActorId.NodeId()); + + NRm::TKqpResourcesRequest request; + request.ExecutionUnits = 10; + request.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; + request.Memory = 100; + + bool allocated = rm->AllocateResources(1, 1, request); + UNIT_ASSERT(allocated); + + AssertResourceManagerStats(rm, 1000 - 100, 100 - 10); + AssertResourceBrokerSensors(0, 100, 0, 0, 1); + + NRm::TKqpResourcesRequest reduceRequest; + reduceRequest.ExecutionUnits = 3; + reduceRequest.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; + reduceRequest.Memory = 70; + + // invalid taskId + rm->FreeResources(1, 0); + AssertResourceManagerStats(rm, 1000 - 100, 100 - 10); + AssertResourceBrokerSensors(0, 100, 0, 0, 1); + + // invalid txId + rm->FreeResources(10, 1); + AssertResourceManagerStats(rm, 1000 - 100, 100 - 10); + AssertResourceBrokerSensors(0, 100, 0, 0, 1); + + rm->FreeResources(1, 1, reduceRequest); + AssertResourceManagerStats(rm, 1000 - 30, 100 - 7); + AssertResourceBrokerSensors(0, 30, 0, 0, 1); +} + } // namespace NKqp } // namespace NKikimr |