aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAidar Samerkhanov <aidarsamer@ydb.tech>2023-12-22 17:15:26 +0300
committerGitHub <noreply@github.com>2023-12-22 17:15:26 +0300
commitad5da4d2afd66b568635ae1e634658d938252451 (patch)
tree56aee54a730e33f6189aaa539a2b3956d818d771
parent653229b09d884289a3837561d7da3351a3bb6e8f (diff)
downloadydb-ad5da4d2afd66b568635ae1e634658d938252451.tar.gz
YQL-17087: Add channel spilling to dq pipe communication (#612)
* Add channel spilling to dq pipe communication * Add EvPoison handler to DqChannelStorageActor
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h2
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.cpp17
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.h4
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp277
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage_actor.h22
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h4
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp124
9 files changed, 362 insertions, 96 deletions
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 2dc360b98a..9b3871ca82 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -1111,7 +1111,7 @@ public:
return {};
}
- NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */) const override {
+ NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
return {};
}
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
index 82ccd35b58..9ce940a36a 100644
--- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
@@ -14,12 +14,12 @@ TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool wi
}
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
- return CreateChannelStorage(channelId, nullptr);
+ return CreateChannelStorage(channelId, nullptr, false);
}
-IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const {
+IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
if (WithSpilling_) {
- return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem);
+ return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent);
} else {
return nullptr;
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
index 1c62720187..0dedb3434b 100644
--- a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
@@ -12,7 +12,7 @@ public:
TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
- IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const override;
+ IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;
private:
const TTxId TxId_;
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
index 4b18d37663..386813d2a2 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
@@ -22,13 +22,17 @@ namespace {
class TDqChannelStorage : public IDqChannelStorage {
public:
- TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) {
- SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
- TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
+ TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem, bool isConcurrent) {
+ if (isConcurrent) {
+ SelfActor_ = CreateConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
+ } else {
+ SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
+ }
+ SelfActorId_ = TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
}
~TDqChannelStorage() {
- SelfActor_->Terminate();
+ TlsActivationContext->AsActorContext().Send(SelfActorId_, new TEvents::TEvPoison);
}
bool IsEmpty() const override {
@@ -49,13 +53,14 @@ public:
private:
IDqChannelStorageActor* SelfActor_;
+ TActorId SelfActorId_;
};
} // anonymous namespace
-IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem)
+IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, TActorSystem* actorSystem, bool isConcurrent)
{
- return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem);
+ return new TDqChannelStorage(txId, channelId, std::move(wakeUp), actorSystem, isConcurrent);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h
index 481afbcaa4..71d3c6ea1f 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage.h
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h
@@ -10,7 +10,7 @@ namespace NActors {
namespace NYql::NDq {
-NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
- NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem);
+IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
+ IDqChannelStorage::TWakeUpCallback wakeUpCb, NActors::TActorSystem* actorSystem, bool isConcurrent);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
index e98719734f..d64a490661 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
@@ -34,33 +34,140 @@ namespace {
constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;
-class TDqChannelStorageActor : public IDqChannelStorageActor,
- public NActors::TActorBootstrapped<TDqChannelStorageActor>
+class TDqChannelStorageActorBase : public IDqChannelStorageActor
{
- using TBase = TActorBootstrapped<TDqChannelStorageActor>;
public:
- TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem)
+ ~TDqChannelStorageActorBase() = default;
+
+ TDqChannelStorageActorBase(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
: TxId_(txId)
, ChannelId_(channelId)
, WakeUp_(std::move(wakeUp))
, ActorSystem_(actorSystem)
{}
+ bool IsEmpty() override {
+ return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty();
+ }
+
+ bool IsFull() override {
+ return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE;
+ }
+
+ [[nodiscard]]
+ const TMaybe<TString>& GetError() const {
+ return Error_;
+ }
+
+protected:
+
+ void PutInternal(ui64 blobId, TRope&& blob, TActorIdentity selfActorId, ui64 cookie) {
+ FailOnError();
+
+ // TODO: timeout
+ // TODO: limit inflight events
+
+ ui64 size = blob.size();
+
+ SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), selfActorId, cookie);
+
+ WritingBlobs_.emplace(blobId, size);
+ WritingBlobsSize_ += size;
+ }
+
+ bool GetInternal(ui64 blobId, TBuffer& blob, TActorIdentity selfActorId, ui64 cookie) {
+ FailOnError();
+
+ auto loadedIt = LoadedBlobs_.find(blobId);
+ if (loadedIt != LoadedBlobs_.end()) {
+ YQL_ENSURE(loadedIt->second.size() != 0);
+ blob.Swap(loadedIt->second);
+ LoadedBlobs_.erase(loadedIt);
+ return true;
+ }
+
+ auto result = LoadingBlobs_.emplace(blobId);
+ if (result.second) {
+ SendEvent(new TEvDqSpilling::TEvRead(blobId, true), selfActorId, cookie);
+ }
+
+ return false;
+ }
+
+ void FailOnError() {
+ if (Error_) {
+ LOG_E("Error: " << *Error_);
+ ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_
+ << ", error: " << *Error_;
+ }
+ }
+
+ template<typename T> void SendEvent(T* event, TActorIdentity selfActorId, ui64 cookie) {
+ if (ActorSystem_) {
+ ActorSystem_->Send(
+ new IEventHandle(
+ SpillingActorId_,
+ selfActorId,
+ event,
+ /*flags=*/0,
+ cookie
+ )
+ );
+ } else {
+ selfActorId.Send(SpillingActorId_, event);
+ }
+ }
+
+protected:
+ const TTxId TxId_;
+ const ui64 ChannelId_;
+ IDqChannelStorage::TWakeUpCallback WakeUp_;
+ TActorId SpillingActorId_;
+
+ TMap<ui64, ui64> WritingBlobs_; // blobId -> blobSize
+ ui64 WritingBlobsSize_ = 0;
+
+ ui32 StoredBlobsCount_ = 0;
+ ui64 StoredBlobsSize_ = 0;
+
+ TSet<ui64> LoadingBlobs_;
+ TMap<ui64, TBuffer> LoadedBlobs_;
+
+ TMaybe<TString> Error_;
+
+ TActorSystem* ActorSystem_;
+};
+
+class TDqChannelStorageActor : public TDqChannelStorageActorBase,
+ public NActors::TActorBootstrapped<TDqChannelStorageActor>
+{
+ using TBase = TActorBootstrapped<TDqChannelStorageActor>;
+public:
+ TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem)
+ : TDqChannelStorageActorBase(txId, channelId, std::move(wakeUp), actorSystem)
+ {}
void Bootstrap() {
auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_,
SelfId(), true);
SpillingActorId_ = Register(spillingActor);
-
Become(&TDqChannelStorageActor::WorkState);
}
static constexpr char ActorName[] = "DQ_CHANNEL_STORAGE";
- NActors::IActor* GetActor() override {
+ IActor* GetActor() override {
return this;
}
+ void Put(ui64 blobId, TRope&& blob, ui64 cookie) override {
+ TDqChannelStorageActorBase::PutInternal(blobId, std::move(blob), SelfId(), cookie);
+ }
+
+ bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) override {
+ return TDqChannelStorageActorBase::GetInternal(blobId, blob, SelfId(), cookie);
+ }
+
protected:
void PassAway() override {
Send(SpillingActorId_, new TEvents::TEvPoison);
@@ -73,6 +180,7 @@ private:
hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
hFunc(TEvDqSpilling::TEvError, HandleWork);
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
default:
Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(),
@@ -125,102 +233,121 @@ private:
Error_.ConstructInPlace(msg.Message);
}
+};
+class TConcurrentDqChannelStorageActor : public TDqChannelStorageActorBase,
+ public NActors::TActorBootstrapped<TConcurrentDqChannelStorageActor>
+{
+ using TBase = TActorBootstrapped<TConcurrentDqChannelStorageActor>;
public:
- [[nodiscard]]
- const TMaybe<TString>& GetError() const {
- return Error_;
- }
+ TConcurrentDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem)
+ : TDqChannelStorageActorBase(txId, channelId, std::move(wakeUp), actorSystem)
+ {}
- bool IsEmpty() const override {
- return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty();
- }
- bool IsFull() const override {
- return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE;
+ void Bootstrap() {
+ auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_,
+ SelfId(), true);
+ SpillingActorId_ = Register(spillingActor);
+ Become(&TConcurrentDqChannelStorageActor::WorkState);
}
- void Put(ui64 blobId, TRope&& blob, ui64 cookie) override {
- FailOnError();
+ static constexpr char ActorName[] = "DQ_CONCURRENT_CHANNEL_STORAGE";
- // TODO: timeout
- // TODO: limit inflight events
+ IActor* GetActor() override {
+ return this;
+ }
- ui64 size = blob.size();
+ bool IsEmpty() override {
+ std::lock_guard lock(Mutex_);
+ return TDqChannelStorageActorBase::IsEmpty();
+ }
- SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie);
+ bool IsFull() override {
+ std::lock_guard lock(Mutex_);
+ return TDqChannelStorageActorBase::IsFull();
+ }
- WritingBlobs_.emplace(blobId, size);
- WritingBlobsSize_ += size;
+ void Put(ui64 blobId, TRope&& blob, ui64 cookie) override {
+ std::lock_guard lock(Mutex_);
+ TDqChannelStorageActorBase::PutInternal(blobId, std::move(blob), SelfId(), cookie);
}
bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) override {
- FailOnError();
-
- auto loadedIt = LoadedBlobs_.find(blobId);
- if (loadedIt != LoadedBlobs_.end()) {
- YQL_ENSURE(loadedIt->second.size() != 0);
- blob.Swap(loadedIt->second);
- LoadedBlobs_.erase(loadedIt);
- return true;
- }
-
- auto result = LoadingBlobs_.emplace(blobId);
- if (result.second) {
- SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie);
- }
-
- return false;
+ std::lock_guard lock(Mutex_);
+ return TDqChannelStorageActorBase::GetInternal(blobId, blob, SelfId(), cookie);
}
- void Terminate() override {
- PassAway();
+protected:
+ void PassAway() override {
+ Send(SpillingActorId_, new TEvents::TEvPoison);
+ TBase::PassAway();
}
private:
- void FailOnError() {
- if (Error_) {
- LOG_E("Error: " << *Error_);
- ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_
- << ", error: " << *Error_;
+ STATEFN(WorkState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
+ hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
+ hFunc(TEvDqSpilling::TEvError, HandleWork);
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ default:
+ Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
+ ev->GetTypeRewrite(),
+ ev->ToString().data());
}
}
- template<typename T>
- void SendEvent(T* event, ui64 cookie) {
- if (ActorSystem_) {
- ActorSystem_->Send(
- new IEventHandle(
- SpillingActorId_,
- SelfId(),
- event,
- /*flags=*/0,
- cookie
- )
- );
- } else {
- Send(SpillingActorId_, event);
+ void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
+ auto& msg = *ev->Get();
+ LOG_T("[TEvWriteResult] blobId: " << msg.BlobId);
+
+ std::lock_guard lock(Mutex_);
+ auto it = WritingBlobs_.find(msg.BlobId);
+ if (it == WritingBlobs_.end()) {
+ LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId);
+
+ Error_ = "Internal error";
+
+ Send(SpillingActorId_, new TEvents::TEvPoison);
+ return;
}
+
+ ui64 size = it->second;
+ WritingBlobsSize_ -= size;
+ WritingBlobs_.erase(it);
+
+ StoredBlobsCount_++;
+ StoredBlobsSize_ += size;
}
-private:
- const TTxId TxId_;
- const ui64 ChannelId_;
- IDqChannelStorage::TWakeUpCallback WakeUp_;
- TActorId SpillingActorId_;
+ void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
+ auto& msg = *ev->Get();
+ LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
- TMap<ui64, ui64> WritingBlobs_; // blobId -> blobSize
- ui64 WritingBlobsSize_ = 0;
+ std::lock_guard lock(Mutex_);
+ if (LoadingBlobs_.erase(msg.BlobId) != 1) {
+ LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
+ return;
+ }
- ui32 StoredBlobsCount_ = 0;
- ui64 StoredBlobsSize_ = 0;
+ LoadedBlobs_[msg.BlobId].Swap(msg.Blob);
+ YQL_ENSURE(LoadedBlobs_[msg.BlobId].size() != 0);
- TSet<ui64> LoadingBlobs_;
- TMap<ui64, TBuffer> LoadedBlobs_;
+ if (LoadedBlobs_.size() == 1) {
+ WakeUp_();
+ }
+ }
- TMaybe<TString> Error_;
+ void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
+ auto& msg = *ev->Get();
+ LOG_D("[TEvError] " << msg.Message);
- TActorSystem* ActorSystem_;
+ Error_.ConstructInPlace(msg.Message);
+ }
+
+private:
+ std::mutex Mutex_;
};
} // anonymous namespace
@@ -229,4 +356,8 @@ IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId,
return new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
}
+IDqChannelStorageActor* CreateConcurrentDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem) {
+ return new TConcurrentDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
+}
+
} // namespace NYql::NDq \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h
index f7c10d13c4..85ed7681af 100644
--- a/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage_actor.h
@@ -5,14 +5,32 @@
namespace NYql::NDq {
-class IDqChannelStorageActor : public IDqChannelStorage
+class IDqChannelStorageActor
{
public:
- virtual void Terminate() = 0;
+ using TPtr = TIntrusivePtr<IDqChannelStorageActor>;
+ using TWakeUpCallback = std::function<void()>;
+
+ virtual ~IDqChannelStorageActor() = default;
virtual NActors::IActor* GetActor() = 0;
+
+ virtual bool IsEmpty() = 0;
+ virtual bool IsFull() = 0;
+
+ // methods Put/Get can throw `TDqChannelStorageException`
+
+ // Data should be owned by `blob` argument since the Put() call is actually asynchronous
+ virtual void Put(ui64 blobId, TRope&& blob, ui64 cookie = 0) = 0;
+
+ // TODO: there is no way for client to delete blob.
+ // It is better to replace Get() with Pull() which will delete blob after read
+ // (current clients read each blob exactly once)
+ // Get() will return false if data is not ready yet. Client should repeat Get() in this case
+ virtual bool Get(ui64 blobId, TBuffer& data, ui64 cookie = 0) = 0;
};
IDqChannelStorageActor* CreateDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem);
+IDqChannelStorageActor* CreateConcurrentDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, NActors::TActorSystem* actorSystem);
} // namespace NYql::NDq \ No newline at end of file
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index ae16806487..b50f1ba904 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -137,7 +137,7 @@ public:
TVector<IDqOutput::TPtr>&& outputs) const = 0;
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0;
- virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem) const = 0;
+ virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const = 0;
};
class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext {
@@ -155,7 +155,7 @@ public:
return {};
};
- IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/) const override {
+ IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/, NActors::TActorSystem* /*actorSystem*/, bool /*isConcurrent*/) const override {
return {};
};
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index d793bdfb40..8dd1b059b1 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -13,6 +13,8 @@
#include <ydb/library/actors/core/hfunc.h>
+#include <util/generic/size_literals.h>
+
using namespace NYql::NDqs;
using namespace NActors;
@@ -33,6 +35,23 @@ TTaskRunnerActorSensors GetSensors(const T& t) {
return result;
}
+class TSpillingStorageInfo : public TSimpleRefCount<TSpillingStorageInfo> {
+public:
+ using TPtr = std::shared_ptr<TSpillingStorageInfo>;
+
+ TSpillingStorageInfo(const IDqChannelStorage::TPtr spillingStorage, ui64 channelId)
+ : SpillingStorage(spillingStorage)
+ , ChannelId(channelId)
+ , FirstStoredId(0)
+ , NextStoredId(0)
+ {}
+
+ const IDqChannelStorage::TPtr SpillingStorage = nullptr;
+ ui64 ChannelId = 0;
+ ui64 FirstStoredId = 0;
+ ui64 NextStoredId = 0;
+};
+
struct TOutputChannelReadResult {
bool IsChanged = false;
bool IsFinished = false;
@@ -43,13 +62,26 @@ struct TOutputChannelReadResult {
class TOutputChannelReader {
public:
- TOutputChannelReader(NTaskRunnerProxy::IOutputChannel::TPtr channel, i64 toPopSize, bool wasFinished)
+ TOutputChannelReader(NTaskRunnerProxy::IOutputChannel::TPtr channel, i64 toPopSize,
+ bool wasFinished, TSpillingStorageInfo::TPtr spillingStorageInfo, ui64 cookie
+ )
: Channel(channel)
+ , SpillingStorageInfo(spillingStorageInfo)
, ToPopSize(toPopSize)
, WasFinished(wasFinished)
+ , Cookie(cookie)
{}
TOutputChannelReadResult Read() {
+ if (SpillingStorageInfo) {
+ return ReadWithSpilling();
+ }
+ return ReadDirectly();
+ }
+
+private:
+
+ TOutputChannelReadResult ReadDirectly() {
int maxChunks = std::numeric_limits<int>::max();
bool changed = false;
bool isFinished = false;
@@ -60,7 +92,7 @@ public:
if (remain == 0) {
// special case to WorkerActor
- remain = 5<<20;
+ remain = 5_MB;
maxChunks = 1;
}
@@ -70,7 +102,6 @@ public:
const auto lastPop = std::move(Channel->Pop(data));
for (auto& metric : lastPop.GetMetric()) {
- // *response.AddMetric() = metric;
result.Metrics.push_back(metric);
}
@@ -89,10 +120,67 @@ public:
return result;
}
-private:
+ TOutputChannelReadResult ReadWithSpilling() {
+ int maxChunks = std::numeric_limits<int>::max();
+ bool changed = false;
+ bool isChanFinished = false;
+ i64 remain = ToPopSize;
+ bool hasData = true;
+ TOutputChannelReadResult result;
+
+ if (remain == 0) {
+ // special case to WorkerActor
+ remain = 5_MB;
+ maxChunks = 1;
+ }
+
+ auto spillingStorage = SpillingStorageInfo->SpillingStorage;
+ // Read all available data from the pipe and spill it
+ while (spillingStorage && !isChanFinished && hasData) {
+ TDqSerializedBatch data;
+ const auto lastPop = std::move(Channel->Pop(data));
+
+ for (auto& metric : lastPop.GetMetric()) {
+ result.Metrics.push_back(metric);
+ }
+
+ hasData = lastPop.GetResult();
+ isChanFinished = !hasData && Channel->IsFinished();
+ changed = changed || hasData || (isChanFinished != WasFinished);
+ if (hasData) {
+ spillingStorage->Put(SpillingStorageInfo->NextStoredId++, SaveForSpilling(std::move(data)), Cookie);
+ }
+ }
+
+ changed = false;
+ result.DataChunks.reserve(SpillingStorageInfo->NextStoredId - SpillingStorageInfo->FirstStoredId);
+ while (SpillingStorageInfo->FirstStoredId < SpillingStorageInfo->NextStoredId && remain > 0) {
+ TDqSerializedBatch data;
+ YQL_ENSURE(spillingStorage);
+ TBuffer blob;
+ if (!spillingStorage->Get(SpillingStorageInfo->FirstStoredId, blob, Cookie)) {
+ break;
+ }
+ ++SpillingStorageInfo->FirstStoredId;
+ data = LoadSpilled(std::move(blob));
+ remain -= data.Size();
+ result.DataChunks.emplace_back(std::move(data));
+ --maxChunks;
+ changed = true;
+ hasData = true;
+ }
+
+ result.IsFinished = isChanFinished && SpillingStorageInfo->FirstStoredId == SpillingStorageInfo->NextStoredId;
+ result.IsChanged = changed;
+ result.HasData = hasData;
+ return result;
+ }
+
NTaskRunnerProxy::IOutputChannel::TPtr Channel;
+ TSpillingStorageInfo::TPtr SpillingStorageInfo;
i64 ToPopSize;
bool WasFinished;
+ ui64 Cookie;
};
} // namespace
@@ -369,11 +457,15 @@ private:
auto cookie = ev->Cookie;
auto wasFinished = ev->Get()->WasFinished;
auto toPop = ev->Get()->Size;
- Invoker->Invoke([cookie,selfId,channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId]() {
+ ui64 channelId = ev->Get()->ChannelId;
+
+ TSpillingStorageInfo::TPtr spillingStorageInfo = GetSpillingStorage(channelId, actorSystem);
+
+ Invoker->Invoke([spillingStorageInfo, cookie, selfId, channelId=ev->Get()->ChannelId, actorSystem, replyTo, wasFinished, toPop, taskRunner=TaskRunner, settings=Settings, stageId=StageId]() {
try {
// auto guard = taskRunner->BindAllocator(); // only for local mode
auto channel = taskRunner->GetOutputChannel(channelId);
- TOutputChannelReader reader(channel, toPop, wasFinished);
+ TOutputChannelReader reader(channel, toPop, wasFinished, spillingStorageInfo, cookie);
TOutputChannelReadResult result = reader.Read();
NDqProto::TPopResponse response;
@@ -484,6 +576,7 @@ private:
auto taskId = ev->Get()->Task.GetId();
auto& inputs = ev->Get()->Task.GetInputs();
auto startTime = TInstant::Now();
+ ExecCtx = ev->Get()->ExecCtx;
for (auto inputId = 0; inputId < inputs.size(); inputId++) {
auto& input = inputs[inputId];
@@ -615,6 +708,22 @@ private:
});
}
+ TSpillingStorageInfo::TPtr GetSpillingStorage(ui64 channelId, TActorSystem* actorSystem) {
+ TSpillingStorageInfo::TPtr spillingStorageInfo = nullptr;
+ auto channelStorage = ExecCtx->CreateChannelStorage(channelId, actorSystem, true /*isConcurrent*/);
+
+ if (channelStorage) {
+ auto spillingIt = SpillingStoragesInfos.find(channelId);
+ if (spillingIt == SpillingStoragesInfos.end()) {
+ TSpillingStorageInfo* info = new TSpillingStorageInfo(channelStorage, channelId);
+ spillingIt = SpillingStoragesInfos.emplace(channelId, info).first;
+ }
+ spillingStorageInfo = spillingIt->second;
+ }
+
+ return spillingStorageInfo;
+ }
+
NActors::TActorId ParentId;
ITaskRunnerActor::ICallbacks* Parent;
const TString TraceId;
@@ -630,6 +739,9 @@ private:
ui64 StageId;
TWorkerRuntimeData* RuntimeData;
TString ClusterName;
+
+ std::shared_ptr<IDqTaskRunnerExecutionContext> ExecCtx;
+ std::unordered_map<ui64, TSpillingStorageInfo::TPtr> SpillingStoragesInfos;
};
class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {