diff options
author | Filitov Mikhail <filitovme@gmail.com> | 2024-02-14 13:17:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-14 13:17:52 +0100 |
commit | 133543677242820edf78a8d6ae9be8faa92eae4c (patch) | |
tree | d2a6c176afaa1249998bd48a7f5b7a1d605c1fdb | |
parent | a26ebd7d10133f3a00ed3d26f20b466353a8aff5 (diff) | |
download | ydb-133543677242820edf78a8d6ae9be8faa92eae4c.tar.gz |
[YQL-17709] [compute] split spilling into actor and non-actor parts (#1761)
5 files changed, 142 insertions, 131 deletions
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp index 48f30d628c8..cba96b115a6 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp @@ -9,7 +9,7 @@ using namespace NActors; TDqComputeStorage::TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) { TStringStream spillerName; spillerName << "Spiller" << "_" << CreateGuidAsString(); - ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, ActorSystem_); + ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback); ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor()); } @@ -18,19 +18,37 @@ TDqComputeStorage::~TDqComputeStorage() { } NThreading::TFuture<NKikimr::NMiniKQL::ISpiller::TKey> TDqComputeStorage::Put(TRope&& blob) { - return ComputeStorageActor_->Put(std::move(blob)); + auto promise = NThreading::NewPromise<NKikimr::NMiniKQL::ISpiller::TKey>(); + auto future = promise.GetFuture(); + + ActorSystem_->Send(ComputeStorageActorId_, new TEvPut(std::move(blob), std::move(promise))); + return future; } -std::optional<NThreading::TFuture<TRope>> TDqComputeStorage::Get(TKey key) { - return ComputeStorageActor_->Get(key); +NThreading::TFuture<std::optional<TRope>> TDqComputeStorage::Get(TKey key) { + return GetInternal(key, false); } NThreading::TFuture<void> TDqComputeStorage::Delete(TKey key) { - return ComputeStorageActor_->Delete(key); + auto promise = NThreading::NewPromise<void>(); + auto future = promise.GetFuture(); + + ActorSystem_->Send(ComputeStorageActorId_, new TEvDelete(key, std::move(promise))); + + return future; } -std::optional<NThreading::TFuture<TRope>> TDqComputeStorage::Extract(TKey key) { - return ComputeStorageActor_->Extract(key); +NThreading::TFuture<std::optional<TRope>> TDqComputeStorage::Extract(TKey key) { + return GetInternal(key, true); +} + +NThreading::TFuture<std::optional<TRope>> TDqComputeStorage::GetInternal(TKey key, bool removeBlobAfterRead) { + + auto promise = NThreading::NewPromise<std::optional<TRope>>(); + auto future = promise.GetFuture(); + + ActorSystem_->Send(ComputeStorageActorId_, new TEvGet(key, std::move(promise), removeBlobAfterRead)); + return future; } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.h b/ydb/library/yql/dq/actors/spilling/compute_storage.h index 208fa011623..695770bfef2 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.h +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.h @@ -23,13 +23,15 @@ public: NThreading::TFuture<TKey> Put(TRope&& blob); - std::optional<NThreading::TFuture<TRope>> Get(TKey key); + NThreading::TFuture<std::optional<TRope>> Get(TKey key); - std::optional<NThreading::TFuture<TRope>> Extract(TKey key); + NThreading::TFuture<std::optional<TRope>> Extract(TKey key); NThreading::TFuture<void> Delete(TKey key); private: + NThreading::TFuture<std::optional<TRope>> GetInternal(TKey key, bool removeBlobAfterRead); + NActors::TActorSystem* ActorSystem_; IDqComputeStorageActor* ComputeStorageActor_; NActors::TActorId ComputeStorageActorId_; diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp index e1de850d579..8989f9da34c 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp +++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp @@ -36,19 +36,21 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor // size + promise with key using TWritingBlobInfo = std::pair<ui64, NThreading::TPromise<IDqComputeStorageActor::TKey>>; // remove after read + promise with blob - using TLoadingBlobInfo = std::pair<bool, NThreading::TPromise<TRope>>; + using TLoadingBlobInfo = std::pair<bool, NThreading::TPromise<std::optional<TRope>>>; // void promise that completes when block is removed using TDeletingBlobInfo = NThreading::TPromise<void>; public: - TDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback, TActorSystem* actorSystem) + TDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback) : TxId_(txId), - ActorSystem_(actorSystem), SpillerName_(spillerName), WakeupCallback_(wakeupCallback) { } void Bootstrap() { + auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, SpillerName_, + SelfId(), false); + SpillingActorId_ = Register(spillingActor); Become(&TDqComputeStorageActor::WorkState); } @@ -58,110 +60,83 @@ public: return this; } - NThreading::TFuture<IDqComputeStorageActor::TKey> Put(TRope&& blob) override { - InitializeIfNot(); - // Use lock to prevent race when state is changed on event processing and on Put call - std::lock_guard lock(Mutex_); - - FailOnError(); - - ui64 size = blob.size(); - - ActorSystem_->Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(NextBlobId, std::move(blob))); - - auto it = WritingBlobs_.emplace(NextBlobId, std::make_pair(size, NThreading::NewPromise<IDqComputeStorageActor::TKey>())).first; - WritingBlobsSize_ += size; - - ++NextBlobId; - auto& promise = it->second.second; +protected: - return promise.GetFuture(); + void FailOnError() { + if (Error_) { + LOG_E("Error: " << *Error_); + Send(SpillingActorId_, new TEvents::TEvPoison); + } } - std::optional<NThreading::TFuture<TRope>> Get(IDqComputeStorageActor::TKey blobId) override { - return GetInternal(blobId, false); +private: + STATEFN(WorkState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); + hFunc(TEvDqSpilling::TEvReadResult, HandleWork); + hFunc(TEvDqSpilling::TEvError, HandleWork); + hFunc(TEvGet, HandleWork); + hFunc(TEvPut, HandleWork); + hFunc(TEvDelete, HandleWork); + hFunc(TEvents::TEvPoison, HandleWork); + default: + Y_ABORT("TDqComputeStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", + ev->GetTypeRewrite(), + ev->ToString().data()); + } } - std::optional<NThreading::TFuture<TRope>> Extract(IDqComputeStorageActor::TKey blobId) override { - return GetInternal(blobId, true); + void HandleWork(TEvents::TEvPoison::TPtr&) { + Send(SpillingActorId_, new TEvents::TEvPoison); + PassAway(); } - NThreading::TFuture<void> Delete(IDqComputeStorageActor::TKey blobId) override { - InitializeIfNot(); - // Use lock to prevent race when state is changed on event processing and on Delete call - std::lock_guard lock(Mutex_); - - FailOnError(); - - auto promise = NThreading::NewPromise<void>(); - auto future = promise.GetFuture(); - - if (!StoredBlobs_.contains(blobId)) { - promise.SetValue(); - return future; - } + void HandleWork(TEvPut::TPtr& ev) { + auto& msg = *ev->Get(); + ui64 size = msg.Blob_.size(); - DeletingBlobs_.emplace(blobId, std::move(promise)); + Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(NextBlobId, std::move(msg.Blob_))); - ActorSystem_->Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true)); + WritingBlobs_.emplace(NextBlobId, std::make_pair(size, std::move(msg.Promise_))); + WritingBlobsSize_ += size; - return future; + ++NextBlobId; } -protected: - std::optional<NThreading::TFuture<TRope>>GetInternal(IDqComputeStorageActor::TKey blobId, bool removeAfterRead) { - InitializeIfNot(); - // Use lock to prevent race when state is changed on event processing and on Get call - std::lock_guard lock(Mutex_); - - FailOnError(); + void HandleWork(TEvGet::TPtr& ev) { + auto& msg = *ev->Get(); - if (!StoredBlobs_.contains(blobId)) return std::nullopt; + if (!StoredBlobs_.contains(msg.Key_)) { + msg.Promise_.SetValue(std::nullopt); + return; + } - TLoadingBlobInfo loadingblobInfo = std::make_pair(removeAfterRead, NThreading::NewPromise<TRope>()); - auto it = LoadingBlobs_.emplace(blobId, std::move(loadingblobInfo)).first; + bool removeBlobAfterRead = msg.RemoveBlobAfterRead_; - ActorSystem_->Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, false)); + TLoadingBlobInfo loadingBlobInfo = std::make_pair(removeBlobAfterRead, std::move(msg.Promise_)); + LoadingBlobs_.emplace(msg.Key_, std::move(loadingBlobInfo)); - auto& promise = it->second.second; - return promise.GetFuture(); + Send(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, removeBlobAfterRead)); } - void PassAway() override { - InitializeIfNot(); - ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison); - TBase::PassAway(); - } + void HandleWork(TEvDelete::TPtr& ev) { + auto& msg = *ev->Get(); - void FailOnError() { - InitializeIfNot(); - if (Error_) { - LOG_E("Error: " << *Error_); - ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison); + if (!StoredBlobs_.contains(msg.Key_)) { + msg.Promise_.SetValue(); + return; } - } -private: - STATEFN(WorkState) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); - hFunc(TEvDqSpilling::TEvReadResult, HandleWork); - hFunc(TEvDqSpilling::TEvError, HandleWork); - cFunc(TEvents::TEvPoison::EventType, PassAway); - default: - Y_ABORT("TDqComputeStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", - ev->GetTypeRewrite(), - ev->ToString().data()); - } + DeletingBlobs_.emplace(msg.Key_, std::move(msg.Promise_)); + + Send(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, true)); } void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { auto& msg = *ev->Get(); LOG_T("[TEvWriteResult] blobId: " << msg.BlobId); - // Use lock to prevent race when state is changed on event processing and on Put call - std::lock_guard lock(Mutex_); auto it = WritingBlobs_.find(msg.BlobId); if (it == WritingBlobs_.end()) { @@ -169,7 +144,7 @@ private: Error_ = "Internal error"; - ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison); + Send(SpillingActorId_, new TEvents::TEvPoison); return; } @@ -180,11 +155,11 @@ private: StoredBlobsCount_++; StoredBlobsSize_ += size; - StoredBlobs_.insert(msg.BlobId); - // complete future and wake up waiting compute node promise.SetValue(msg.BlobId); + StoredBlobs_.emplace(msg.BlobId); + WritingBlobs_.erase(it); WakeupCallback_(); } @@ -193,9 +168,6 @@ private: auto& msg = *ev->Get(); LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); - // Use lock to prevent race when state is changed on event processing and on Put call - std::lock_guard lock(Mutex_); - // Deletion is read without fetching the results. So, after the deletion library sends TEvReadResult event // Check if the intention was to delete and complete correct future in this case. if (HandleDelete(msg.BlobId, msg.Blob.Size())) { @@ -209,13 +181,13 @@ private: Error_ = "Internal error"; - ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison); + Send(SpillingActorId_, new TEvents::TEvPoison); return; } bool removedAfterRead = it->second.first; if (removedAfterRead) { - UpdateStatsAfterBlobDeletion(msg.BlobId, msg.Blob.Size()); + UpdateStatsAfterBlobDeletion(msg.Blob.Size(), msg.BlobId); } TRope res(TString(reinterpret_cast<const char*>(msg.Blob.Data()), msg.Blob.Size())); @@ -235,13 +207,13 @@ private: Error_.ConstructInPlace(msg.Message); } - bool HandleDelete(IDqComputeStorageActor::TKey blobId, ui64 size) { + bool HandleDelete(TKey blobId, ui64 size) { auto it = DeletingBlobs_.find(blobId); if (it == DeletingBlobs_.end()) { return false; } - UpdateStatsAfterBlobDeletion(blobId, size); + UpdateStatsAfterBlobDeletion(size, blobId); auto& promise = it->second; promise.SetValue(); @@ -249,44 +221,29 @@ private: return true; } - void UpdateStatsAfterBlobDeletion(IDqComputeStorageActor::TKey blobId, ui64 size) { + void UpdateStatsAfterBlobDeletion(ui64 size, TKey blobId) { StoredBlobsCount_--; StoredBlobsSize_ -= size; StoredBlobs_.erase(blobId); } - // It's illegal to initialize an inner actor in the actor's ctor. Because in this case ctx will not be initialized because it's initialized afger Bootstrap event. - // But also it's not possible to initialize inner actor in the bootstrap function because in this case Put/Get may be called before the Bootstrap -> inner worker will be uninitialized. - // In current implementation it's still possible to leave inner actor uninitialized that is why it's planned to split this class into Actor part + non actor part - void InitializeIfNot() { - if (IsInitialized_) return; - auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, SpillerName_, - SelfId(), false); - SpillingActorId_ = Register(spillingActor); - - IsInitialized_ = true; - } - - protected: const TTxId TxId_; TActorId SpillingActorId_; - TActorSystem* ActorSystem_; - TMap<IDqComputeStorageActor::TKey, TWritingBlobInfo> WritingBlobs_; - TSet<ui64> StoredBlobs_; + TMap<TKey, TWritingBlobInfo> WritingBlobs_; ui64 WritingBlobsSize_ = 0; ui32 StoredBlobsCount_ = 0; ui64 StoredBlobsSize_ = 0; - TMap<IDqComputeStorageActor::TKey, TLoadingBlobInfo> LoadingBlobs_; + TMap<TKey, TLoadingBlobInfo> LoadingBlobs_; - TMap<IDqComputeStorageActor::TKey, TDeletingBlobInfo> DeletingBlobs_; + TMap<TKey, TDeletingBlobInfo> DeletingBlobs_; TMaybe<TString> Error_; - IDqComputeStorageActor::TKey NextBlobId = 0; + TKey NextBlobId = 0; TString SpillerName_; @@ -294,15 +251,14 @@ private: std::function<void()> WakeupCallback_; - private: - std::mutex Mutex_; + TSet<TKey> StoredBlobs_; }; } // anonymous namespace -IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback, TActorSystem* actorSystem) { - return new TDqComputeStorageActor(txId, spillerName, wakeupCallback, actorSystem); +IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback) { + return new TDqComputeStorageActor(txId, spillerName, wakeupCallback); } -} // namespace NYql::NDq +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h index a041c3caec7..5c680a54bff 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h +++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h @@ -6,7 +6,6 @@ namespace NYql::NDq { -// This class will be refactored to be the Actor part of the spiller class IDqComputeStorageActor { public: @@ -16,16 +15,52 @@ public: virtual ~IDqComputeStorageActor() = default; virtual NActors::IActor* GetActor() = 0; +}; + +struct TDqComputeStorageActorEvents { + enum { + EvPut = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 30000, + EvGet, + EvExtract, + EvDelete + }; +}; - virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0; +struct TEvPut : NActors::TEventLocal<TEvPut, TDqComputeStorageActorEvents::EvPut> { + TEvPut(TRope&& blob, NThreading::TPromise<IDqComputeStorageActor::TKey>&& promise) + : Blob_(std::move(blob)) + , Promise_(std::move(promise)) + { + } - virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0; + TRope Blob_; + NThreading::TPromise<IDqComputeStorageActor::TKey> Promise_; +}; + +struct TEvGet : NActors::TEventLocal<TEvGet, TDqComputeStorageActorEvents::EvGet> { + TEvGet(IDqComputeStorageActor::TKey key, NThreading::TPromise<std::optional<TRope>>&& promise, bool removeBlobAfterRead) + : Key_(key) + , Promise_(std::move(promise)) + , RemoveBlobAfterRead_(removeBlobAfterRead) + { + } + + IDqComputeStorageActor::TKey Key_; + NThreading::TPromise<std::optional<TRope>> Promise_; + bool RemoveBlobAfterRead_; +}; - virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0; +struct TEvDelete : NActors::TEventLocal<TEvDelete, TDqComputeStorageActorEvents::EvDelete> { + TEvDelete(IDqComputeStorageActor::TKey key, NThreading::TPromise<void>&& promise) + : Key_(key) + , Promise_(std::move(promise)) + { + } - virtual NThreading::TFuture<void> Delete(TKey key) = 0; + IDqComputeStorageActor::TKey Key_; + NThreading::TPromise<void> Promise_; }; -IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback, NActors::TActorSystem* actorSystem); +IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback); } // namespace NYql::NDq diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.h b/ydb/library/yql/minikql/computation/mkql_spiller.h index c376a088d66..28799fa3ee6 100644 --- a/ydb/library/yql/minikql/computation/mkql_spiller.h +++ b/ydb/library/yql/minikql/computation/mkql_spiller.h @@ -15,11 +15,11 @@ struct ISpiller ///\return /// nullopt for absent keys /// TFuture - virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0; + virtual NThreading::TFuture<std::optional<TRope>> Get(TKey key) = 0; virtual NThreading::TFuture<void> Delete(TKey) = 0; ///Get + Delete ///Stored value may be moved to future - virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0; + virtual NThreading::TFuture<std::optional<TRope>> Extract(TKey key) = 0; }; }//namespace NKikimr::NMiniKQL |