diff options
author | Aidar Samerkhanov <aidarsamer@ydb.tech> | 2023-12-20 11:46:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-20 11:46:01 +0300 |
commit | 8a076ec337646cf0442f8abe9eb5ef0dc8e88e06 (patch) | |
tree | 83beca837b4b5d7ac3bbf3b0885b7835db59598c | |
parent | bf58f60cc2fd286707680b1c2d21803a07da0d31 (diff) | |
download | ydb-8a076ec337646cf0442f8abe9eb5ef0dc8e88e06.tar.gz |
YQL-17087: Move DqChannelStorage to separate file and use interface (#591)
4 files changed, 256 insertions, 206 deletions
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp index 1f8b61dcf8d..4b18d376633 100644 --- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp @@ -1,6 +1,6 @@ #include "channel_storage.h" -#include "spilling.h" -#include "spilling_file.h" + +#include "channel_storage_actor.h" #include <ydb/library/yql/utils/yql_panic.h> #include <ydb/library/services/services.pb.h> @@ -12,220 +12,19 @@ #include <util/generic/buffer.h> #include <util/generic/map.h> #include <util/generic/set.h> -#include <util/generic/size_literals.h> namespace NYql::NDq { using namespace NActors; -#define LOG_D(s) \ - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) -#define LOG_I(s) \ - LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_E(s) \ - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) -#define LOG_C(s) \ - LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_W(s) \ - LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_T(s) \ - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) - namespace { -constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10; -constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB; - -class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor> { - using TBase = TActorBootstrapped<TDqChannelStorageActor>; - -public: - TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) - : TxId_(txId) - , ChannelId_(channelId) - , WakeUp_(std::move(wakeUp)) - , ActorSystem_(actorSystem) - {} - - void Bootstrap() { - auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_, - SelfId(), true); - SpillingActorId_ = Register(spillingActor); - - Become(&TDqChannelStorageActor::WorkState); - } - - static constexpr char ActorName[] = "DQ_CHANNEL_STORAGE"; - -protected: - void PassAway() override { - Send(SpillingActorId_, new TEvents::TEvPoison); - TBase::PassAway(); - } - -private: - STATEFN(WorkState) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); - hFunc(TEvDqSpilling::TEvReadResult, HandleWork); - hFunc(TEvDqSpilling::TEvError, HandleWork); - default: - Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", - ev->GetTypeRewrite(), - ev->ToString().data()); - } - } - - void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { - auto& msg = *ev->Get(); - LOG_T("[TEvWriteResult] blobId: " << msg.BlobId); - - auto it = WritingBlobs_.find(msg.BlobId); - if (it == WritingBlobs_.end()) { - LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId); - - Error_ = "Internal error"; - - Send(SpillingActorId_, new TEvents::TEvPoison); - return; - } - - ui64 size = it->second; - WritingBlobsSize_ -= size; - WritingBlobs_.erase(it); - - StoredBlobsCount_++; - StoredBlobsSize_ += size; - } - - void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) { - auto& msg = *ev->Get(); - LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); - - if (LoadingBlobs_.erase(msg.BlobId) != 1) { - LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); - return; - } - - LoadedBlobs_[msg.BlobId].Swap(msg.Blob); - YQL_ENSURE(LoadedBlobs_[msg.BlobId].size() != 0); - - if (LoadedBlobs_.size() == 1) { - WakeUp_(); - } - } - - void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) { - auto& msg = *ev->Get(); - LOG_D("[TEvError] " << msg.Message); - - Error_.ConstructInPlace(msg.Message); - } - -public: - [[nodiscard]] - const TMaybe<TString>& GetError() const { - return Error_; - } - - bool IsEmpty() const { - return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty(); - } - - bool IsFull() const { - return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE; - } - - void Put(ui64 blobId, TRope&& blob, ui64 cookie) { - FailOnError(); - - // TODO: timeout - // TODO: limit inflight events - - ui64 size = blob.size(); - - SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie); - - WritingBlobs_.emplace(blobId, size); - WritingBlobsSize_ += size; - } - - bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) { - FailOnError(); - - auto loadedIt = LoadedBlobs_.find(blobId); - if (loadedIt != LoadedBlobs_.end()) { - YQL_ENSURE(loadedIt->second.size() != 0); - blob.Swap(loadedIt->second); - LoadedBlobs_.erase(loadedIt); - return true; - } - - auto result = LoadingBlobs_.emplace(blobId); - if (result.second) { - SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie); - } - - return false; - } - - void Terminate() { - PassAway(); - } - -private: - void FailOnError() { - if (Error_) { - LOG_E("Error: " << *Error_); - ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_ - << ", error: " << *Error_; - } - } - - template<typename T> - void SendEvent(T* event, ui64 cookie) { - if (ActorSystem_) { - ActorSystem_->Send( - new IEventHandle( - SpillingActorId_, - SelfId(), - event, - /*flags=*/0, - cookie - ) - ); - } else { - Send(SpillingActorId_, event); - } - } - -private: - const TTxId TxId_; - const ui64 ChannelId_; - IDqChannelStorage::TWakeUpCallback WakeUp_; - TActorId SpillingActorId_; - - TMap<ui64, ui64> WritingBlobs_; // blobId -> blobSize - ui64 WritingBlobsSize_ = 0; - - ui32 StoredBlobsCount_ = 0; - ui64 StoredBlobsSize_ = 0; - - TSet<ui64> LoadingBlobs_; - TMap<ui64, TBuffer> LoadedBlobs_; - - TMaybe<TString> Error_; - - TActorSystem* ActorSystem_; -}; - - class TDqChannelStorage : public IDqChannelStorage { public: TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) { - SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); - TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_); + SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); + TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor()); } ~TDqChannelStorage() { @@ -249,7 +48,7 @@ public: } private: - TDqChannelStorageActor* SelfActor_; + IDqChannelStorageActor* SelfActor_; }; } // anonymous namespace diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp new file mode 100644 index 00000000000..e98719734f3 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp @@ -0,0 +1,232 @@ +#include "channel_storage_actor.h" + +#include "spilling.h" +#include "spilling_file.h" + +#include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/services/services.pb.h> + +#include <ydb/library/actors/core/actor_bootstrapped.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/core/log.h> + +#include <util/generic/size_literals.h> + +namespace NYql::NDq { + +using namespace NActors; + +namespace { + +#define LOG_D(s) \ + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) +#define LOG_I(s) \ + LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) +#define LOG_E(s) \ + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) +#define LOG_C(s) \ + LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) +#define LOG_W(s) \ + LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) +#define LOG_T(s) \ + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) + +constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10; +constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB; + +class TDqChannelStorageActor : public IDqChannelStorageActor, + public NActors::TActorBootstrapped<TDqChannelStorageActor> +{ + using TBase = TActorBootstrapped<TDqChannelStorageActor>; +public: + TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) + : TxId_(txId) + , ChannelId_(channelId) + , WakeUp_(std::move(wakeUp)) + , ActorSystem_(actorSystem) + {} + + + void Bootstrap() { + auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_, + SelfId(), true); + SpillingActorId_ = Register(spillingActor); + + Become(&TDqChannelStorageActor::WorkState); + } + + static constexpr char ActorName[] = "DQ_CHANNEL_STORAGE"; + + NActors::IActor* GetActor() override { + return this; + } + +protected: + void PassAway() override { + Send(SpillingActorId_, new TEvents::TEvPoison); + TBase::PassAway(); + } + +private: + STATEFN(WorkState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); + hFunc(TEvDqSpilling::TEvReadResult, HandleWork); + hFunc(TEvDqSpilling::TEvError, HandleWork); + default: + Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", + ev->GetTypeRewrite(), + ev->ToString().data()); + } + } + + void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_T("[TEvWriteResult] blobId: " << msg.BlobId); + + auto it = WritingBlobs_.find(msg.BlobId); + if (it == WritingBlobs_.end()) { + LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId); + + Error_ = "Internal error"; + + Send(SpillingActorId_, new TEvents::TEvPoison); + return; + } + + ui64 size = it->second; + WritingBlobsSize_ -= size; + WritingBlobs_.erase(it); + + StoredBlobsCount_++; + StoredBlobsSize_ += size; + } + + void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); + + if (LoadingBlobs_.erase(msg.BlobId) != 1) { + LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); + return; + } + + LoadedBlobs_[msg.BlobId].Swap(msg.Blob); + YQL_ENSURE(LoadedBlobs_[msg.BlobId].size() != 0); + + if (LoadedBlobs_.size() == 1) { + WakeUp_(); + } + } + + void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_D("[TEvError] " << msg.Message); + + Error_.ConstructInPlace(msg.Message); + } + +public: + [[nodiscard]] + const TMaybe<TString>& GetError() const { + return Error_; + } + + bool IsEmpty() const override { + return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty(); + } + + bool IsFull() const override { + return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE; + } + + void Put(ui64 blobId, TRope&& blob, ui64 cookie) override { + FailOnError(); + + // TODO: timeout + // TODO: limit inflight events + + ui64 size = blob.size(); + + SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie); + + WritingBlobs_.emplace(blobId, size); + WritingBlobsSize_ += size; + } + + bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) override { + FailOnError(); + + auto loadedIt = LoadedBlobs_.find(blobId); + if (loadedIt != LoadedBlobs_.end()) { + YQL_ENSURE(loadedIt->second.size() != 0); + blob.Swap(loadedIt->second); + LoadedBlobs_.erase(loadedIt); + return true; + } + + auto result = LoadingBlobs_.emplace(blobId); + if (result.second) { + SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie); + } + + return false; + } + + void Terminate() override { + PassAway(); + } + +private: + void FailOnError() { + if (Error_) { + LOG_E("Error: " << *Error_); + ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_ + << ", error: " << *Error_; + } + } + + template<typename T> + void SendEvent(T* event, ui64 cookie) { + if (ActorSystem_) { + ActorSystem_->Send( + new IEventHandle( + SpillingActorId_, + SelfId(), + event, + /*flags=*/0, + cookie + ) + ); + } else { + Send(SpillingActorId_, event); + } + } + +private: + const TTxId TxId_; + const ui64 ChannelId_; + IDqChannelStorage::TWakeUpCallback WakeUp_; + TActorId SpillingActorId_; + + TMap<ui64, ui64> WritingBlobs_; // blobId -> blobSize + ui64 WritingBlobsSize_ = 0; + + ui32 StoredBlobsCount_ = 0; + ui64 StoredBlobsSize_ = 0; + + TSet<ui64> LoadingBlobs_; + TMap<ui64, TBuffer> LoadedBlobs_; + + TMaybe<TString> Error_; + + TActorSystem* ActorSystem_; +}; + +} // anonymous namespace + +IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) { + return new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem); +} + +} // namespace NYql::NDq
\ No newline at end of file diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h new file mode 100644 index 00000000000..f7c10d13c48 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h @@ -0,0 +1,18 @@ +#include <ydb/library/yql/dq/runtime/dq_channel_storage.h> +#include "ydb/library/yql/dq/common/dq_common.h" + +#include <ydb/library/actors/core/actor.h> + +namespace NYql::NDq { + +class IDqChannelStorageActor : public IDqChannelStorage +{ +public: + virtual void Terminate() = 0; + + virtual NActors::IActor* GetActor() = 0; +}; + +IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem); + +} // namespace NYql::NDq
\ No newline at end of file diff --git a/ydb/library/yql/dq/actors/spilling/ya.make b/ydb/library/yql/dq/actors/spilling/ya.make index 3c6c7f76616..3795f8bfc0d 100644 --- a/ydb/library/yql/dq/actors/spilling/ya.make +++ b/ydb/library/yql/dq/actors/spilling/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + channel_storage_actor.cpp channel_storage.cpp spilling_counters.cpp spilling_file.cpp |