aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilitov Mikhail <filitovme@gmail.com>2024-02-14 13:17:52 +0100
committerGitHub <noreply@github.com>2024-02-14 13:17:52 +0100
commit133543677242820edf78a8d6ae9be8faa92eae4c (patch)
treed2a6c176afaa1249998bd48a7f5b7a1d605c1fdb
parenta26ebd7d10133f3a00ed3d26f20b466353a8aff5 (diff)
downloadydb-133543677242820edf78a8d6ae9be8faa92eae4c.tar.gz
[YQL-17709] [compute] split spilling into actor and non-actor parts (#1761)
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage.cpp32
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage.h6
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp184
-rw-r--r--ydb/library/yql/dq/actors/spilling/compute_storage_actor.h47
-rw-r--r--ydb/library/yql/minikql/computation/mkql_spiller.h4
5 files changed, 142 insertions, 131 deletions
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp
index 48f30d628c8..cba96b115a6 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp
@@ -9,7 +9,7 @@ using namespace NActors;
TDqComputeStorage::TDqComputeStorage(TTxId txId, std::function<void()> wakeUpCallback, TActorSystem* actorSystem) : ActorSystem_(actorSystem) {
TStringStream spillerName;
spillerName << "Spiller" << "_" << CreateGuidAsString();
- ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, ActorSystem_);
+ ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback);
ComputeStorageActorId_ = ActorSystem_->Register(ComputeStorageActor_->GetActor());
}
@@ -18,19 +18,37 @@ TDqComputeStorage::~TDqComputeStorage() {
}
NThreading::TFuture<NKikimr::NMiniKQL::ISpiller::TKey> TDqComputeStorage::Put(TRope&& blob) {
- return ComputeStorageActor_->Put(std::move(blob));
+ auto promise = NThreading::NewPromise<NKikimr::NMiniKQL::ISpiller::TKey>();
+ auto future = promise.GetFuture();
+
+ ActorSystem_->Send(ComputeStorageActorId_, new TEvPut(std::move(blob), std::move(promise)));
+ return future;
}
-std::optional<NThreading::TFuture<TRope>> TDqComputeStorage::Get(TKey key) {
- return ComputeStorageActor_->Get(key);
+NThreading::TFuture<std::optional<TRope>> TDqComputeStorage::Get(TKey key) {
+ return GetInternal(key, false);
}
NThreading::TFuture<void> TDqComputeStorage::Delete(TKey key) {
- return ComputeStorageActor_->Delete(key);
+ auto promise = NThreading::NewPromise<void>();
+ auto future = promise.GetFuture();
+
+ ActorSystem_->Send(ComputeStorageActorId_, new TEvDelete(key, std::move(promise)));
+
+ return future;
}
-std::optional<NThreading::TFuture<TRope>> TDqComputeStorage::Extract(TKey key) {
- return ComputeStorageActor_->Extract(key);
+NThreading::TFuture<std::optional<TRope>> TDqComputeStorage::Extract(TKey key) {
+ return GetInternal(key, true);
+}
+
+NThreading::TFuture<std::optional<TRope>> TDqComputeStorage::GetInternal(TKey key, bool removeBlobAfterRead) {
+
+ auto promise = NThreading::NewPromise<std::optional<TRope>>();
+ auto future = promise.GetFuture();
+
+ ActorSystem_->Send(ComputeStorageActorId_, new TEvGet(key, std::move(promise), removeBlobAfterRead));
+ return future;
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.h b/ydb/library/yql/dq/actors/spilling/compute_storage.h
index 208fa011623..695770bfef2 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage.h
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage.h
@@ -23,13 +23,15 @@ public:
NThreading::TFuture<TKey> Put(TRope&& blob);
- std::optional<NThreading::TFuture<TRope>> Get(TKey key);
+ NThreading::TFuture<std::optional<TRope>> Get(TKey key);
- std::optional<NThreading::TFuture<TRope>> Extract(TKey key);
+ NThreading::TFuture<std::optional<TRope>> Extract(TKey key);
NThreading::TFuture<void> Delete(TKey key);
private:
+ NThreading::TFuture<std::optional<TRope>> GetInternal(TKey key, bool removeBlobAfterRead);
+
NActors::TActorSystem* ActorSystem_;
IDqComputeStorageActor* ComputeStorageActor_;
NActors::TActorId ComputeStorageActorId_;
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp
index e1de850d579..8989f9da34c 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.cpp
@@ -36,19 +36,21 @@ class TDqComputeStorageActor : public NActors::TActorBootstrapped<TDqComputeStor
// size + promise with key
using TWritingBlobInfo = std::pair<ui64, NThreading::TPromise<IDqComputeStorageActor::TKey>>;
// remove after read + promise with blob
- using TLoadingBlobInfo = std::pair<bool, NThreading::TPromise<TRope>>;
+ using TLoadingBlobInfo = std::pair<bool, NThreading::TPromise<std::optional<TRope>>>;
// void promise that completes when block is removed
using TDeletingBlobInfo = NThreading::TPromise<void>;
public:
- TDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback, TActorSystem* actorSystem)
+ TDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback)
: TxId_(txId),
- ActorSystem_(actorSystem),
SpillerName_(spillerName),
WakeupCallback_(wakeupCallback)
{
}
void Bootstrap() {
+ auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, SpillerName_,
+ SelfId(), false);
+ SpillingActorId_ = Register(spillingActor);
Become(&TDqComputeStorageActor::WorkState);
}
@@ -58,110 +60,83 @@ public:
return this;
}
- NThreading::TFuture<IDqComputeStorageActor::TKey> Put(TRope&& blob) override {
- InitializeIfNot();
- // Use lock to prevent race when state is changed on event processing and on Put call
- std::lock_guard lock(Mutex_);
-
- FailOnError();
-
- ui64 size = blob.size();
-
- ActorSystem_->Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(NextBlobId, std::move(blob)));
-
- auto it = WritingBlobs_.emplace(NextBlobId, std::make_pair(size, NThreading::NewPromise<IDqComputeStorageActor::TKey>())).first;
- WritingBlobsSize_ += size;
-
- ++NextBlobId;
- auto& promise = it->second.second;
+protected:
- return promise.GetFuture();
+ void FailOnError() {
+ if (Error_) {
+ LOG_E("Error: " << *Error_);
+ Send(SpillingActorId_, new TEvents::TEvPoison);
+ }
}
- std::optional<NThreading::TFuture<TRope>> Get(IDqComputeStorageActor::TKey blobId) override {
- return GetInternal(blobId, false);
+private:
+ STATEFN(WorkState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
+ hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
+ hFunc(TEvDqSpilling::TEvError, HandleWork);
+ hFunc(TEvGet, HandleWork);
+ hFunc(TEvPut, HandleWork);
+ hFunc(TEvDelete, HandleWork);
+ hFunc(TEvents::TEvPoison, HandleWork);
+ default:
+ Y_ABORT("TDqComputeStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
+ ev->GetTypeRewrite(),
+ ev->ToString().data());
+ }
}
- std::optional<NThreading::TFuture<TRope>> Extract(IDqComputeStorageActor::TKey blobId) override {
- return GetInternal(blobId, true);
+ void HandleWork(TEvents::TEvPoison::TPtr&) {
+ Send(SpillingActorId_, new TEvents::TEvPoison);
+ PassAway();
}
- NThreading::TFuture<void> Delete(IDqComputeStorageActor::TKey blobId) override {
- InitializeIfNot();
- // Use lock to prevent race when state is changed on event processing and on Delete call
- std::lock_guard lock(Mutex_);
-
- FailOnError();
-
- auto promise = NThreading::NewPromise<void>();
- auto future = promise.GetFuture();
-
- if (!StoredBlobs_.contains(blobId)) {
- promise.SetValue();
- return future;
- }
+ void HandleWork(TEvPut::TPtr& ev) {
+ auto& msg = *ev->Get();
+ ui64 size = msg.Blob_.size();
- DeletingBlobs_.emplace(blobId, std::move(promise));
+ Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(NextBlobId, std::move(msg.Blob_)));
- ActorSystem_->Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true));
+ WritingBlobs_.emplace(NextBlobId, std::make_pair(size, std::move(msg.Promise_)));
+ WritingBlobsSize_ += size;
- return future;
+ ++NextBlobId;
}
-protected:
- std::optional<NThreading::TFuture<TRope>>GetInternal(IDqComputeStorageActor::TKey blobId, bool removeAfterRead) {
- InitializeIfNot();
- // Use lock to prevent race when state is changed on event processing and on Get call
- std::lock_guard lock(Mutex_);
-
- FailOnError();
+ void HandleWork(TEvGet::TPtr& ev) {
+ auto& msg = *ev->Get();
- if (!StoredBlobs_.contains(blobId)) return std::nullopt;
+ if (!StoredBlobs_.contains(msg.Key_)) {
+ msg.Promise_.SetValue(std::nullopt);
+ return;
+ }
- TLoadingBlobInfo loadingblobInfo = std::make_pair(removeAfterRead, NThreading::NewPromise<TRope>());
- auto it = LoadingBlobs_.emplace(blobId, std::move(loadingblobInfo)).first;
+ bool removeBlobAfterRead = msg.RemoveBlobAfterRead_;
- ActorSystem_->Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, false));
+ TLoadingBlobInfo loadingBlobInfo = std::make_pair(removeBlobAfterRead, std::move(msg.Promise_));
+ LoadingBlobs_.emplace(msg.Key_, std::move(loadingBlobInfo));
- auto& promise = it->second.second;
- return promise.GetFuture();
+ Send(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, removeBlobAfterRead));
}
- void PassAway() override {
- InitializeIfNot();
- ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison);
- TBase::PassAway();
- }
+ void HandleWork(TEvDelete::TPtr& ev) {
+ auto& msg = *ev->Get();
- void FailOnError() {
- InitializeIfNot();
- if (Error_) {
- LOG_E("Error: " << *Error_);
- ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison);
+ if (!StoredBlobs_.contains(msg.Key_)) {
+ msg.Promise_.SetValue();
+ return;
}
- }
-private:
- STATEFN(WorkState) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
- hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
- hFunc(TEvDqSpilling::TEvError, HandleWork);
- cFunc(TEvents::TEvPoison::EventType, PassAway);
- default:
- Y_ABORT("TDqComputeStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
- ev->GetTypeRewrite(),
- ev->ToString().data());
- }
+ DeletingBlobs_.emplace(msg.Key_, std::move(msg.Promise_));
+
+ Send(SpillingActorId_, new TEvDqSpilling::TEvRead(msg.Key_, true));
}
void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
auto& msg = *ev->Get();
LOG_T("[TEvWriteResult] blobId: " << msg.BlobId);
- // Use lock to prevent race when state is changed on event processing and on Put call
- std::lock_guard lock(Mutex_);
auto it = WritingBlobs_.find(msg.BlobId);
if (it == WritingBlobs_.end()) {
@@ -169,7 +144,7 @@ private:
Error_ = "Internal error";
- ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison);
+ Send(SpillingActorId_, new TEvents::TEvPoison);
return;
}
@@ -180,11 +155,11 @@ private:
StoredBlobsCount_++;
StoredBlobsSize_ += size;
- StoredBlobs_.insert(msg.BlobId);
-
// complete future and wake up waiting compute node
promise.SetValue(msg.BlobId);
+ StoredBlobs_.emplace(msg.BlobId);
+
WritingBlobs_.erase(it);
WakeupCallback_();
}
@@ -193,9 +168,6 @@ private:
auto& msg = *ev->Get();
LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
- // Use lock to prevent race when state is changed on event processing and on Put call
- std::lock_guard lock(Mutex_);
-
// Deletion is read without fetching the results. So, after the deletion library sends TEvReadResult event
// Check if the intention was to delete and complete correct future in this case.
if (HandleDelete(msg.BlobId, msg.Blob.Size())) {
@@ -209,13 +181,13 @@ private:
Error_ = "Internal error";
- ActorSystem_->Send(SpillingActorId_, new TEvents::TEvPoison);
+ Send(SpillingActorId_, new TEvents::TEvPoison);
return;
}
bool removedAfterRead = it->second.first;
if (removedAfterRead) {
- UpdateStatsAfterBlobDeletion(msg.BlobId, msg.Blob.Size());
+ UpdateStatsAfterBlobDeletion(msg.Blob.Size(), msg.BlobId);
}
TRope res(TString(reinterpret_cast<const char*>(msg.Blob.Data()), msg.Blob.Size()));
@@ -235,13 +207,13 @@ private:
Error_.ConstructInPlace(msg.Message);
}
- bool HandleDelete(IDqComputeStorageActor::TKey blobId, ui64 size) {
+ bool HandleDelete(TKey blobId, ui64 size) {
auto it = DeletingBlobs_.find(blobId);
if (it == DeletingBlobs_.end()) {
return false;
}
- UpdateStatsAfterBlobDeletion(blobId, size);
+ UpdateStatsAfterBlobDeletion(size, blobId);
auto& promise = it->second;
promise.SetValue();
@@ -249,44 +221,29 @@ private:
return true;
}
- void UpdateStatsAfterBlobDeletion(IDqComputeStorageActor::TKey blobId, ui64 size) {
+ void UpdateStatsAfterBlobDeletion(ui64 size, TKey blobId) {
StoredBlobsCount_--;
StoredBlobsSize_ -= size;
StoredBlobs_.erase(blobId);
}
- // It's illegal to initialize an inner actor in the actor's ctor. Because in this case ctx will not be initialized because it's initialized afger Bootstrap event.
- // But also it's not possible to initialize inner actor in the bootstrap function because in this case Put/Get may be called before the Bootstrap -> inner worker will be uninitialized.
- // In current implementation it's still possible to leave inner actor uninitialized that is why it's planned to split this class into Actor part + non actor part
- void InitializeIfNot() {
- if (IsInitialized_) return;
- auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, SpillerName_,
- SelfId(), false);
- SpillingActorId_ = Register(spillingActor);
-
- IsInitialized_ = true;
- }
-
-
protected:
const TTxId TxId_;
TActorId SpillingActorId_;
- TActorSystem* ActorSystem_;
- TMap<IDqComputeStorageActor::TKey, TWritingBlobInfo> WritingBlobs_;
- TSet<ui64> StoredBlobs_;
+ TMap<TKey, TWritingBlobInfo> WritingBlobs_;
ui64 WritingBlobsSize_ = 0;
ui32 StoredBlobsCount_ = 0;
ui64 StoredBlobsSize_ = 0;
- TMap<IDqComputeStorageActor::TKey, TLoadingBlobInfo> LoadingBlobs_;
+ TMap<TKey, TLoadingBlobInfo> LoadingBlobs_;
- TMap<IDqComputeStorageActor::TKey, TDeletingBlobInfo> DeletingBlobs_;
+ TMap<TKey, TDeletingBlobInfo> DeletingBlobs_;
TMaybe<TString> Error_;
- IDqComputeStorageActor::TKey NextBlobId = 0;
+ TKey NextBlobId = 0;
TString SpillerName_;
@@ -294,15 +251,14 @@ private:
std::function<void()> WakeupCallback_;
- private:
- std::mutex Mutex_;
+ TSet<TKey> StoredBlobs_;
};
} // anonymous namespace
-IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback, TActorSystem* actorSystem) {
- return new TDqComputeStorageActor(txId, spillerName, wakeupCallback, actorSystem);
+IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback) {
+ return new TDqComputeStorageActor(txId, spillerName, wakeupCallback);
}
-} // namespace NYql::NDq
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h
index a041c3caec7..5c680a54bff 100644
--- a/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h
+++ b/ydb/library/yql/dq/actors/spilling/compute_storage_actor.h
@@ -6,7 +6,6 @@
namespace NYql::NDq {
-// This class will be refactored to be the Actor part of the spiller
class IDqComputeStorageActor
{
public:
@@ -16,16 +15,52 @@ public:
virtual ~IDqComputeStorageActor() = default;
virtual NActors::IActor* GetActor() = 0;
+};
+
+struct TDqComputeStorageActorEvents {
+ enum {
+ EvPut = EventSpaceBegin(NActors::TEvents::EEventSpace::ES_USERSPACE) + 30000,
+ EvGet,
+ EvExtract,
+ EvDelete
+ };
+};
- virtual NThreading::TFuture<TKey> Put(TRope&& blob) = 0;
+struct TEvPut : NActors::TEventLocal<TEvPut, TDqComputeStorageActorEvents::EvPut> {
+ TEvPut(TRope&& blob, NThreading::TPromise<IDqComputeStorageActor::TKey>&& promise)
+ : Blob_(std::move(blob))
+ , Promise_(std::move(promise))
+ {
+ }
- virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0;
+ TRope Blob_;
+ NThreading::TPromise<IDqComputeStorageActor::TKey> Promise_;
+};
+
+struct TEvGet : NActors::TEventLocal<TEvGet, TDqComputeStorageActorEvents::EvGet> {
+ TEvGet(IDqComputeStorageActor::TKey key, NThreading::TPromise<std::optional<TRope>>&& promise, bool removeBlobAfterRead)
+ : Key_(key)
+ , Promise_(std::move(promise))
+ , RemoveBlobAfterRead_(removeBlobAfterRead)
+ {
+ }
+
+ IDqComputeStorageActor::TKey Key_;
+ NThreading::TPromise<std::optional<TRope>> Promise_;
+ bool RemoveBlobAfterRead_;
+};
- virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0;
+struct TEvDelete : NActors::TEventLocal<TEvDelete, TDqComputeStorageActorEvents::EvDelete> {
+ TEvDelete(IDqComputeStorageActor::TKey key, NThreading::TPromise<void>&& promise)
+ : Key_(key)
+ , Promise_(std::move(promise))
+ {
+ }
- virtual NThreading::TFuture<void> Delete(TKey key) = 0;
+ IDqComputeStorageActor::TKey Key_;
+ NThreading::TPromise<void> Promise_;
};
-IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback, NActors::TActorSystem* actorSystem);
+IDqComputeStorageActor* CreateDqComputeStorageActor(TTxId txId, const TString& spillerName, std::function<void()> wakeupCallback);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/minikql/computation/mkql_spiller.h b/ydb/library/yql/minikql/computation/mkql_spiller.h
index c376a088d66..28799fa3ee6 100644
--- a/ydb/library/yql/minikql/computation/mkql_spiller.h
+++ b/ydb/library/yql/minikql/computation/mkql_spiller.h
@@ -15,11 +15,11 @@ struct ISpiller
///\return
/// nullopt for absent keys
/// TFuture
- virtual std::optional<NThreading::TFuture<TRope>> Get(TKey key) = 0;
+ virtual NThreading::TFuture<std::optional<TRope>> Get(TKey key) = 0;
virtual NThreading::TFuture<void> Delete(TKey) = 0;
///Get + Delete
///Stored value may be moved to future
- virtual std::optional<NThreading::TFuture<TRope>> Extract(TKey key) = 0;
+ virtual NThreading::TFuture<std::optional<TRope>> Extract(TKey key) = 0;
};
}//namespace NKikimr::NMiniKQL