aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <rvu@ydb.tech>2023-10-30 12:00:48 +0300
committerudovichenko-r <rvu@ydb.tech>2023-10-30 12:35:14 +0300
commit733053293601705b5762becc1393afbfb75064e8 (patch)
treef16847f54b2108d36db9bba410e5176e2026b395
parentcb5ce0bf4ff3f14eea1992635480f5f04feee6dd (diff)
downloadydb-733053293601705b5762becc1393afbfb75064e8.tar.gz
[dq] Move spilling to yql/dq/actors
YQL-16013
-rw-r--r--.mapping.json10
-rw-r--r--ydb/core/kqp/common/simple/services.h5
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h28
-rw-r--r--ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kqp/counters/kqp_counters.cpp12
-rw-r--r--ydb/core/kqp/counters/kqp_counters.h12
-rw-r--r--ydb/core/kqp/counters/ya.make2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp2
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp27
-rw-r--r--ydb/core/kqp/proxy_service/ya.make2
-rw-r--r--ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt15
-rw-r--r--ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt15
-rw-r--r--ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt15
-rw-r--r--ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt15
-rw-r--r--ydb/core/kqp/runtime/kqp_channel_storage.cpp246
-rw-r--r--ydb/core/kqp/runtime/kqp_spilling_file.h18
-rw-r--r--ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp441
-rw-r--r--ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/runtime/ut/ya.make1
-rw-r--r--ydb/core/kqp/runtime/ya.make5
-rw-r--r--ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp26
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h25
-rw-r--r--ydb/library/yql/dq/actors/compute/ya.make2
-rw-r--r--ydb/library/yql/dq/actors/dq_events_ids.h10
-rw-r--r--ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt33
-rw-r--r--ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt34
-rw-r--r--ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt34
-rw-r--r--ydb/library/yql/dq/actors/spilling/CMakeLists.txt17
-rw-r--r--ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt33
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.cpp242
-rw-r--r--ydb/library/yql/dq/actors/spilling/channel_storage.h (renamed from ydb/core/kqp/runtime/kqp_channel_storage.h)6
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling.cpp1
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling.h (renamed from ydb/core/kqp/runtime/kqp_spilling.h)21
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling_counters.cpp15
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling_counters.h22
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling_file.cpp (renamed from ydb/core/kqp/runtime/kqp_spilling_file.cpp)342
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling_file.h33
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp456
-rw-r--r--ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt80
-rw-r--r--ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt83
-rw-r--r--ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt85
-rw-r--r--ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt17
-rw-r--r--ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt73
-rw-r--r--ydb/library/yql/dq/actors/spilling/ut/ya.make20
-rw-r--r--ydb/library/yql/dq/actors/spilling/ya.make29
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h2
-rw-r--r--ydb/library/yql/dq/actors/ya.make1
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp10
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h11
-rw-r--r--ydb/library/yql/dq/runtime/ya.make1
-rw-r--r--ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp1
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp20
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.h3
-rw-r--r--ydb/library/yql/providers/dq/actors/ya.make1
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp10
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h7
-rw-r--r--ydb/library/yql/providers/dq/config/config.proto13
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/ya.make1
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp32
-rw-r--r--ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h1
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp10
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp6
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp3
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp3
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h1
-rw-r--r--ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/tools/dq/worker_node/main.cpp15
-rw-r--r--ydb/library/yql/tools/dq/worker_node/ya.make1
-rw-r--r--ydb/library/yql/tools/dqrun/dqrun.cpp2
103 files changed, 1779 insertions, 1045 deletions
diff --git a/.mapping.json b/.mapping.json
index 44b8d55c91..7de9cea3ec 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -7010,6 +7010,16 @@
"ydb/library/yql/dq/actors/protos/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/dq/actors/protos/CMakeLists.txt":"",
"ydb/library/yql/dq/actors/protos/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/CMakeLists.txt":"",
+ "ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt":"",
+ "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/dq/actors/task_runner/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/dq/actors/task_runner/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/dq/actors/task_runner/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h
index 80002d604f..ebee571eb0 100644
--- a/ydb/core/kqp/common/simple/services.h
+++ b/ydb/core/kqp/common/simple/services.h
@@ -31,11 +31,6 @@ inline NActors::TActorId MakeKqpNodeServiceID(ui32 nodeId) {
return NActors::TActorId(nodeId, TStringBuf(name, 12));
}
-inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) {
- const char name[12] = "kqp_lfspill";
- return NActors::TActorId(nodeId, TStringBuf(name, 12));
-}
-
inline NActors::TActorId MakeKqpCompileComputationPatternServiceID(ui32 nodeId) {
const char name[12] = "kqp_comp_cp";
return NActors::TActorId(nodeId, TStringBuf(name, 12));
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
index a555f7c286..db7f86c85d 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
@@ -2,7 +2,7 @@
#include "kqp_compute_actor.h"
-#include <ydb/core/kqp/runtime/kqp_channel_storage.h>
+#include <ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h>
#include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
@@ -12,14 +12,12 @@ namespace NKqp {
using namespace NYql;
using namespace NYql::NDq;
-class TKqpTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext {
+class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
- TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp,
- const TActorContext& ctx)
- : TxId(txId)
- , WakeUp(std::move(wakeUp))
- , Ctx(ctx)
- , WithSpilling(withSpilling) {}
+ TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const TActorContext& ctx)
+ : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp), ctx)
+ {
+ }
IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv,
@@ -28,20 +26,6 @@ public:
{
return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
}
-
- IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override {
- if (WithSpilling) {
- return CreateKqpChannelStorage(TxId, channelId, WakeUp, Ctx);
- } else {
- return nullptr;
- }
- }
-
-private:
- const ui64 TxId;
- const IDqChannelStorage::TWakeUpCallback WakeUp;
- const TActorContext& Ctx;
- const bool WithSpilling;
};
} // namespace NKqp
diff --git a/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt
index 8f457ba4da..b5a08b2bb1 100644
--- a/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt
@@ -14,6 +14,8 @@ target_link_libraries(core-kqp-counters PUBLIC
ydb-core-base
ydb-core-protos
core-sys_view-service
+ dq-actors-spilling
+ library-yql-minikql
)
target_sources(core-kqp-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp
diff --git a/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt
index f0dbbe779d..3a231a5261 100644
--- a/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt
@@ -15,6 +15,8 @@ target_link_libraries(core-kqp-counters PUBLIC
ydb-core-base
ydb-core-protos
core-sys_view-service
+ dq-actors-spilling
+ library-yql-minikql
)
target_sources(core-kqp-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp
diff --git a/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt
index f0dbbe779d..3a231a5261 100644
--- a/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt
@@ -15,6 +15,8 @@ target_link_libraries(core-kqp-counters PUBLIC
ydb-core-base
ydb-core-protos
core-sys_view-service
+ dq-actors-spilling
+ library-yql-minikql
)
target_sources(core-kqp-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp
diff --git a/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt
index 8f457ba4da..b5a08b2bb1 100644
--- a/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt
@@ -14,6 +14,8 @@ target_link_libraries(core-kqp-counters PUBLIC
ydb-core-base
ydb-core-protos
core-sys_view-service
+ dq-actors-spilling
+ library-yql-minikql
)
target_sources(core-kqp-counters PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp
diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp
index 50fedabfa4..d45e4facc1 100644
--- a/ydb/core/kqp/counters/kqp_counters.cpp
+++ b/ydb/core/kqp/counters/kqp_counters.cpp
@@ -724,7 +724,8 @@ void TKqpCounters::UpdateTxCounters(const TKqpTransactionInfo& txInfo,
}
TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, const TActorContext* ctx)
- : AllocCounters(counters, "kqp")
+ : NYql::NDq::TSpillingCounters(counters)
+ , AllocCounters(counters, "kqp")
{
Counters = counters;
KqpGroup = GetServiceCounters(counters, "kqp");
@@ -786,15 +787,6 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
RmMaxSnapshotLatency = KqpGroup->GetCounter("RM/MaxSnapshotLatency", false);
RmNodeNumberInSnapshot = KqpGroup->GetCounter("RM/NodeNumberInSnapshot", false);
- /* Spilling */
- SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true);
- SpillingReadBlobs = KqpGroup->GetCounter("Spilling/ReadBlobs", true);
- SpillingStoredBlobs = KqpGroup->GetCounter("Spilling/StoredBlobs", false);
- SpillingTotalSpaceUsed = KqpGroup->GetCounter("Spilling/TotalSpaceUsed", false);
- SpillingTooBigFileErrors = KqpGroup->GetCounter("Spilling/TooBigFileErrors", true);
- SpillingNoSpaceErrors = KqpGroup->GetCounter("Spilling/NoSpaceErrors", true);
- SpillingIoErrors = KqpGroup->GetCounter("Spilling/IoErrors", true);
-
/* Scan queries */
ScanQueryShardDisconnect = KqpGroup->GetCounter("ScanQuery/ShardDisconnect", true);
ScanQueryShardResolve = KqpGroup->GetCounter("ScanQuery/ShardResolve", true);
diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h
index 8a8e9da788..f629743445 100644
--- a/ydb/core/kqp/counters/kqp_counters.h
+++ b/ydb/core/kqp/counters/kqp_counters.h
@@ -12,6 +12,7 @@
#include <ydb/core/tx/tx_proxy/mon.h>
#include <ydb/library/yql/minikql/aligned_page_pool.h>
+#include <ydb/library/yql/dq/actors/spilling/spilling_counters.h>
#include <util/system/spinlock.h>
@@ -246,7 +247,7 @@ private:
using TKqpDbCountersPtr = TIntrusivePtr<TKqpDbCounters>;
-class TKqpCounters : public TThrRefBase, public TKqpCountersBase {
+class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounters {
private:
struct TTxByKindCounters {
NMonitoring::THistogramPtr TotalDuration;
@@ -378,15 +379,6 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr RmMaxSnapshotLatency;
::NMonitoring::TDynamicCounters::TCounterPtr RmNodeNumberInSnapshot;
- // Spilling counters
- ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs;
- ::NMonitoring::TDynamicCounters::TCounterPtr SpillingReadBlobs;
- ::NMonitoring::TDynamicCounters::TCounterPtr SpillingStoredBlobs;
- ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTotalSpaceUsed;
- ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTooBigFileErrors;
- ::NMonitoring::TDynamicCounters::TCounterPtr SpillingNoSpaceErrors;
- ::NMonitoring::TDynamicCounters::TCounterPtr SpillingIoErrors;
-
// Scan queries counters
::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardDisconnect;
::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardResolve;
diff --git a/ydb/core/kqp/counters/ya.make b/ydb/core/kqp/counters/ya.make
index dd0786c24f..4ce62cb5e1 100644
--- a/ydb/core/kqp/counters/ya.make
+++ b/ydb/core/kqp/counters/ya.make
@@ -10,6 +10,8 @@ PEERDIR(
ydb/core/base
ydb/core/protos
ydb/core/sys_view/service
+ ydb/library/yql/dq/actors/spilling
+ ydb/library/yql/minikql
)
END()
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 04aabbd48c..39f224f736 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -66,7 +66,7 @@ TDqTaskRunnerMemoryLimits CreateTaskRunnerMemoryLimits() {
return memoryLimits;
}
-TDqTaskRunnerExecutionContext CreateTaskRunnerExecutionContext() {
+TDqTaskRunnerExecutionContextDefault CreateTaskRunnerExecutionContext() {
return {};
}
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
index f8582de4c3..449150439b 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt
@@ -32,10 +32,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-tx_proxy
core-tx-scheme_cache
core-tx-schemeshard
+ ydb-core-mon
ydb-library-query_actor
providers-common-http_gateway
providers-common-proto
yql-public-issue
+ dq-actors-spilling
api-protos
public-lib-operation_id
public-lib-scheme_types
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
index 8469df0747..1b2aa5977e 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt
@@ -33,10 +33,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-tx_proxy
core-tx-scheme_cache
core-tx-schemeshard
+ ydb-core-mon
ydb-library-query_actor
providers-common-http_gateway
providers-common-proto
yql-public-issue
+ dq-actors-spilling
api-protos
public-lib-operation_id
public-lib-scheme_types
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
index 8469df0747..1b2aa5977e 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt
@@ -33,10 +33,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-tx_proxy
core-tx-scheme_cache
core-tx-schemeshard
+ ydb-core-mon
ydb-library-query_actor
providers-common-http_gateway
providers-common-proto
yql-public-issue
+ dq-actors-spilling
api-protos
public-lib-operation_id
public-lib-scheme_types
diff --git a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
index f8582de4c3..449150439b 100644
--- a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt
@@ -32,10 +32,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC
core-tx-tx_proxy
core-tx-scheme_cache
core-tx-schemeshard
+ ydb-core-mon
ydb-library-query_actor
providers-common-http_gateway
providers-common-proto
yql-public-issue
+ dq-actors-spilling
api-protos
public-lib-operation_id
public-lib-scheme_types
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index dfac53d9d2..1d55ec7194 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -15,13 +15,14 @@
#include <ydb/core/kqp/compile_service/kqp_compile_service.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
-#include <ydb/core/kqp/runtime/kqp_spilling_file.h>
-#include <ydb/core/kqp/runtime/kqp_spilling.h>
+#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
+#include <ydb/library/yql/dq/actors/spilling/spilling.h>
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
+#include <ydb/core/mon/mon.h>
#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
@@ -211,9 +212,25 @@ public:
WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
- SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpLocalFileSpillingService(cfg, Counters));
+ SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(
+ NYql::NDq::TFileSpillingServiceConfig{
+ .Root = cfg.GetRoot(),
+ .MaxTotalSize = cfg.GetMaxTotalSize(),
+ .MaxFileSize = cfg.GetMaxFileSize(),
+ .MaxFilePartSize = cfg.GetMaxFilePartSize(),
+ .IoThreadPoolWorkersCount = cfg.GetIoThreadPool().GetWorkersCount(),
+ .IoThreadPoolQueueSize = cfg.GetIoThreadPool().GetQueueSize(),
+ .CleanupOnShutdown = false
+ },
+ Counters));
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
- MakeKqpLocalFileSpillingServiceID(SelfId().NodeId()), SpillingService);
+ NYql::NDq::MakeDqLocalFileSpillingServiceID(SelfId().NodeId()), SpillingService);
+
+ if (NActors::TMon* mon = AppData()->Mon) {
+ NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
+ mon->RegisterActorPage(actorsMonPage, "kqp_spilling_file", "KQP Local File Spilling Service", false,
+ TlsActivationContext->ExecutorThread.ActorSystem, SpillingService);
+ }
}
// Create compile service
@@ -1580,7 +1597,7 @@ private:
IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
- const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
+ const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make
index cef6cbd664..e93003bae8 100644
--- a/ydb/core/kqp/proxy_service/ya.make
+++ b/ydb/core/kqp/proxy_service/ya.make
@@ -24,10 +24,12 @@ PEERDIR(
ydb/core/tx/tx_proxy
ydb/core/tx/scheme_cache
ydb/core/tx/schemeshard
+ ydb/core/mon
ydb/library/query_actor
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/common/proto
ydb/library/yql/public/issue
+ ydb/library/yql/dq/actors/spilling
ydb/public/api/protos
ydb/public/lib/operation_id
ydb/public/lib/scheme_types
diff --git a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt
index e8854f07a9..c297a48ab8 100644
--- a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt
@@ -7,12 +7,6 @@
add_subdirectory(ut)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(core-kqp-runtime)
target_compile_options(core-kqp-runtime PRIVATE
@@ -34,13 +28,12 @@ target_link_libraries(core-kqp-runtime PUBLIC
minikql-comp_nodes-llvm
library-yql-utils
dq-actors-protos
+ dq-actors-spilling
yql-dq-common
yql-dq-runtime
cpp-threading-hot_swap
- tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-runtime PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp
@@ -51,15 +44,9 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
-generate_enum_serilization(core-kqp-runtime
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h
- INCLUDE_HEADERS
- ydb/core/kqp/runtime/kqp_spilling.h
-)
diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt
index 8a8cb70240..ba1adf7be3 100644
--- a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt
@@ -7,12 +7,6 @@
add_subdirectory(ut)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(core-kqp-runtime)
target_compile_options(core-kqp-runtime PRIVATE
@@ -35,13 +29,12 @@ target_link_libraries(core-kqp-runtime PUBLIC
minikql-comp_nodes-llvm
library-yql-utils
dq-actors-protos
+ dq-actors-spilling
yql-dq-common
yql-dq-runtime
cpp-threading-hot_swap
- tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-runtime PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp
@@ -52,15 +45,9 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
-generate_enum_serilization(core-kqp-runtime
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h
- INCLUDE_HEADERS
- ydb/core/kqp/runtime/kqp_spilling.h
-)
diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt
index 8a8cb70240..ba1adf7be3 100644
--- a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt
@@ -7,12 +7,6 @@
add_subdirectory(ut)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(core-kqp-runtime)
target_compile_options(core-kqp-runtime PRIVATE
@@ -35,13 +29,12 @@ target_link_libraries(core-kqp-runtime PUBLIC
minikql-comp_nodes-llvm
library-yql-utils
dq-actors-protos
+ dq-actors-spilling
yql-dq-common
yql-dq-runtime
cpp-threading-hot_swap
- tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-runtime PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp
@@ -52,15 +45,9 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
-generate_enum_serilization(core-kqp-runtime
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h
- INCLUDE_HEADERS
- ydb/core/kqp/runtime/kqp_spilling.h
-)
diff --git a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt
index e8854f07a9..c297a48ab8 100644
--- a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt
@@ -7,12 +7,6 @@
add_subdirectory(ut)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
add_library(core-kqp-runtime)
target_compile_options(core-kqp-runtime PRIVATE
@@ -34,13 +28,12 @@ target_link_libraries(core-kqp-runtime PUBLIC
minikql-comp_nodes-llvm
library-yql-utils
dq-actors-protos
+ dq-actors-spilling
yql-dq-common
yql-dq-runtime
cpp-threading-hot_swap
- tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-runtime PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp
@@ -51,15 +44,9 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
-generate_enum_serilization(core-kqp-runtime
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h
- INCLUDE_HEADERS
- ydb/core/kqp/runtime/kqp_spilling.h
-)
diff --git a/ydb/core/kqp/runtime/kqp_channel_storage.cpp b/ydb/core/kqp/runtime/kqp_channel_storage.cpp
deleted file mode 100644
index d0e40d8d95..0000000000
--- a/ydb/core/kqp/runtime/kqp_channel_storage.cpp
+++ /dev/null
@@ -1,246 +0,0 @@
-#include "kqp_channel_storage.h"
-#include "kqp_spilling.h"
-#include "kqp_spilling_file.h"
-
-#include <ydb/library/yql/utils/yql_panic.h>
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/actors/core/log.h>
-
-#include <util/generic/buffer.h>
-#include <util/generic/map.h>
-#include <util/generic/set.h>
-#include <util/generic/size_literals.h>
-
-
-namespace NKikimr::NKqp {
-
-using namespace NActors;
-using namespace NYql;
-using namespace NYql::NDq;
-
-#define LOG(...) do { if (Y_UNLIKELY(LogFunc)) { LogFunc(__VA_ARGS__); } } while (0)
-
-#define LOG_D(s) \
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
-#define LOG_I(s) \
- LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
-#define LOG_E(s) \
- LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
-#define LOG_C(s) \
- LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
-#define LOG_W(s) \
- LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
-#define LOG_T(s) \
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
-
-namespace {
-
-constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
-constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;
-
-class TKqpChannelStorageActor : public TActorBootstrapped<TKqpChannelStorageActor> {
- using TBase = TActorBootstrapped<TKqpChannelStorageActor>;
-
-public:
- TKqpChannelStorageActor(ui64 txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
- : TxId(txId)
- , ChannelId(channelId)
- , WakeUp(std::move(wakeUp)) {}
-
- void Bootstrap() {
- auto spillingActor = CreateKqpLocalFileSpillingActor(TxId, TStringBuilder() << "ChannelId: " << ChannelId,
- SelfId(), true);
- SpillingActorId = Register(spillingActor);
-
- Become(&TKqpChannelStorageActor::WorkState);
- }
-
- static constexpr char ActorName[] = "KQP_CHANNEL_STORAGE";
-
-protected:
- void PassAway() override {
- Send(SpillingActorId, new TEvents::TEvPoison);
- TBase::PassAway();
- }
-
-private:
- STATEFN(WorkState) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqpSpilling::TEvWriteResult, HandleWork);
- hFunc(TEvKqpSpilling::TEvReadResult, HandleWork);
- hFunc(TEvKqpSpilling::TEvError, HandleWork);
- default:
- Y_ABORT("TKqpChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
- ev->GetTypeRewrite(),
- ev->ToString().data());
- }
- }
-
- void HandleWork(TEvKqpSpilling::TEvWriteResult::TPtr& ev) {
- auto& msg = *ev->Get();
- LOG_T("[TEvWriteResult] blobId: " << msg.BlobId);
-
- auto it = WritingBlobs.find(msg.BlobId);
- if (it == WritingBlobs.end()) {
- LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId);
-
- Error = "Internal error";
-
- Send(SpillingActorId, new TEvents::TEvPoison);
- return;
- }
-
- ui64 size = it->second;
- WritingBlobsSize -= size;
- WritingBlobs.erase(it);
-
- StoredBlobsCount++;
- StoredBlobsSize += size;
- }
-
- void HandleWork(TEvKqpSpilling::TEvReadResult::TPtr& ev) {
- auto& msg = *ev->Get();
- LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
-
- if (LoadingBlobs.erase(msg.BlobId) != 1) {
- LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
- return;
- }
-
- LoadedBlobs[msg.BlobId].Swap(msg.Blob);
- YQL_ENSURE(LoadedBlobs[msg.BlobId].size() != 0);
-
- if (LoadedBlobs.size() == 1) {
- WakeUp();
- }
- }
-
- void HandleWork(TEvKqpSpilling::TEvError::TPtr& ev) {
- auto& msg = *ev->Get();
- LOG_D("[TEvError] " << msg.Message);
-
- Error.ConstructInPlace(msg.Message);
- }
-
-public:
- [[nodiscard]]
- const TMaybe<TString>& GetError() const {
- return Error;
- }
-
- bool IsEmpty() const {
- return WritingBlobs.empty() && StoredBlobsCount == 0 && LoadedBlobs.empty();
- }
-
- bool IsFull() const {
- return WritingBlobs.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize > MAX_INFLIGHT_BLOBS_SIZE;
- }
-
- void Put(ui64 blobId, TRope&& blob) {
- FailOnError();
-
- // TODO: timeout
- // TODO: limit inflight events
-
- ui64 size = blob.size();
-
- Send(SpillingActorId, new TEvKqpSpilling::TEvWrite(blobId, std::move(blob)));
-
- WritingBlobs.emplace(blobId, size);
- WritingBlobsSize += size;
- }
-
- bool Get(ui64 blobId, TBuffer& blob) {
- FailOnError();
-
- auto loadedIt = LoadedBlobs.find(blobId);
- if (loadedIt != LoadedBlobs.end()) {
- YQL_ENSURE(loadedIt->second.size() != 0);
- blob.Swap(loadedIt->second);
- LoadedBlobs.erase(loadedIt);
- return true;
- }
-
- auto result = LoadingBlobs.emplace(blobId);
- if (result.second) {
- Send(SpillingActorId, new TEvKqpSpilling::TEvRead(blobId, true));
- }
-
- return false;
- }
-
- void Terminate() {
- PassAway();
- }
-
-private:
- void FailOnError() {
- if (Error) {
- LOG_E("TxId: " << TxId << ", channelId: " << ChannelId << ", error: " << *Error);
- ythrow TDqChannelStorageException() << "TxId: " << TxId << ", channelId: " << ChannelId
- << ", error: " << *Error;
- }
- }
-
-private:
- const ui64 TxId;
- const ui64 ChannelId;
- IDqChannelStorage::TWakeUpCallback WakeUp;
- TActorId SpillingActorId;
-
- TMap<ui64, ui64> WritingBlobs; // blobId -> blobSize
- ui64 WritingBlobsSize = 0;
-
- ui32 StoredBlobsCount = 0;
- ui64 StoredBlobsSize = 0;
-
- TSet<ui64> LoadingBlobs;
- TMap<ui64, TBuffer> LoadedBlobs;
-
- TMaybe<TString> Error;
-};
-
-
-class TKqpChannelStorage : public IDqChannelStorage {
-public:
- TKqpChannelStorage(ui64 txId, ui64 channelId, TWakeUpCallback&& wakeUp, const TActorContext& ctx)
- {
- SelfActor = new TKqpChannelStorageActor(txId, channelId, std::move(wakeUp));
- ctx.RegisterWithSameMailbox(SelfActor);
- }
-
- ~TKqpChannelStorage() {
- SelfActor->Terminate();
- }
-
- bool IsEmpty() const override {
- return SelfActor->IsEmpty();
- }
-
- bool IsFull() const override {
- return SelfActor->IsFull();
- }
-
- void Put(ui64 blobId, TRope&& blob) override {
- SelfActor->Put(blobId, std::move(blob));
- }
-
- bool Get(ui64 blobId, TBuffer& blob) override {
- return SelfActor->Get(blobId, blob);
- }
-
-private:
- TKqpChannelStorageActor* SelfActor;
-};
-
-} // anonymous namespace
-
-IDqChannelStorage::TPtr CreateKqpChannelStorage(ui64 txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp,
- const TActorContext& ctx)
-{
- return new TKqpChannelStorage(txId, channelId, std::move(wakeUp), ctx);
-}
-
-} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/runtime/kqp_spilling_file.h b/ydb/core/kqp/runtime/kqp_spilling_file.h
deleted file mode 100644
index 14196a8498..0000000000
--- a/ydb/core/kqp/runtime/kqp_spilling_file.h
+++ /dev/null
@@ -1,18 +0,0 @@
-#pragma once
-
-#include <ydb/core/kqp/counters/kqp_counters.h>
-#include <ydb/core/protos/config.pb.h>
-
-#include <library/cpp/actors/core/actor.h>
-
-
-namespace NKikimr::NKqp {
-
-NActors::IActor* CreateKqpLocalFileSpillingActor(ui64 txId, const TString& details, const NActors::TActorId& client,
- bool removeBlobsAfterRead);
-
-NActors::IActor* CreateKqpLocalFileSpillingService(
- const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig& config,
- TIntrusivePtr<TKqpCounters> counters);
-
-} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp b/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
deleted file mode 100644
index 5211e30104..0000000000
--- a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
+++ /dev/null
@@ -1,441 +0,0 @@
-#include "kqp_spilling_file.h"
-#include "kqp_spilling.h"
-
-#include <ydb/core/kqp/common/kqp.h>
-#include <ydb/core/testlib/basics/appdata.h>
-#include <ydb/core/testlib/basics/runtime.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/system/fs.h>
-
-namespace NKikimr::NKqp {
-
-using namespace NActors;
-using namespace NKikimr;
-using namespace NKikimr::NMiniKQL;
-using namespace NKikimr::NKqp;
-using namespace NYql;
-
-namespace {
-
-TString GetSpillingPrefix() {
- static TString str = Sprintf("%s_%d/", "kqp_spilling", (int)getpid());
- return str;
-}
-
-TBuffer CreateBlob(ui32 size, char symbol) {
- TBuffer blob(size);
- blob.Fill(symbol, size);
- return blob;
-}
-
-TRope CreateRope(ui32 size, char symbol, ui32 chunkSize = 7) {
- TRope result;
- while (size) {
- size_t count = std::min(size, chunkSize);
- TString str(count, symbol);
- result.Insert(result.End(), TRope{str});
- size -= count;
- }
- return result;
-}
-
-void AssertEquals(const TBuffer& lhs, const TBuffer& rhs) {
- TStringBuf l{lhs.data(), lhs.size()};
- TStringBuf r{rhs.data(), rhs.size()};
- UNIT_ASSERT_STRINGS_EQUAL(l, r);
-}
-
-TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters() {
- static auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
- return counters;
-}
-
-TActorId StartSpillingService(TTestBasicRuntime& runtime, ui64 maxTotalSize = 1000, ui64 maxFileSize = 500,
- ui64 maxFilePartSize = 100, const TString& root = "./" + GetSpillingPrefix())
-{
- Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
-
- NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig config;
- config.SetEnable(true);
- config.SetRoot(root);
- config.SetMaxTotalSize(maxTotalSize);
- config.SetMaxFileSize(maxFileSize);
- config.SetMaxFilePartSize(maxFilePartSize);
-
- auto counters = Counters();
- counters->ResetCounters();
- auto kqpCounters = MakeIntrusive<TKqpCounters>(counters);
-
- auto* spillingService = CreateKqpLocalFileSpillingService(config, kqpCounters);
- auto spillingServiceActorId = runtime.Register(spillingService);
- runtime.EnableScheduleForActor(spillingServiceActorId);
- runtime.RegisterService(MakeKqpLocalFileSpillingServiceID(runtime.GetNodeId()), spillingServiceActorId);
-
- return spillingServiceActorId;
-}
-
-TActorId StartSpillingActor(TTestBasicRuntime& runtime, const TActorId& client, bool removeBlobsAfterRead = true) {
- auto *spillingActor = CreateKqpLocalFileSpillingActor(1, "test", client, removeBlobsAfterRead);
- auto spillingActorId = runtime.Register(spillingActor);
- runtime.EnableScheduleForActor(spillingActorId);
-
- return spillingActorId;
-}
-
-void WaitBootstrap(TTestBasicRuntime& runtime) {
- TDispatchOptions options;
- options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
- UNIT_ASSERT(runtime.DispatchEvents(options));
-}
-
-void SetupLogs(TTestBasicRuntime& runtime) {
- runtime.SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_ERROR);
-}
-
-} // anonymous namespace
-
-Y_UNIT_TEST_SUITE(KqpSpillingFileTests) {
-
-Y_UNIT_TEST(Simple) {
- TTestBasicRuntime runtime{1, false};
- runtime.Initialize(TAppPrepare().Unwrap());
- SetupLogs(runtime);
-
- auto spillingService = StartSpillingService(runtime);
- auto tester = runtime.AllocateEdgeActor();
- auto spillingActor = StartSpillingActor(runtime, tester);
-
- WaitBootstrap(runtime);
-
- // put blob 1
- {
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1));
- UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
- }
-
- // put blob 2
- {
- auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(11, 'z'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1));
- UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId);
- }
-
- // get blob 1
- {
- auto ev = new TEvKqpSpilling::TEvRead(1);
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester, TDuration::Seconds(1));
- UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
-
- TBuffer expected = CreateBlob(10, 'a');
- AssertEquals(expected, resp->Get()->Blob);
- }
-
- // get blob 2
- {
- auto ev = new TEvKqpSpilling::TEvRead(2);
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester, TDuration::Seconds(1));
- UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId);
-
- TBuffer expected = CreateBlob(11, 'z');
- AssertEquals(expected, resp->Get()->Blob);
- }
-
- // terminate
- {
- runtime.Send(new IEventHandle(spillingActor, tester, new TEvents::TEvPoison));
-
- std::atomic<bool> done = false;
- runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
- if (event->GetRecipientRewrite() == spillingService) {
- if (event->GetTypeRewrite() == 2146435074 /* EvCloseFileResponse */ ) {
- done = true;
- }
- }
- return TTestActorRuntimeBase::EEventAction::PROCESS;
- });
-
- TDispatchOptions options;
- options.CustomFinalCondition = [&]() {
- return (bool) done;
- };
-
- runtime.DispatchEvents(options, TDuration::Seconds(1));
- }
-}
-
-Y_UNIT_TEST(Write_TotalSizeLimitExceeded) {
- TTestBasicRuntime runtime{1, false};
- runtime.Initialize(TAppPrepare().Unwrap());
- SetupLogs(runtime);
-
- StartSpillingService(runtime, 100, 1000, 1000);
- auto tester = runtime.AllocateEdgeActor();
- auto spillingActor = StartSpillingActor(runtime, tester);
-
- WaitBootstrap(runtime);
-
- {
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
- UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
- }
-
- {
- auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester);
- UNIT_ASSERT_STRINGS_EQUAL("Total size limit exceeded", resp->Get()->Message);
- }
-}
-
-Y_UNIT_TEST(Write_FileSizeLimitExceeded) {
- TTestBasicRuntime runtime{1, false};
- runtime.Initialize(TAppPrepare().Unwrap());
- SetupLogs(runtime);
-
- StartSpillingService(runtime, 1000, 100, 1000);
- auto tester = runtime.AllocateEdgeActor();
- auto spillingActor = StartSpillingActor(runtime, tester);
-
- WaitBootstrap(runtime);
-
- {
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
- UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
- }
-
- {
- auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester);
- UNIT_ASSERT_STRINGS_EQUAL("File size limit exceeded", resp->Get()->Message);
- }
-}
-
-Y_UNIT_TEST(MultipleFileParts) {
- TTestBasicRuntime runtime{1, false};
- runtime.Initialize(TAppPrepare().Unwrap());
- SetupLogs(runtime);
-
- StartSpillingService(runtime, 1000, 100, 25);
- auto tester = runtime.AllocateEdgeActor();
- auto spillingActor = StartSpillingActor(runtime, tester);
-
- WaitBootstrap(runtime);
-
- const TString filePrefix = TStringBuilder() << NFs::CurrentWorkingDirectory() << "/" << GetSpillingPrefix() << "node_" << runtime.GetNodeId() << "/1_test_";
-
- for (ui32 i = 0; i < 5; ++i) {
- // Cerr << "---- store blob #" << i << Endl;
- auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
- UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
-
- UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i));
- }
-
- for (i32 i = 4; i >= 0; --i) {
- // Cerr << "---- load blob #" << i << Endl;
- auto ev = new TEvKqpSpilling::TEvRead(i, true);
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester);
- UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
- TBuffer expected = CreateBlob(20, 'a' + i);
- AssertEquals(expected, resp->Get()->Blob);
-
- if (i == 4) {
- // do not remove last file
- UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i));
- } else {
- UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i));
- }
- }
-}
-
-Y_UNIT_TEST(SingleFilePart) {
- TTestBasicRuntime runtime{1, false};
- runtime.Initialize(TAppPrepare().Unwrap());
- SetupLogs(runtime);
-
- StartSpillingService(runtime, 1000, 100, 25);
- auto tester = runtime.AllocateEdgeActor();
- auto spillingActor = StartSpillingActor(runtime, tester, false);
-
- WaitBootstrap(runtime);
-
- const TString filePrefix = TStringBuilder() << NFs::CurrentWorkingDirectory() << "/" << GetSpillingPrefix() << "node_" << runtime.GetNodeId() << "/1_test_";
-
- for (ui32 i = 0; i < 5; ++i) {
- // Cerr << "---- store blob #" << i << Endl;
- auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
- UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
-
- UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0));
- if (i > 0) {
- UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i));
- }
- }
-
- for (i32 i = 4; i >= 0; --i) {
- // Cerr << "---- load blob #" << i << Endl;
- auto ev = new TEvKqpSpilling::TEvRead(i, true);
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester);
- UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
- TBuffer expected = CreateBlob(20, 'a' + i);
- AssertEquals(expected, resp->Get()->Blob);
-
- UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0));
- }
-}
-
-Y_UNIT_TEST(ReadError) {
- return;
-
- TTestBasicRuntime runtime{1, false};
- runtime.Initialize(TAppPrepare().Unwrap());
- SetupLogs(runtime);
-
- StartSpillingService(runtime);
- auto tester = runtime.AllocateEdgeActor();
- auto spillingActor = StartSpillingActor(runtime, tester);
-
- WaitBootstrap(runtime);
-
- {
- auto ev = new TEvKqpSpilling::TEvWrite(0, CreateRope(20, 'a'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester);
- UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId);
- }
-
- ::unlink((NFs::CurrentWorkingDirectory() + GetSpillingPrefix() + "node_1/1_test_0").c_str());
-
- {
- auto ev = new TEvKqpSpilling::TEvRead(0, true);
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester);
- auto& err = resp->Get()->Message;
- auto expected = "can't open \"" + GetSpillingPrefix() + "node_1/1_test_0\" with mode RdOnly";
- UNIT_ASSERT_C(err.Contains("No such file or directory"), err);
- UNIT_ASSERT_C(err.Contains(expected), err);
- }
-}
-
-
-struct THttpRequest : NMonitoring::IHttpRequest {
- HTTP_METHOD Method;
- TCgiParameters CgiParameters;
- THttpHeaders HttpHeaders;
-
- THttpRequest(HTTP_METHOD method)
- : Method(method)
- {}
-
- ~THttpRequest() {}
-
- const char* GetURI() const override {
- return "";
- }
-
- const char* GetPath() const override {
- return "";
- }
-
- const TCgiParameters& GetParams() const override {
- return CgiParameters;
- }
-
- const TCgiParameters& GetPostParams() const override {
- return CgiParameters;
- }
-
- TStringBuf GetPostContent() const override {
- return TString();
- }
-
- HTTP_METHOD GetMethod() const override {
- return Method;
- }
-
- const THttpHeaders& GetHeaders() const override {
- return HttpHeaders;
- }
-
- TString GetRemoteAddr() const override {
- return TString();
- }
-};
-
-Y_UNIT_TEST(StartError) {
- TTestBasicRuntime runtime{1, false};
- runtime.Initialize(TAppPrepare().Unwrap());
- SetupLogs(runtime);
-
- auto spillingService = StartSpillingService(runtime, 100, 500, 100, "/nonexistent/" + GetSpillingPrefix());
- auto tester = runtime.AllocateEdgeActor();
- auto spillingActor = StartSpillingActor(runtime, tester);
-
- WaitBootstrap(runtime);
-
- // put blob 1
- {
- auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a'));
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester, TDuration::Seconds(1));
- UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message);
- }
-
- // get blob 1
- {
- auto ev = new TEvKqpSpilling::TEvRead(1);
- runtime.Send(new IEventHandle(spillingActor, tester, ev));
-
- auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester, TDuration::Seconds(1));
- UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message);
- }
-
- // mon
- {
- THttpRequest httpReq(HTTP_METHOD_GET);
- NMonitoring::TMonService2HttpRequest monReq(nullptr, &httpReq, nullptr, nullptr, "", nullptr);
-
- runtime.Send(new IEventHandle(spillingService, tester, new NMon::TEvHttpInfo(monReq)));
-
- auto resp = runtime.GrabEdgeEvent<NMon::TEvHttpInfoRes>(tester, TDuration::Seconds(1));
- UNIT_ASSERT_EQUAL("<html><h2>Service is not started due to IO error</h2></html>",
- ((NMon::TEvHttpInfoRes*) resp->Get())->Answer);
- }
-}
-
-} // suite
-
-} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt
index 896a8cd357..c4329c5beb 100644
--- a/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt
@@ -32,7 +32,6 @@ target_link_options(ydb-core-kqp-runtime-ut PRIVATE
CoreFoundation
)
target_sources(ydb-core-kqp-runtime-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
)
set_property(
diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt
index e197bb662a..099a1c281f 100644
--- a/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt
@@ -35,7 +35,6 @@ target_link_options(ydb-core-kqp-runtime-ut PRIVATE
-ldl
)
target_sources(ydb-core-kqp-runtime-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
)
set_property(
diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt
index 3c64538479..ebf4ee2a3b 100644
--- a/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt
@@ -36,7 +36,6 @@ target_link_options(ydb-core-kqp-runtime-ut PRIVATE
-ldl
)
target_sources(ydb-core-kqp-runtime-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
)
set_property(
diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt
index 81ecb94af8..5ac0f20fa5 100644
--- a/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt
@@ -25,7 +25,6 @@ target_link_libraries(ydb-core-kqp-runtime-ut PUBLIC
udf-service-exception_policy
)
target_sources(ydb-core-kqp-runtime-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp
)
set_property(
diff --git a/ydb/core/kqp/runtime/ut/ya.make b/ydb/core/kqp/runtime/ut/ya.make
index 9279dbdc98..c2bc69362d 100644
--- a/ydb/core/kqp/runtime/ut/ya.make
+++ b/ydb/core/kqp/runtime/ut/ya.make
@@ -6,7 +6,6 @@ SIZE(MEDIUM)
TIMEOUT(180)
SRCS(
- kqp_spilling_file_ut.cpp
kqp_scan_data_ut.cpp
)
diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make
index 3f41b86b27..9bee3453d3 100644
--- a/ydb/core/kqp/runtime/ya.make
+++ b/ydb/core/kqp/runtime/ya.make
@@ -1,7 +1,6 @@
LIBRARY()
SRCS(
- kqp_channel_storage.cpp
kqp_compute.cpp
kqp_effects.cpp
kqp_output_stream.cpp
@@ -13,7 +12,6 @@ SRCS(
kqp_sequencer_actor.cpp
kqp_sequencer_factory.cpp
kqp_scan_data_meta.cpp
- kqp_spilling_file.cpp
kqp_stream_lookup_actor.cpp
kqp_stream_lookup_actor.h
kqp_stream_lookup_factory.cpp
@@ -38,6 +36,7 @@ PEERDIR(
ydb/library/yql/minikql/comp_nodes/llvm
ydb/library/yql/utils
ydb/library/yql/dq/actors/protos
+ ydb/library/yql/dq/actors/spilling
ydb/library/yql/dq/common
ydb/library/yql/dq/runtime
library/cpp/threading/hot_swap
@@ -45,8 +44,6 @@ PEERDIR(
YQL_LAST_ABI_VERSION()
-GENERATE_ENUM_SERIALIZATION(kqp_spilling.h)
-
END()
RECURSE_FOR_TESTS(
diff --git a/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt
index 1747265bae..9017d23dd4 100644
--- a/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compute)
add_subdirectory(protos)
+add_subdirectory(spilling)
add_subdirectory(task_runner)
get_built_tool_path(
TOOL_grpc_cpp_bin
diff --git a/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt
index af14a84926..efbd60eba8 100644
--- a/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compute)
add_subdirectory(protos)
+add_subdirectory(spilling)
add_subdirectory(task_runner)
get_built_tool_path(
TOOL_grpc_cpp_bin
diff --git a/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt
index af14a84926..efbd60eba8 100644
--- a/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compute)
add_subdirectory(protos)
+add_subdirectory(spilling)
add_subdirectory(task_runner)
get_built_tool_path(
TOOL_grpc_cpp_bin
diff --git a/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt
index 1747265bae..9017d23dd4 100644
--- a/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(compute)
add_subdirectory(protos)
+add_subdirectory(spilling)
add_subdirectory(task_runner)
get_built_tool_path(
TOOL_grpc_cpp_bin
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt
index 7b0917ad05..e9b39cdfbe 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt
@@ -23,11 +23,13 @@ target_link_libraries(dq-actors-compute PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-spilling
minikql-comp_nodes-llvm
yql-public-issue
core-quoter-public
)
target_sources(dq-actors-compute PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt
index 0fed5ab4c0..b6596cf87f 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt
@@ -24,11 +24,13 @@ target_link_libraries(dq-actors-compute PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-spilling
minikql-comp_nodes-llvm
yql-public-issue
core-quoter-public
)
target_sources(dq-actors-compute PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt
index 0fed5ab4c0..b6596cf87f 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt
@@ -24,11 +24,13 @@ target_link_libraries(dq-actors-compute PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-spilling
minikql-comp_nodes-llvm
yql-public-issue
core-quoter-public
)
target_sources(dq-actors-compute PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt
index 7b0917ad05..e9b39cdfbe 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt
@@ -23,11 +23,13 @@ target_link_libraries(dq-actors-compute PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-spilling
minikql-comp_nodes-llvm
yql-public-issue
core-quoter-public
)
target_sources(dq-actors-compute PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 5a24705ba3..c382470556 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -1,5 +1,6 @@
#include "dq_compute_actor.h"
#include "dq_async_compute_actor.h"
+#include "dq_task_runner_exec_ctx.h"
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
@@ -82,8 +83,9 @@ public:
Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);
- // TODO:
- std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext());
+ auto wakeup = [this]{ ContinueExecute(); };
+ std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(
+ TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext());
Send(TaskRunnerActorId,
new NTaskRunnerActor::TEvTaskRunnerCreate(
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index f8d2105aab..c1a8db880c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -1,5 +1,6 @@
#include "dq_compute_actor_impl.h"
#include "dq_compute_actor.h"
+#include "dq_task_runner_exec_ctx.h"
#include <ydb/library/yql/dq/common/dq_common.h>
@@ -55,7 +56,9 @@ public:
auto taskRunner = TaskRunnerFactory(Task, logger);
SetTaskRunner(taskRunner);
- PrepareTaskRunner();
+ auto wakeup = [this]{ ContinueExecute(); };
+ TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext());
+ PrepareTaskRunner(execCtx);
ContinueExecute();
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 8c433ee435..cac8935d01 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1529,7 +1529,7 @@ protected:
TaskRunner = taskRunner;
}
- void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) {
+ void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) {
YQL_ENSURE(TaskRunner);
auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit());
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
new file mode 100644
index 0000000000..08265d3f3e
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
@@ -0,0 +1,26 @@
+#include "dq_task_runner_exec_ctx.h"
+
+#include <ydb/library/yql/dq/actors/spilling/channel_storage.h>
+
+
+namespace NYql {
+namespace NDq {
+
+// Remembers everything CreateChannelStorage() needs later: the transaction id, the wake-up
+// callback, the actor context and the spilling switch.
+// NOTE(review): `ctx` is kept as a reference (Ctx_ is declared as `const TActorContext&`),
+// so the referenced context presumably has to outlive this object - confirm for callers that
+// keep the execution context inside a shared_ptr.
+TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx)
+    : TxId_(txId)
+    , WakeUp_(std::move(wakeUp))
+    , Ctx_(ctx)
+    , WithSpilling_(withSpilling)
+{
+}
+
+// Creates the spilling-backed storage for `channelId`.
+// Yields a null pointer when this context was constructed with spilling turned off.
+IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
+    if (!WithSpilling_) {
+        return nullptr;
+    }
+
+    return CreateDqChannelStorage(TxId_, channelId, WakeUp_, Ctx_);
+}
+
+} // namespace NDq
+} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
new file mode 100644
index 0000000000..e364dda0d2
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
+#include <ydb/library/yql/dq/common/dq_common.h>
+#include <library/cpp/actors/core/actor.h>
+
+
+namespace NYql {
+namespace NDq {
+
+// Task runner execution context used by the DQ compute actors.
+// On top of the base context it can create per-channel storages that spill through
+// CreateDqChannelStorage() when spilling is enabled.
+class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
+public:
+    // txId, wakeUp and ctx are passed on to CreateDqChannelStorage() for every channel;
+    // withSpilling decides whether CreateChannelStorage() creates anything at all.
+    TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx);
+
+    // Returns the storage for channelId, or nullptr when spilling is disabled.
+    IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
+
+private:
+    const TTxId TxId_;
+    const IDqChannelStorage::TWakeUpCallback WakeUp_;
+    // NOTE(review): held by reference, not by value - the actor context passed to the
+    // constructor presumably must stay alive for as long as this object is used; confirm.
+    const NActors::TActorContext& Ctx_;
+    const bool WithSpilling_;
+};
+
+} // namespace NDq
+} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make
index 486920dadf..183fb0eb85 100644
--- a/ydb/library/yql/dq/actors/compute/ya.make
+++ b/ydb/library/yql/dq/actors/compute/ya.make
@@ -1,6 +1,7 @@
LIBRARY()
SRCS(
+ dq_task_runner_exec_ctx.cpp
dq_async_compute_actor.cpp
dq_compute_actor_async_io_factory.cpp
dq_compute_actor_channels.cpp
@@ -24,6 +25,7 @@ PEERDIR(
ydb/library/yql/dq/proto
ydb/library/yql/dq/runtime
ydb/library/yql/dq/tasks
+ ydb/library/yql/dq/actors/spilling
ydb/library/yql/minikql/comp_nodes/llvm
ydb/library/yql/public/issue
ydb/core/quoter/public
diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h
index 11b0225233..d097e6bd34 100644
--- a/ydb/library/yql/dq/actors/dq_events_ids.h
+++ b/ydb/library/yql/dq/actors/dq_events_ids.h
@@ -58,5 +58,15 @@ struct TDqComputeEvents {
static_assert(EvEnd < EventSpaceBegin((TDqEvents::ES_DQ_COMPUTE + 1)));
};
+// Event ids of the spilling protocol. They live in the ES_DQ_COMPUTE event space,
+// starting at offset 100 from its beginning.
+struct TDqSpillingEvents {
+    enum EDqSpillingEvents {
+        EvWrite = EventSpaceBegin(TDqEvents::ES_DQ_COMPUTE) + 100,
+        EvWriteResult,
+        EvRead,
+        EvReadResult,
+        EvError,
+        EvEnd // one past the last spilling event; only used by the range check below
+    };
+
+    // Same guard as TDqComputeEvents has: ids must not run into the next event space.
+    static_assert(EvEnd < EventSpaceBegin((TDqEvents::ES_DQ_COMPUTE + 1)));
+};
+
} // namespace NDq
} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..5c8ffc401e
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(dq-actors-spilling)
+target_compile_options(dq-actors-spilling PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dq-actors-spilling PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-services
+ yql-dq-common
+ yql-dq-actors
+ yql-dq-runtime
+ library-yql-utils
+ cpp-actors-core
+ cpp-actors-util
+ cpp-monlib-dynamic_counters
+ monlib-service-pages
+)
+target_sources(dq-actors-spilling PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp
+)
diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..94acf3d36a
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(dq-actors-spilling)
+target_compile_options(dq-actors-spilling PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dq-actors-spilling PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-services
+ yql-dq-common
+ yql-dq-actors
+ yql-dq-runtime
+ library-yql-utils
+ cpp-actors-core
+ cpp-actors-util
+ cpp-monlib-dynamic_counters
+ monlib-service-pages
+)
+target_sources(dq-actors-spilling PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp
+)
diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..94acf3d36a
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(dq-actors-spilling)
+target_compile_options(dq-actors-spilling PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dq-actors-spilling PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-services
+ yql-dq-common
+ yql-dq-actors
+ yql-dq-runtime
+ library-yql-utils
+ cpp-actors-core
+ cpp-actors-util
+ cpp-monlib-dynamic_counters
+ monlib-service-pages
+)
+target_sources(dq-actors-spilling PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp
+)
diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..5c8ffc401e
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(dq-actors-spilling)
+target_compile_options(dq-actors-spilling PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dq-actors-spilling PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-services
+ yql-dq-common
+ yql-dq-actors
+ yql-dq-runtime
+ library-yql-utils
+ cpp-actors-core
+ cpp-actors-util
+ cpp-monlib-dynamic_counters
+ monlib-service-pages
+)
+target_sources(dq-actors-spilling PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp
+)
diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
new file mode 100644
index 0000000000..4f5a55a23e
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp
@@ -0,0 +1,242 @@
+#include "channel_storage.h"
+#include "spilling.h"
+#include "spilling_file.h"
+
+#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/services/services.pb.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <util/generic/buffer.h>
+#include <util/generic/map.h>
+#include <util/generic/set.h>
+#include <util/generic/size_literals.h>
+
+
+namespace NYql::NDq {
+
+using namespace NActors;
+
+// Logging helpers for this file. Each message is prefixed with the transaction and channel
+// of the object doing the logging, so the macros have to be expanded inside a class that
+// provides TxId_ and ChannelId_ members. All severities use the same member names;
+// LOG_I / LOG_C / LOG_W previously referred to TxId / ChannelId, which do not exist here.
+#define LOG_D(s) \
+    LOG_DEBUG_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
+#define LOG_I(s) \
+    LOG_INFO_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
+#define LOG_E(s) \
+    LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
+#define LOG_C(s) \
+    LOG_CRIT_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
+#define LOG_W(s) \
+    LOG_WARN_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
+#define LOG_T(s) \
+    LOG_TRACE_S(*TlsActivationContext,  NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
+
+namespace {
+
+// Limits checked by TDqChannelStorageActor::IsFull(): the storage reports "full" once the
+// number of blobs sent for writing but not yet acknowledged exceeds the count limit,
+// or their total size exceeds the size limit.
+constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
+constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;
+
+// Actor half of the channel storage. It forwards blobs to a local-file spilling actor and
+// keeps the bookkeeping: blobs being written, blobs acknowledged as stored, blobs requested
+// for reading and blobs already read back. The public accessors further down are called
+// directly (not via events) by TDqChannelStorage.
+class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor> {
+    using TBase = TActorBootstrapped<TDqChannelStorageActor>;
+
+public:
+    TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
+        : TxId_(txId)
+        , ChannelId_(channelId)
+        , WakeUp_(std::move(wakeUp))
+    {
+    }
+
+    // Registers the spilling actor serving this channel and enters the working state.
+    void Bootstrap() {
+        SpillingActorId_ = Register(CreateDqLocalFileSpillingActor(
+            TxId_, TStringBuilder() << "ChannelId: " << ChannelId_, SelfId(), true));
+
+        Become(&TDqChannelStorageActor::WorkState);
+    }
+
+    static constexpr char ActorName[] = "DQ_CHANNEL_STORAGE";
+
+protected:
+    // Stops the spilling actor together with this one.
+    void PassAway() override {
+        Send(SpillingActorId_, new TEvents::TEvPoison);
+        TBase::PassAway();
+    }
+
+private:
+    // Only replies of the spilling actor are expected here; anything else aborts.
+    STATEFN(WorkState) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
+            hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
+            hFunc(TEvDqSpilling::TEvError, HandleWork);
+            default:
+                Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
+                    ev->GetTypeRewrite(),
+                    ev->ToString().data());
+        }
+    }
+
+    // Write acknowledged: move the blob from the "writing" to the "stored" accounting.
+    void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
+        const ui64 blobId = ev->Get()->BlobId;
+        LOG_T("[TEvWriteResult] blobId: " << blobId);
+
+        const auto inflight = WritingBlobs_.find(blobId);
+        if (inflight != WritingBlobs_.end()) {
+            const ui64 blobSize = inflight->second;
+            WritingBlobs_.erase(inflight);
+            WritingBlobsSize_ -= blobSize;
+
+            ++StoredBlobsCount_;
+            StoredBlobsSize_ += blobSize;
+            return;
+        }
+
+        // An acknowledgement for a blob that was never sent: remember the failure
+        // and shut the spilling actor down.
+        LOG_E("Got unexpected TEvWriteResult, blobId: " << blobId);
+        Error_ = "Internal error";
+        Send(SpillingActorId_, new TEvents::TEvPoison);
+    }
+
+    // Read finished: keep the payload until Get() picks it up.
+    void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
+        auto& msg = *ev->Get();
+        LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
+
+        const bool wasRequested = LoadingBlobs_.erase(msg.BlobId) == 1;
+        if (!wasRequested) {
+            LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
+            return;
+        }
+
+        LoadedBlobs_[msg.BlobId].Swap(msg.Blob);
+        YQL_ENSURE(LoadedBlobs_[msg.BlobId].size() != 0);
+
+        // Wake the owner up only when the first loaded blob appears.
+        if (LoadedBlobs_.size() == 1) {
+            WakeUp_();
+        }
+    }
+
+    // Failure reported by the spilling actor: it is stored and raised by the next Put()/Get().
+    void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
+        const TString& message = ev->Get()->Message;
+        LOG_D("[TEvError] " << message);
+
+        Error_.ConstructInPlace(message);
+    }
+
+public:
+    [[nodiscard]]
+    const TMaybe<TString>& GetError() const {
+        return Error_;
+    }
+
+    // True when nothing is being written, nothing was stored and nothing is loaded.
+    // NOTE(review): StoredBlobsCount_ is only ever incremented inside this class, so this
+    // stays false once any write has been acknowledged - confirm that this is intended.
+    bool IsEmpty() const {
+        const bool nothingInflight = WritingBlobs_.empty();
+        const bool nothingStored = StoredBlobsCount_ == 0;
+        const bool nothingLoaded = LoadedBlobs_.empty();
+        return nothingInflight && nothingStored && nothingLoaded;
+    }
+
+    // True when the not-yet-acknowledged writes exceed either inflight limit.
+    bool IsFull() const {
+        const bool tooManyBlobs = WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT;
+        const bool tooManyBytes = WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE;
+        return tooManyBlobs || tooManyBytes;
+    }
+
+    // Sends the blob to the spilling actor and accounts it as being written.
+    // Throws TDqChannelStorageException if an error was recorded earlier.
+    void Put(ui64 blobId, TRope&& blob) {
+        FailOnError();
+
+        // TODO: timeout
+        // TODO: limit inflight events
+
+        const ui64 blobSize = blob.size();
+
+        Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(blobId, std::move(blob)));
+
+        WritingBlobsSize_ += blobSize;
+        WritingBlobs_.emplace(blobId, blobSize);
+    }
+
+    // Hands out the blob when it has already been read back (returns true). Otherwise a
+    // read is requested - once per blob id - and false is returned.
+    // Throws TDqChannelStorageException if an error was recorded earlier.
+    bool Get(ui64 blobId, TBuffer& blob) {
+        FailOnError();
+
+        auto loadedIt = LoadedBlobs_.find(blobId);
+        if (loadedIt == LoadedBlobs_.end()) {
+            if (LoadingBlobs_.emplace(blobId).second) {
+                Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true));
+            }
+            return false;
+        }
+
+        YQL_ENSURE(loadedIt->second.size() != 0);
+        blob.Swap(loadedIt->second);
+        LoadedBlobs_.erase(loadedIt);
+        return true;
+    }
+
+    void Terminate() {
+        PassAway();
+    }
+
+private:
+    // Turns a previously recorded error into an exception; does nothing otherwise.
+    void FailOnError() {
+        if (!Error_) {
+            return;
+        }
+
+        LOG_E("Error: " << *Error_);
+        ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_
+            << ", error: " << *Error_;
+    }
+
+private:
+    const TTxId TxId_;
+    const ui64 ChannelId_;
+    IDqChannelStorage::TWakeUpCallback WakeUp_;
+    TActorId SpillingActorId_;
+
+    TMap<ui64, ui64> WritingBlobs_; // blobId -> blobSize
+    ui64 WritingBlobsSize_ = 0;
+
+    ui32 StoredBlobsCount_ = 0;
+    ui64 StoredBlobsSize_ = 0;
+
+    TSet<ui64> LoadingBlobs_;
+    TMap<ui64, TBuffer> LoadedBlobs_;
+
+    TMaybe<TString> Error_;
+};
+
+
+// IDqChannelStorage implementation that delegates every call to a TDqChannelStorageActor.
+// The actor is created here and registered through ctx.RegisterWithSameMailbox().
+class TDqChannelStorage : public IDqChannelStorage {
+public:
+    TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, const TActorContext& ctx)
+        : SelfActor_(new TDqChannelStorageActor(txId, channelId, std::move(wakeUp)))
+    {
+        ctx.RegisterWithSameMailbox(SelfActor_);
+    }
+
+    // Asks the actor to pass away; the pointer itself is not deleted here.
+    ~TDqChannelStorage() {
+        SelfActor_->Terminate();
+    }
+
+    bool IsEmpty() const override {
+        return SelfActor_->IsEmpty();
+    }
+
+    bool IsFull() const override {
+        return SelfActor_->IsFull();
+    }
+
+    void Put(ui64 blobId, TRope&& blob) override {
+        SelfActor_->Put(blobId, std::move(blob));
+    }
+
+    bool Get(ui64 blobId, TBuffer& blob) override {
+        return SelfActor_->Get(blobId, blob);
+    }
+
+private:
+    // Raw pointer to the registered actor; used for direct method calls.
+    TDqChannelStorageActor* SelfActor_;
+};
+
+} // anonymous namespace
+
+// Factory for the spilling-backed channel storage.
+// Constructs a TDqChannelStorage for the given transaction and channel; `wakeUp` is taken by
+// value and moved into the storage, `ctx` is used by the storage to register its actor.
+IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp,
+    const TActorContext& ctx)
+{
+    return new TDqChannelStorage(txId, channelId, std::move(wakeUp), ctx);
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/core/kqp/runtime/kqp_channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h
index ec149722b8..f0d6905f02 100644
--- a/ydb/core/kqp/runtime/kqp_channel_storage.h
+++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h
@@ -4,9 +4,9 @@
#include <ydb/library/yql/dq/runtime/dq_channel_storage.h>
#include <library/cpp/actors/core/actor.h>
-namespace NKikimr::NKqp {
+namespace NYql::NDq {
-NYql::NDq::IDqChannelStorage::TPtr CreateKqpChannelStorage(ui64 txId, ui64 channelId,
+NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId,
NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, const NActors::TActorContext& ctx);
-} // namespace NKikimr::NKqp
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/spilling.cpp b/ydb/library/yql/dq/actors/spilling/spilling.cpp
new file mode 100644
index 0000000000..9c8ef0f0df
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/spilling.cpp
@@ -0,0 +1 @@
+#include "spilling.h"
diff --git a/ydb/core/kqp/runtime/kqp_spilling.h b/ydb/library/yql/dq/actors/spilling/spilling.h
index 8cc9636946..b9d43de0a7 100644
--- a/ydb/core/kqp/runtime/kqp_spilling.h
+++ b/ydb/library/yql/dq/actors/spilling/spilling.h
@@ -1,16 +1,17 @@
#pragma once
-#include <ydb/core/kqp/common/kqp_event_ids.h>
-#include <ydb/core/protos/config.pb.h>
+#include <ydb/library/yql/dq/actors/dq_events_ids.h>
#include <library/cpp/actors/util/rope.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <util/datetime/base.h>
#include <util/generic/buffer.h>
-namespace NKikimr::NKqp {
+namespace NYql::NDq {
-struct TEvKqpSpilling {
- struct TEvWrite : public TEventLocal<TEvWrite, TKqpSpillingEvents::EvWrite> {
+struct TEvDqSpilling {
+ struct TEvWrite : public NActors::TEventLocal<TEvWrite, TDqSpillingEvents::EvWrite> {
ui64 BlobId;
TRope Blob;
TMaybe<TDuration> Timeout;
@@ -19,14 +20,14 @@ struct TEvKqpSpilling {
: BlobId(blobId), Blob(std::move(blob)), Timeout(timeout) {}
};
- struct TEvWriteResult : public TEventLocal<TEvWriteResult, TKqpSpillingEvents::EvWriteResult> {
+ struct TEvWriteResult : public NActors::TEventLocal<TEvWriteResult, TDqSpillingEvents::EvWriteResult> {
ui64 BlobId;
TEvWriteResult(ui64 blobId)
: BlobId(blobId) {}
};
- struct TEvRead : public TEventLocal<TEvRead, TKqpSpillingEvents::EvRead> {
+ struct TEvRead : public NActors::TEventLocal<TEvRead, TDqSpillingEvents::EvRead> {
ui64 BlobId;
bool RemoveBlob;
TMaybe<TDuration> Timeout;
@@ -35,7 +36,7 @@ struct TEvKqpSpilling {
: BlobId(blobId), RemoveBlob(removeBlob), Timeout(timeout) {}
};
- struct TEvReadResult : public TEventLocal<TEvReadResult, TKqpSpillingEvents::EvReadResult> {
+ struct TEvReadResult : public NActors::TEventLocal<TEvReadResult, TDqSpillingEvents::EvReadResult> {
ui64 BlobId;
TBuffer Blob;
@@ -43,7 +44,7 @@ struct TEvKqpSpilling {
: BlobId(blobId), Blob(std::move(blob)) {}
};
- struct TEvError : public TEventLocal<TEvError, TKqpSpillingEvents::EvError> {
+ struct TEvError : public NActors::TEventLocal<TEvError, TDqSpillingEvents::EvError> {
TString Message;
TEvError(const TString& message)
@@ -51,4 +52,4 @@ struct TEvKqpSpilling {
};
};
-} // namespace NKikimr::NKqp
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
new file mode 100644
index 0000000000..d3023385c1
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
@@ -0,0 +1,15 @@
+#include "spilling_counters.h"
+
+namespace NYql::NDq {
+
+// Looks up (or creates) all spilling counters inside the given counters group.
+TSpillingCounters::TSpillingCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) {
+    // Second argument of GetCounter(): derivative counters vs. plain values.
+    constexpr bool derivative = true;
+    constexpr bool absolute = false;
+
+    SpillingWriteBlobs = counters->GetCounter("Spilling/WriteBlobs", derivative);
+    SpillingReadBlobs = counters->GetCounter("Spilling/ReadBlobs", derivative);
+    SpillingStoredBlobs = counters->GetCounter("Spilling/StoredBlobs", absolute);
+    SpillingTotalSpaceUsed = counters->GetCounter("Spilling/TotalSpaceUsed", absolute);
+    SpillingTooBigFileErrors = counters->GetCounter("Spilling/TooBigFileErrors", derivative);
+    SpillingNoSpaceErrors = counters->GetCounter("Spilling/NoSpaceErrors", derivative);
+    SpillingIoErrors = counters->GetCounter("Spilling/IoErrors", derivative);
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.h b/ydb/library/yql/dq/actors/spilling/spilling_counters.h
new file mode 100644
index 0000000000..4122c13e7e
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/ptr.h>
+
+namespace NYql::NDq {
+
+// Ref-counted bundle of the monitoring counters used by the spilling code.
+// Every member is obtained from the "Spilling/..." names of the counters group
+// passed to the constructor.
+struct TSpillingCounters : public TThrRefBase {
+
+    // Resolves all counters below from `counters`.
+    TSpillingCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
+
+    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs;        // "Spilling/WriteBlobs"
+    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingReadBlobs;         // "Spilling/ReadBlobs"
+    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingStoredBlobs;       // "Spilling/StoredBlobs"
+    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTotalSpaceUsed;    // "Spilling/TotalSpaceUsed"
+    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTooBigFileErrors;  // "Spilling/TooBigFileErrors"
+    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingNoSpaceErrors;     // "Spilling/NoSpaceErrors"
+    ::NMonitoring::TDynamicCounters::TCounterPtr SpillingIoErrors;          // "Spilling/IoErrors"
+};
+
+} // namespace NYql::NDq
diff --git a/ydb/core/kqp/runtime/kqp_spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
index 190ac2ac4f..b2f7c80603 100644
--- a/ydb/core/kqp/runtime/kqp_spilling_file.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
@@ -1,23 +1,20 @@
-#include "kqp_spilling.h"
-#include "kqp_spilling_file.h"
-
-#include <ydb/core/actorlib_impl/long_timer.h>
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/kqp/common/kqp.h>
-#include <ydb/core/mon/mon.h>
+#include "spilling.h"
+#include "spilling_file.h"
+#include <ydb/library/services/services.pb.h>
#include <ydb/library/yql/utils/yql_panic.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/events.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <util/folder/path.h>
#include <util/stream/file.h>
#include <util/thread/pool.h>
-namespace NKikimr::NKqp {
+namespace NYql::NDq {
using namespace NActors;
@@ -26,31 +23,31 @@ namespace {
// Read, write, and execute by owner only
constexpr int DIR_MODE = S_IRWXU;
-#define LOG_D(s) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s)
-#define LOG_I(s) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s)
-#define LOG_E(s) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s)
-#define LOG_C(s) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s)
+#define LOG_D(s) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_I(s) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_E(s) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
+#define LOG_C(s) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s)
-#define A_LOG_D(s) LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_BLOBS_STORAGE, s)
-#define A_LOG_E(s) LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_BLOBS_STORAGE, s)
+#define A_LOG_D(s) LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_COMPUTE, s)
+#define A_LOG_E(s) LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_COMPUTE, s)
#define REMOVE_FILES 1
// Local File Storage Events
-struct TEvKqpSpillingLocalFile {
+struct TEvDqSpillingLocalFile {
enum EEv {
- EvOpenFile = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvOpenFile = EventSpaceBegin(TEvents::ES_PRIVATE),
EvCloseFile,
LastEvent = EvCloseFile
};
struct TEvOpenFile : public TEventLocal<TEvOpenFile, EvOpenFile> {
- ui64 TxId;
+ TTxId TxId;
TString Description; // for viewer & logs only
bool RemoveBlobsAfterRead;
- TEvOpenFile(ui64 txId, const TString& description, bool removeBlobsAfterRead)
+ TEvOpenFile(TTxId txId, const TString& description, bool removeBlobsAfterRead)
: TxId(txId), Description(description), RemoveBlobsAfterRead(removeBlobsAfterRead) {}
};
@@ -66,101 +63,102 @@ struct TEvKqpSpillingLocalFile {
// It is a simple proxy between client and spilling service with one feature --
// it provides human-readable description for logs and viewer.
-class TKqpLocalFileSpillingActor : public TActorBootstrapped<TKqpLocalFileSpillingActor> {
+class TDqLocalFileSpillingActor : public TActorBootstrapped<TDqLocalFileSpillingActor> {
public:
- TKqpLocalFileSpillingActor(ui64 txId, const TString& details, const TActorId& client, bool removeBlobsAfterRead)
- : TxId(txId)
- , Details(details)
- , ClientActorId(client)
- , RemoveBlobsAfterRead(removeBlobsAfterRead) {}
+ TDqLocalFileSpillingActor(TTxId txId, const TString& details, const TActorId& client, bool removeBlobsAfterRead)
+ : TxId_(txId)
+ , Details_(details)
+ , ClientActorId_(client)
+ , RemoveBlobsAfterRead_(removeBlobsAfterRead)
+ {}
void Bootstrap() {
- ServiceActorId = MakeKqpLocalFileSpillingServiceID(SelfId().NodeId());
- YQL_ENSURE(ServiceActorId);
+ ServiceActorId_ = MakeDqLocalFileSpillingServiceID(SelfId().NodeId());
+ YQL_ENSURE(ServiceActorId_);
- LOG_D("Register LocalFileSpillingActor " << SelfId() << " at service " << ServiceActorId);
- Send(ServiceActorId, new TEvKqpSpillingLocalFile::TEvOpenFile(TxId, Details, RemoveBlobsAfterRead));
+ LOG_D("Register LocalFileSpillingActor " << SelfId() << " at service " << ServiceActorId_);
+ Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvOpenFile(TxId_, Details_, RemoveBlobsAfterRead_));
- Become(&TKqpLocalFileSpillingActor::WorkState);
+ Become(&TDqLocalFileSpillingActor::WorkState);
}
- static constexpr char ActorName[] = "KQP_LOCAL_FILE_SPILLING";
+ static constexpr char ActorName[] = "DQ_LOCAL_FILE_SPILLING";
private:
STRICT_STFUNC(WorkState,
- hFunc(TEvKqpSpilling::TEvWrite, HandleWork)
- hFunc(TEvKqpSpilling::TEvWriteResult, HandleWork)
- hFunc(TEvKqpSpilling::TEvRead, HandleWork)
- hFunc(TEvKqpSpilling::TEvReadResult, HandleWork)
- hFunc(TEvKqpSpilling::TEvError, HandleWork)
+ hFunc(TEvDqSpilling::TEvWrite, HandleWork)
+ hFunc(TEvDqSpilling::TEvWriteResult, HandleWork)
+ hFunc(TEvDqSpilling::TEvRead, HandleWork)
+ hFunc(TEvDqSpilling::TEvReadResult, HandleWork)
+ hFunc(TEvDqSpilling::TEvError, HandleWork)
hFunc(TEvents::TEvPoison, HandleWork)
);
- void HandleWork(TEvKqpSpilling::TEvWrite::TPtr& ev) {
+ void HandleWork(TEvDqSpilling::TEvWrite::TPtr& ev) {
ValidateSender(ev->Sender);
- Send(ServiceActorId, ev->Release().Release());
+ Send(ServiceActorId_, ev->Release().Release());
}
- void HandleWork(TEvKqpSpilling::TEvWriteResult::TPtr& ev) {
- if (!Send(ClientActorId, ev->Release().Release())) {
+ void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
+ if (!Send(ClientActorId_, ev->Release().Release())) {
ClientLost();
}
}
- void HandleWork(TEvKqpSpilling::TEvRead::TPtr& ev) {
+ void HandleWork(TEvDqSpilling::TEvRead::TPtr& ev) {
ValidateSender(ev->Sender);
- Send(ServiceActorId, ev->Release().Release());
+ Send(ServiceActorId_, ev->Release().Release());
}
- void HandleWork(TEvKqpSpilling::TEvReadResult::TPtr& ev) {
- if (!Send(ClientActorId, ev->Release().Release())) {
+ void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
+ if (!Send(ClientActorId_, ev->Release().Release())) {
ClientLost();
}
}
- void HandleWork(TEvKqpSpilling::TEvError::TPtr& ev) {
- Send(ClientActorId, ev->Release().Release());
+ void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
+ Send(ClientActorId_, ev->Release().Release());
}
void HandleWork(TEvents::TEvPoison::TPtr& ev) {
ValidateSender(ev->Sender);
- Send(ServiceActorId, new TEvKqpSpillingLocalFile::TEvCloseFile);
+ Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile);
PassAway();
}
private:
void ValidateSender(const TActorId& sender) {
- YQL_ENSURE(ClientActorId == sender, "" << ClientActorId << " != " << sender);
+ YQL_ENSURE(ClientActorId_ == sender, "" << ClientActorId_ << " != " << sender);
}
void ClientLost() {
- Send(ServiceActorId, new TEvKqpSpillingLocalFile::TEvCloseFile("Client lost"));
+ Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile("Client lost"));
PassAway();
}
private:
- const ui64 TxId;
- const TString Details;
- const TActorId ClientActorId;
- const bool RemoveBlobsAfterRead;
- TActorId ServiceActorId;
+ const TTxId TxId_;
+ const TString Details_;
+ const TActorId ClientActorId_;
+ const bool RemoveBlobsAfterRead_;
+ TActorId ServiceActorId_;
};
-class TKqpLocalFileSpillingService : public TActorBootstrapped<TKqpLocalFileSpillingService> {
+class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpillingService> {
private:
struct TEvPrivate {
enum EEv {
- EvCloseFileResponse = TEvKqpSpillingLocalFile::EEv::LastEvent + 1,
+ EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1,
EvWriteFileResponse,
EvReadFileResponse,
LastEvent
};
- static_assert(EEv::LastEvent - EventSpaceBegin(TKikimrEvents::ES_PRIVATE) < 16);
+ static_assert(EEv::LastEvent - EventSpaceBegin(TEvents::ES_PRIVATE) < 16);
struct TEvCloseFileResponse : public TEventLocal<TEvCloseFileResponse, EvCloseFileResponse> {
TActorId Client;
@@ -192,58 +190,54 @@ private:
using TFilesIt = __yhashtable_iterator<std::pair<const TActorId, TFileDesc>>;
public:
- TKqpLocalFileSpillingService(const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig& config,
- TIntrusivePtr<TKqpCounters> counters)
- : Config(config)
- , Counters(counters)
+ TDqLocalFileSpillingService(const TFileSpillingServiceConfig& config,
+ TIntrusivePtr<TSpillingCounters> counters)
+ : Config_(config)
+ , Counters_(counters)
{
- IoThreadPool = CreateThreadPool(Config.GetIoThreadPool().GetWorkersCount(),
- Config.GetIoThreadPool().GetQueueSize(), IThreadPool::TParams().SetThreadNamePrefix("KqpSpilling"));
+ IoThreadPool_ = CreateThreadPool(Config_.IoThreadPoolWorkersCount,
+ Config_.IoThreadPoolQueueSize, IThreadPool::TParams().SetThreadNamePrefix("DqSpilling"));
}
void Bootstrap() {
- Root = Config.GetRoot();
- Root /= (TStringBuilder() << "node_" << SelfId().NodeId());
+ Root_ = Config_.Root;
+ Root_ /= (TStringBuilder() << "node_" << SelfId().NodeId());
- LOG_I("Init KQP local file spilling service at " << Root << ", actor: " << SelfId());
+ LOG_I("Init DQ local file spilling service at " << Root_ << ", actor: " << SelfId());
try {
- if (Root.IsSymlink()) {
- throw TIoException() << Root << " is a symlink, can not start Spilling Service";
+ if (Root_.IsSymlink()) {
+ throw TIoException() << Root_ << " is a symlink, can not start Spilling Service";
}
- Root.ForceDelete();
- Root.MkDirs(DIR_MODE);
+ Root_.ForceDelete();
+ Root_.MkDirs(DIR_MODE);
} catch (...) {
LOG_E(CurrentExceptionMessage());
- Become(&TKqpLocalFileSpillingService::BrokenState);
+ Become(&TDqLocalFileSpillingService::BrokenState);
return;
}
- NActors::TMon* mon = AppData()->Mon;
- if (mon) {
- NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
- mon->RegisterActorPage(actorsMonPage, "kqp_spilling_file", "KQP Local File Spilling Service", false,
- TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
- }
-
- Become(&TKqpLocalFileSpillingService::WorkState);
+ Become(&TDqLocalFileSpillingService::WorkState);
}
- static constexpr char ActorName[] = "KQP_LOCAL_FILE_SPILLING_SERVICE";
+ static constexpr char ActorName[] = "DQ_LOCAL_FILE_SPILLING_SERVICE";
protected:
void PassAway() override {
- IoThreadPool->Stop();
+ IoThreadPool_->Stop();
IActor::PassAway();
+ if (Config_.CleanupOnShutdown) {
+ Root_.ForceDelete();
+ }
}
private:
STATEFN(BrokenState) {
switch (ev->GetTypeRewrite()) {
- case TEvKqpSpillingLocalFile::TEvOpenFile::EventType:
- case TEvKqpSpillingLocalFile::TEvCloseFile::EventType:
- case TEvKqpSpilling::TEvWrite::EventType:
- case TEvKqpSpilling::TEvRead::EventType: {
+ case TEvDqSpillingLocalFile::TEvOpenFile::EventType:
+ case TEvDqSpillingLocalFile::TEvCloseFile::EventType:
+ case TEvDqSpilling::TEvWrite::EventType:
+ case TEvDqSpilling::TEvRead::EventType: {
HandleBroken(ev->Sender);
break;
}
@@ -256,7 +250,7 @@ private:
void HandleBroken(const TActorId& from) {
LOG_E("Service is broken, send error to client " << from);
- Send(from, new TEvKqpSpilling::TEvError("Service not started"));
+ Send(from, new TEvDqSpilling::TEvError("Service not started"));
}
void HandleBroken(NMon::TEvHttpInfo::TPtr& ev) {
@@ -265,43 +259,43 @@ private:
private:
STRICT_STFUNC(WorkState,
- hFunc(TEvKqpSpillingLocalFile::TEvOpenFile, HandleWork)
- hFunc(TEvKqpSpillingLocalFile::TEvCloseFile, HandleWork)
+ hFunc(TEvDqSpillingLocalFile::TEvOpenFile, HandleWork)
+ hFunc(TEvDqSpillingLocalFile::TEvCloseFile, HandleWork)
hFunc(TEvPrivate::TEvCloseFileResponse, HandleWork)
- hFunc(TEvKqpSpilling::TEvWrite, HandleWork)
+ hFunc(TEvDqSpilling::TEvWrite, HandleWork)
hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork)
- hFunc(TEvKqpSpilling::TEvRead, HandleWork)
+ hFunc(TEvDqSpilling::TEvRead, HandleWork)
hFunc(TEvPrivate::TEvReadFileResponse, HandleWork)
hFunc(NMon::TEvHttpInfo, HandleWork)
cFunc(TEvents::TEvPoison::EventType, PassAway)
);
- void HandleWork(TEvKqpSpillingLocalFile::TEvOpenFile::TPtr& ev) {
+ void HandleWork(TEvDqSpillingLocalFile::TEvOpenFile::TPtr& ev) {
auto& msg = *ev->Get();
LOG_D("[OpenFile] TxId: " << msg.TxId << ", desc: " << msg.Description << ", from: " << ev->Sender
<< ", removeBlobsAfterRead: " << msg.RemoveBlobsAfterRead);
- auto it = Files.find(ev->Sender);
- if (it != Files.end()) {
+ auto it = Files_.find(ev->Sender);
+ if (it != Files_.end()) {
LOG_E("[OpenFile] Can not open file: already exists. TxId: " << msg.TxId << ", desc: " << msg.Description);
- Send(ev->Sender, new TEvKqpSpilling::TEvError("File already exists"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("File already exists"));
return;
}
- auto& fd = Files[ev->Sender];
+ auto& fd = Files_[ev->Sender];
fd.TxId = msg.TxId;
fd.Description = std::move(msg.Description);
fd.RemoveBlobsAfterRead = msg.RemoveBlobsAfterRead;
fd.OpenAt = TInstant::Now();
}
- void HandleWork(TEvKqpSpillingLocalFile::TEvCloseFile::TPtr& ev) {
+ void HandleWork(TEvDqSpillingLocalFile::TEvCloseFile::TPtr& ev) {
auto& msg = *ev->Get();
LOG_D("[CloseFile] from: " << ev->Sender << ", error: " << msg.Error);
- auto it = Files.find(ev->Sender);
- if (it == Files.end()) {
+ auto it = Files_.find(ev->Sender);
+ if (it == Files_.end()) {
LOG_E("[CloseFile] Can not close file: not found. From: " << ev->Sender << ", error: " << msg.Error);
return;
}
@@ -342,8 +336,8 @@ private:
auto& msg = *ev->Get();
LOG_D("[CloseFileResponse] from: " << msg.Client);
- auto it = Files.find(msg.Client);
- if (it == Files.end()) {
+ auto it = Files_.find(msg.Client);
+ if (it == Files_.end()) {
LOG_E("[CloseFileResponse] Can not find file from: " << msg.Client);
return;
}
@@ -353,22 +347,22 @@ private:
blobs += fp.Blobs.size();
}
- Counters->SpillingStoredBlobs->Sub(blobs);
- Counters->SpillingTotalSpaceUsed->Sub(it->second.TotalSize);
+ Counters_->SpillingStoredBlobs->Sub(blobs);
+ Counters_->SpillingTotalSpaceUsed->Sub(it->second.TotalSize);
MoveFileToClosed(it);
}
- void HandleWork(TEvKqpSpilling::TEvWrite::TPtr& ev) {
+ void HandleWork(TEvDqSpilling::TEvWrite::TPtr& ev) {
auto& msg = *ev->Get();
LOG_D("[Write] from: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size());
- auto it = Files.find(ev->Sender);
- if (it == Files.end()) {
+ auto it = Files_.find(ev->Sender);
+ if (it == Files_.end()) {
LOG_E("[Write] File not found. "
<< "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size());
- Send(ev->Sender, new TEvKqpSpilling::TEvError("File not found"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("File not found"));
return;
}
@@ -378,37 +372,37 @@ private:
LOG_E("[Write] File already closed. "
<< "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size());
- Send(ev->Sender, new TEvKqpSpilling::TEvError("File already closed"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("File already closed"));
return;
}
- if (fd.TotalSize + msg.Blob.size() > Config.GetMaxFileSize()) {
+ if (Config_.MaxFileSize && fd.TotalSize + msg.Blob.size() > Config_.MaxFileSize) {
LOG_E("[Write] File size limit exceeded. "
<< "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size());
- Send(ev->Sender, new TEvKqpSpilling::TEvError("File size limit exceeded"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("File size limit exceeded"));
- Counters->SpillingTooBigFileErrors->Inc();
+ Counters_->SpillingTooBigFileErrors->Inc();
return;
}
- if (TotalSize + msg.Blob.size() > Config.GetMaxTotalSize()) {
+ if (Config_.MaxTotalSize && TotalSize_ + msg.Blob.size() > Config_.MaxTotalSize) {
LOG_E("[Write] Total size limit exceeded. "
<< "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size());
- Send(ev->Sender, new TEvKqpSpilling::TEvError("Total size limit exceeded"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("Total size limit exceeded"));
- Counters->SpillingNoSpaceErrors->Inc();
+ Counters_->SpillingNoSpaceErrors->Inc();
return;
}
fd.TotalSize += msg.Blob.size();
- TotalSize += msg.Blob.size();
+ TotalSize_ += msg.Blob.size();
TFileDesc::TFilePart* fp = fd.PartsList.empty() ? nullptr : &fd.PartsList.back();
bool newFile = false;
- if (!fp || (fd.RemoveBlobsAfterRead && (fp->Size + msg.Blob.size() > Config.GetMaxFilePartSize()))) {
+ if (!fp || (fd.RemoveBlobsAfterRead && (Config_.MaxFilePartSize && fp->Size + msg.Blob.size() > Config_.MaxFilePartSize))) {
if (!fd.PartsList.empty()) {
fd.PartsList.back().Last = false;
}
@@ -418,7 +412,7 @@ private:
fd.Parts.emplace(msg.BlobId, fp);
auto fname = TStringBuilder() << fd.TxId << "_" << fd.Description << "_" << fd.NextPartListIndex++;
- fp->FileName = (Root / fname).GetPath();
+ fp->FileName = (Root_ / fname).GetPath();
LOG_D("[Write] create new FilePart " << fp->FileName);
newFile = true;
@@ -448,12 +442,12 @@ private:
auto& msg = *ev->Get();
LOG_D("[WriteFileResponse] from: " << msg.Client << ", blobId: " << msg.BlobId << ", error: " << msg.Error);
- auto it = Files.find(msg.Client);
- if (it == Files.end()) {
+ auto it = Files_.find(msg.Client);
+ if (it == Files_.end()) {
LOG_E("[WriteFileResponse] Can not write file: not found. "
<< "From: " << msg.Client << ", blobId: " << msg.BlobId << ", error: " << msg.Error);
- Send(ev->Sender, new TEvKqpSpilling::TEvError("Internal error"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("Internal error"));
return;
}
@@ -472,8 +466,8 @@ private:
fd.TotalWriteBytes += blobDesc.Size;
- Counters->SpillingStoredBlobs->Inc();
- Counters->SpillingTotalSpaceUsed->Add(blobDesc.Size);
+ Counters_->SpillingStoredBlobs->Inc();
+ Counters_->SpillingTotalSpaceUsed->Add(blobDesc.Size);
if (msg.NewFileHandle) {
fp->FileHandle.Swap(msg.NewFileHandle);
@@ -484,32 +478,32 @@ private:
fd.Error = "File part not found";
}
- Counters->SpillingIoErrors->Inc();
+ Counters_->SpillingIoErrors->Inc();
}
if (fd.Error) {
- Send(msg.Client, new TEvKqpSpilling::TEvError(*fd.Error));
+ Send(msg.Client, new TEvDqSpilling::TEvError(*fd.Error));
fd.Ops.clear();
CloseFile(it, fd.Error);
return;
}
- Counters->SpillingWriteBlobs->Inc();
+ Counters_->SpillingWriteBlobs->Inc();
- Send(msg.Client, new TEvKqpSpilling::TEvWriteResult(msg.BlobId));
+ Send(msg.Client, new TEvDqSpilling::TEvWriteResult(msg.BlobId));
RunNextOp(fd);
}
- void HandleWork(TEvKqpSpilling::TEvRead::TPtr& ev) {
+ void HandleWork(TEvDqSpilling::TEvRead::TPtr& ev) {
auto& msg = *ev->Get();
LOG_D("[Read] from: " << ev->Sender << ", blobId: " << msg.BlobId);
- auto it = Files.find(ev->Sender);
- if (it == Files.end()) {
+ auto it = Files_.find(ev->Sender);
+ if (it == Files_.end()) {
LOG_E("[Read] Can not read file: not found. From: " << ev->Sender << ", blobId: " << msg.BlobId);
- Send(ev->Sender, new TEvKqpSpilling::TEvError("File not found"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("File not found"));
return;
}
@@ -518,7 +512,7 @@ private:
if (fd.CloseAt) {
LOG_E("[Read] Can not read file: closed. From: " << ev->Sender << ", blobId: " << msg.BlobId);
- Send(ev->Sender, new TEvKqpSpilling::TEvError("Closed"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("Closed"));
return;
}
@@ -526,7 +520,7 @@ private:
if (partIt == fd.Parts.end()) {
LOG_E("[Read] Can not read file: part not found. From: " << ev->Sender << ", blobId: " << msg.BlobId);
- Send(ev->Sender, new TEvKqpSpilling::TEvError("File part not found"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("File part not found"));
fd.Ops.clear();
TMaybe<TString> err = "Part not found";
@@ -540,7 +534,7 @@ private:
if (blobIt == fp->Blobs.end()) {
LOG_E("[Read] Can not read file: blob not found in the part. From: " << ev->Sender << ", blobId: " << msg.BlobId);
- Send(ev->Sender, new TEvKqpSpilling::TEvError("Blob not found in the file part"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("Blob not found in the file part"));
fd.Ops.clear();
TMaybe<TString> err = "Blob not found in the file part";
@@ -581,12 +575,12 @@ private:
LOG_D("[ReadFileResponse] from: " << msg.Client << ", blobId: " << msg.BlobId << ", removed: " << msg.Removed
<< ", error: " << msg.Error);
- auto it = Files.find(msg.Client);
- if (it == Files.end()) {
+ auto it = Files_.find(msg.Client);
+ if (it == Files_.end()) {
LOG_E("[ReadFileResponse] Can not read file: not found. "
<< "From: " << msg.Client << ", blobId: " << msg.BlobId << ", error: " << msg.Error);
- Send(ev->Sender, new TEvKqpSpilling::TEvError("Internal error"));
+ Send(ev->Sender, new TEvDqSpilling::TEvError("Internal error"));
return;
}
@@ -606,10 +600,10 @@ private:
if (msg.Removed) {
fd.TotalSize -= fp->Size;
- TotalSize -= fp->Size;
+ TotalSize_ -= fp->Size;
- Counters->SpillingTotalSpaceUsed->Sub(fp->Size);
- Counters->SpillingStoredBlobs->Sub(fp->Blobs.size());
+ Counters_->SpillingTotalSpaceUsed->Sub(fp->Size);
+ Counters_->SpillingStoredBlobs->Sub(fp->Blobs.size());
fd.Parts.erase(msg.BlobId);
fd.PartsList.remove_if([fp](const auto& x) { return &x == fp; });
@@ -621,40 +615,40 @@ private:
}
if (fd.Error) {
- Send(msg.Client, new TEvKqpSpilling::TEvError(*fd.Error));
+ Send(msg.Client, new TEvDqSpilling::TEvError(*fd.Error));
fd.Ops.clear();
CloseFile(it, fd.Error);
return;
}
- Counters->SpillingReadBlobs->Inc();
+ Counters_->SpillingReadBlobs->Inc();
- Send(msg.Client, new TEvKqpSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob)));
+ Send(msg.Client, new TEvDqSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob)));
RunNextOp(fd);
}
void HandleWork(NMon::TEvHttpInfo::TPtr& ev) {
TStringStream s;
- TMap<ui64, TVector<const TFileDesc*>> byTx;
- for (const auto& fd : Files) {
+ TMap<TTxId, TVector<const TFileDesc*>> byTx;
+ for (const auto& fd : Files_) {
byTx[fd.second.TxId].push_back(&fd.second);
}
HTML(s) {
TAG(TH2) { s << "Configuration"; }
PRE() {
- s << " - Root: " << Config.GetRoot() << Endl;
- s << " - MaxTotalSize: " << Config.GetMaxTotalSize() << Endl;
- s << " - MaxFileSize: " << Config.GetMaxFileSize() << Endl;
- s << " - MaxFilePartSize: " << Config.GetMaxFilePartSize() << Endl;
- s << " - IO thread pool, workers: " << Config.GetIoThreadPool().GetWorkersCount()
- << ", queue: " << Config.GetIoThreadPool().GetQueueSize() << Endl;
+ s << " - Root: " << Config_.Root << Endl;
+ s << " - MaxTotalSize: " << Config_.MaxTotalSize << Endl;
+ s << " - MaxFileSize: " << Config_.MaxFileSize << Endl;
+ s << " - MaxFilePartSize: " << Config_.MaxFilePartSize << Endl;
+ s << " - IO thread pool, workers: " << Config_.IoThreadPoolWorkersCount
+ << ", queue: " << Config_.IoThreadPoolQueueSize << Endl;
}
TAG(TH2) { s << "Active files"; }
- PRE() { s << "Used space: " << TotalSize << Endl; }
+ PRE() { s << "Used space: " << TotalSize_ << Endl; }
for (const auto& tx : byTx) {
TAG(TH2) { s << "Transaction " << tx.first; }
@@ -692,7 +686,7 @@ private:
TAG(TH2) { s << "Last closed files"; }
UL() {
- for (auto it = ClosedFiles.rbegin(); it != ClosedFiles.rend(); ++it) {
+ for (auto it = ClosedFiles_.rbegin(); it != ClosedFiles_.rend(); ++it) {
auto& fd = *it;
LI() { s << "Transaction: " << fd.TxId << ", " << fd.Description; }
PRE() {
@@ -720,7 +714,7 @@ private:
} else {
fd.HasActiveOp = true;
// TODO: retry if fails
- IoThreadPool->SafeAddAndOwn(std::move(op));
+ IoThreadPool_->SafeAddAndOwn(std::move(op));
}
}
@@ -736,12 +730,12 @@ private:
}
void MoveFileToClosed(TFilesIt it) {
- TotalSize -= it->second.TotalSize;
- ClosedFiles.emplace_back(TClosedFileDesc(std::move(it->second)));
- while (ClosedFiles.size() > 100) {
- ClosedFiles.pop_front();
+ TotalSize_ -= it->second.TotalSize;
+ ClosedFiles_.emplace_back(TClosedFileDesc(std::move(it->second)));
+ while (ClosedFiles_.size() > 100) {
+ ClosedFiles_.pop_front();
}
- Files.erase(it);
+ Files_.erase(it);
}
private:
@@ -870,12 +864,8 @@ private:
};
private:
- const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig Config;
- TFsPath Root;
- TIntrusivePtr<TKqpCounters> Counters;
-
struct TFileDesc {
- ui64 TxId = 0;
+ TTxId TxId;
TString Description;
bool RemoveBlobsAfterRead = false;
TInstant OpenAt;
@@ -919,7 +909,7 @@ private:
};
struct TClosedFileDesc {
- ui64 TxId;
+ TTxId TxId;
TString Description;
bool RemoveBlobsAfterRead;
TInstant OpenAt;
@@ -940,28 +930,32 @@ private:
, WaitTime(fd.TotalWaitTime)
, WriteBytes(fd.TotalWriteBytes)
, ReadBytes(fd.TotalReadBytes)
- , Error(std::move(fd.Error)) {}
+ , Error(std::move(fd.Error))
+ {}
};
- THolder<IThreadPool> IoThreadPool;
- THashMap<TActorId, TFileDesc> Files;
- TList<const TClosedFileDesc> ClosedFiles;
- ui64 TotalSize = 0;
+private:
+ const TFileSpillingServiceConfig Config_;
+ TFsPath Root_;
+ TIntrusivePtr<TSpillingCounters> Counters_;
+
+ THolder<IThreadPool> IoThreadPool_;
+ THashMap<TActorId, TFileDesc> Files_;
+ TList<const TClosedFileDesc> ClosedFiles_;
+ ui64 TotalSize_ = 0;
};
} // anonymous namespace
-IActor* CreateKqpLocalFileSpillingActor(ui64 txId, const TString& details, const TActorId& client,
+IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const TActorId& client,
bool removeBlobsAfterRead)
{
- return new TKqpLocalFileSpillingActor(txId, details, client, removeBlobsAfterRead);
+ return new TDqLocalFileSpillingActor(txId, details, client, removeBlobsAfterRead);
}
-IActor* CreateKqpLocalFileSpillingService(
- const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig& config,
- TIntrusivePtr<TKqpCounters> counters)
+IActor* CreateDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, TIntrusivePtr<TSpillingCounters> counters)
{
- return new TKqpLocalFileSpillingService(config, counters);
+ return new TDqLocalFileSpillingService(config, counters);
}
-} // namespace NKikimr::NKqp
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.h b/ydb/library/yql/dq/actors/spilling/spilling_file.h
new file mode 100644
index 0000000000..c2173221eb
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "spilling_counters.h"
+
+#include <ydb/library/yql/dq/common/dq_common.h>
+#include <library/cpp/actors/core/actor.h>
+
+#include <util/system/types.h>
+#include <util/generic/strbuf.h>
+
+namespace NYql::NDq {
+
+struct TFileSpillingServiceConfig { // Settings consumed by the DQ local-file spilling service actor.
+ TString Root; // Base directory; the service works in (and recreates at bootstrap) its "node_<nodeId>" subdirectory.
+ ui64 MaxTotalSize = 0; // Limit on bytes stored across all files; 0 disables the check.
+ ui64 MaxFileSize = 0; // Limit on bytes stored for one client file; 0 disables the check.
+ ui64 MaxFilePartSize = 0; // Part size that triggers a new file part (only when blobs are removed after read); 0 disables splitting.
+
+ ui32 IoThreadPoolWorkersCount = 2; // Worker count passed to CreateThreadPool for the I/O pool.
+ ui32 IoThreadPoolQueueSize = 1000; // Queue size passed to CreateThreadPool for the I/O pool.
+ bool CleanupOnShutdown = false; // When true, the service force-deletes its node directory in PassAway().
+};
+
+// Returns the service actor id under which the DQ local-file spilling
+// service is addressed on the node `nodeId`.
+inline NActors::TActorId MakeDqLocalFileSpillingServiceID(ui32 nodeId) {
+    // 12-byte buffer: the text followed by zero padding; all 12 bytes are passed on.
+    const char serviceName[12] = "dq_lfspill";
+    const TStringBuf serviceNameBuf(serviceName, sizeof(serviceName));
+    return NActors::TActorId(nodeId, serviceNameBuf);
+}
+
+NActors::IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const NActors::TActorId& client, bool removeBlobsAfterRead);
+
+NActors::IActor* CreateDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, TIntrusivePtr<TSpillingCounters> counters);
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
new file mode 100644
index 0000000000..ff496b5f48
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
@@ -0,0 +1,456 @@
+#include "spilling_file.h"
+#include "spilling.h"
+
+#include <ydb/library/services/services.pb.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
+
+#include <util/system/fs.h>
+#include <util/generic/string.h>
+#include <util/folder/path.h>
+
+namespace NYql::NDq {
+
+using namespace NActors;
+
+namespace {
+
+// Actor test runtime specialised for the DQ local-file spilling tests.
+// It starts the spilling service and per-client spilling actors, remembers the
+// spilling root handed to the service, and removes that directory again when
+// the runtime is destroyed.
+class TTestActorRuntime: public TTestActorRuntimeBase {
+public:
+    // Appends the NKikimrServices component range to the node's log settings,
+    // then delegates to the base implementation.
+    void InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) override {
+        node->LogSettings->Append(
+            NKikimrServices::EServiceKikimr_MIN,
+            NKikimrServices::EServiceKikimr_MAX,
+            NKikimrServices::EServiceKikimr_Name
+        );
+        TTestActorRuntimeBase::InitNodeImpl(node, nodeIndex);
+    }
+
+    ~TTestActorRuntime() {
+        // Cleanup is best effort. TFsPath operations can throw on I/O errors and
+        // a destructor must not let an exception escape, so a failure is only
+        // reported on stderr instead of terminating the test binary.
+        try {
+            if (SpillingRoot_ && SpillingRoot_.Exists()) {
+                SpillingRoot_.ForceDelete();
+            }
+        } catch (...) {
+            Cerr << "Can not remove spilling root " << SpillingRoot_.GetPath()
+                << ": " << CurrentExceptionMessage() << Endl;
+        }
+    }
+
+    void Initialize() override {
+        TTestActorRuntimeBase::Initialize();
+        SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_ERROR);
+    }
+
+    // Counters root shared by all runtime instances in the process (function
+    // static); StartSpillingService() resets it before every service start.
+    TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters() {
+        static auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
+        return counters;
+    }
+
+    // Directory name of the form "dq_spilling_<pid>", computed once per process.
+    static TString GetSpillingPrefix() {
+        static TString str = Sprintf("%s_%d", "dq_spilling", (int)getpid());
+        return str;
+    }
+
+    // Creates the spilling service with the given limits and root, registers it
+    // and publishes it under MakeDqLocalFileSpillingServiceID(GetNodeId()).
+    // The root is remembered so that the destructor can remove it.
+    // Returns the actor id of the registered service.
+    TActorId StartSpillingService(ui64 maxTotalSize = 1000, ui64 maxFileSize = 500,
+        ui64 maxFilePartSize = 100, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix())
+    {
+        SpillingRoot_ = root;
+
+        auto config = TFileSpillingServiceConfig{
+            .Root = root.GetPath(),
+            .MaxTotalSize = maxTotalSize,
+            .MaxFileSize = maxFileSize,
+            .MaxFilePartSize = maxFilePartSize
+        };
+
+        auto counters = Counters();
+        counters->ResetCounters();
+
+        auto spillingService = CreateDqLocalFileSpillingService(config, MakeIntrusive<TSpillingCounters>(counters));
+        auto spillingServiceActorId = Register(spillingService);
+        EnableScheduleForActor(spillingServiceActorId);
+        RegisterService(MakeDqLocalFileSpillingServiceID(GetNodeId()), spillingServiceActorId);
+
+        return spillingServiceActorId;
+    }
+
+    // Registers a spilling actor for transaction 1 with description "test"
+    // that reports to `client`; returns its actor id.
+    TActorId StartSpillingActor(const TActorId& client, bool removeBlobsAfterRead = true) {
+        auto spillingActor = CreateDqLocalFileSpillingActor(1ul, "test", client, removeBlobsAfterRead);
+        auto spillingActorId = Register(spillingActor);
+        EnableScheduleForActor(spillingActorId);
+
+        return spillingActorId;
+    }
+
+    // Dispatches events until one Bootstrap system event has been seen.
+    void WaitBootstrap() {
+        TDispatchOptions options;
+        options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1);
+        UNIT_ASSERT(DispatchEvents(options));
+    }
+
+    // Root passed to the most recent StartSpillingService() call.
+    const TFsPath& GetSpillingRoot() const {
+        return SpillingRoot_;
+    }
+
+private:
+    TFsPath SpillingRoot_;
+};
+
+// Returns a buffer holding `size` copies of `symbol`; used as the expected
+// content when checking blobs read back from the spilling actor.
+TBuffer CreateBlob(ui32 size, char symbol) {
+    TBuffer filled(size);
+    filled.Fill(symbol, size);
+    return filled;
+}
+
+// Builds a rope of `size` bytes, all equal to `symbol`, by appending pieces of
+// at most `chunkSize` bytes each.
+// `chunkSize` must be positive whenever `size` is non-zero.
+TRope CreateRope(ui32 size, char symbol, ui32 chunkSize = 7) {
+    // A zero chunk size would never consume any bytes and the loop below would
+    // spin forever, so fail the test immediately instead.
+    UNIT_ASSERT_C(chunkSize > 0 || size == 0, "chunkSize must be positive");
+
+    TRope result;
+    for (ui32 left = size; left > 0;) {
+        const ui32 count = std::min(left, chunkSize);
+        TString piece(count, symbol);
+        result.Insert(result.End(), TRope{piece});
+        left -= count;
+    }
+    return result;
+}
+
+// Fails the current unit test unless both buffers hold the same bytes.
+// The buffers are compared as string views over their data.
+void AssertEquals(const TBuffer& lhs, const TBuffer& rhs) {
+    const TStringBuf l(lhs.data(), lhs.size());
+    const TStringBuf r(rhs.data(), rhs.size());
+    UNIT_ASSERT_STRINGS_EQUAL(l, r);
+}
+
+
+// Minimal NMonitoring::IHttpRequest stub for driving monitoring handlers in
+// tests. Only the HTTP method is configurable: URI, path, post content and
+// remote address are empty, and GetParams()/GetPostParams() both expose the
+// same CgiParameters member.
+struct THttpRequest : NMonitoring::IHttpRequest {
+    HTTP_METHOD Method;
+    TCgiParameters CgiParameters;
+    THttpHeaders HttpHeaders;
+
+    explicit THttpRequest(HTTP_METHOD method)
+        : Method(method)
+    {}
+
+    ~THttpRequest() {}
+
+    const char* GetURI() const override {
+        return "";
+    }
+
+    const char* GetPath() const override {
+        return "";
+    }
+
+    const TCgiParameters& GetParams() const override {
+        return CgiParameters;
+    }
+
+    const TCgiParameters& GetPostParams() const override {
+        return CgiParameters;
+    }
+
+    TStringBuf GetPostContent() const override {
+        // Return an empty view directly, so the result does not depend on the
+        // lifetime of a temporary TString.
+        return TStringBuf();
+    }
+
+    HTTP_METHOD GetMethod() const override {
+        return Method;
+    }
+
+    const THttpHeaders& GetHeaders() const override {
+        return HttpHeaders;
+    }
+
+    TString GetRemoteAddr() const override {
+        return TString();
+    }
+};
+
+} // anonymous namespace
+
+Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
+
+ Y_UNIT_TEST(Simple) { // Round trip: write two blobs, read both back, then poison the spilling actor.
+ TTestActorRuntime runtime;
+ runtime.Initialize();
+
+ auto spillingService = runtime.StartSpillingService();
+ auto tester = runtime.AllocateEdgeActor(); // used as the sender of requests and as the actor replies are grabbed from
+ auto spillingActor = runtime.StartSpillingActor(tester);
+
+ runtime.WaitBootstrap();
+
+ // put blob 1 (10 bytes of 'a')
+ {
+ auto ev = new TEvDqSpilling::TEvWrite(1, CreateRope(10, 'a'));
+ runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+ auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
+ }
+
+ // put blob 2 (11 bytes of 'z')
+ {
+ auto ev = new TEvDqSpilling::TEvWrite(2, CreateRope(11, 'z'));
+ runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+ auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId);
+ }
+
+ // get blob 1 and compare it with the bytes written above
+ {
+ auto ev = new TEvDqSpilling::TEvRead(1);
+ runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+ auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
+
+ TBuffer expected = CreateBlob(10, 'a');
+ AssertEquals(expected, resp->Get()->Blob);
+ }
+
+ // get blob 2 and compare it with the bytes written above
+ {
+ auto ev = new TEvDqSpilling::TEvRead(2);
+ runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+ auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId);
+
+ TBuffer expected = CreateBlob(11, 'z');
+ AssertEquals(expected, resp->Get()->Blob);
+ }
+
+ // terminate: poison the spilling actor and wait for the service to see the close response
+ {
+ runtime.Send(new IEventHandle(spillingActor, tester, new TEvents::TEvPoison));
+
+ std::atomic<bool> done = false;
+ runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { // observer only watches events; every event is still processed
+ if (event->GetRecipientRewrite() == spillingService) {
+ if (event->GetTypeRewrite() == 2146435074 /* EvCloseFileResponse */ ) { // NOTE(review): hard-coded numeric event id; presumably must track the service's private EvCloseFileResponse value - verify if event ids change
+ done = true;
+ }
+ }
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ });
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return (bool) done;
+ };
+
+ runtime.DispatchEvents(options, TDuration::Seconds(1)); // return value is not checked here (compare WaitBootstrap, which asserts it)
+ }
+ }
+
+    // Total limit of 100 bytes: a 51-byte blob is stored, the following
+    // 50-byte blob is rejected with the total-size error.
+    Y_UNIT_TEST(Write_TotalSizeLimitExceeded) {
+        TTestActorRuntime runtime;
+        runtime.Initialize();
+
+        // maxTotalSize = 100, maxFileSize = 1000, maxFilePartSize = 1000
+        runtime.StartSpillingService(100, 1000, 1000);
+        const auto tester = runtime.AllocateEdgeActor();
+        const auto spillingActor = runtime.StartSpillingActor(tester);
+
+        runtime.WaitBootstrap();
+
+        // First blob fits under the limit.
+        {
+            auto* writeEv = new TEvDqSpilling::TEvWrite(1, CreateRope(51, 'a'));
+            runtime.Send(new IEventHandle(spillingActor, tester, writeEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
+            UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
+        }
+
+        // Second blob would bring the total to 101 bytes.
+        {
+            auto* writeEv = new TEvDqSpilling::TEvWrite(2, CreateRope(50, 'b'));
+            runtime.Send(new IEventHandle(spillingActor, tester, writeEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester);
+            UNIT_ASSERT_STRINGS_EQUAL("Total size limit exceeded", resp->Get()->Message);
+        }
+    }
+
+    // Per-file limit of 100 bytes: a 51-byte blob is stored, the following
+    // 50-byte blob is rejected with the file-size error.
+    Y_UNIT_TEST(Write_FileSizeLimitExceeded) {
+        TTestActorRuntime runtime;
+        runtime.Initialize();
+
+        // maxTotalSize = 1000, maxFileSize = 100, maxFilePartSize = 1000
+        runtime.StartSpillingService(1000, 100, 1000);
+        const auto tester = runtime.AllocateEdgeActor();
+        const auto spillingActor = runtime.StartSpillingActor(tester);
+
+        runtime.WaitBootstrap();
+
+        // First blob fits under the limit.
+        {
+            auto* writeEv = new TEvDqSpilling::TEvWrite(1, CreateRope(51, 'a'));
+            runtime.Send(new IEventHandle(spillingActor, tester, writeEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
+            UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId);
+        }
+
+        // Second blob would bring the file to 101 bytes.
+        {
+            auto* writeEv = new TEvDqSpilling::TEvWrite(2, CreateRope(50, 'b'));
+            runtime.Send(new IEventHandle(spillingActor, tester, writeEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester);
+            UNIT_ASSERT_STRINGS_EQUAL("File size limit exceeded", resp->Get()->Message);
+        }
+    }
+
+    // The spilling actor is started with removeBlobsAfterRead enabled (the
+    // StartSpillingActor default) and a 25-byte part limit, so every 20-byte
+    // blob is expected in its own file part "<prefix><i>". Parts disappear
+    // once read, except the last created one.
+    Y_UNIT_TEST(MultipleFileParts) {
+        TTestActorRuntime runtime;
+        runtime.Initialize();
+
+        // maxTotalSize = 1000, maxFileSize = 100, maxFilePartSize = 25
+        runtime.StartSpillingService(1000, 100, 25);
+        const auto tester = runtime.AllocateEdgeActor();
+        const auto spillingActor = runtime.StartSpillingActor(tester);
+
+        runtime.WaitBootstrap();
+
+        const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_";
+
+        // Store five blobs; each write must create part file number i.
+        for (ui32 i = 0; i < 5; ++i) {
+            auto* writeEv = new TEvDqSpilling::TEvWrite(i, CreateRope(20, 'a' + i));
+            runtime.Send(new IEventHandle(spillingActor, tester, writeEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
+            UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
+
+            UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i));
+        }
+
+        // Load the blobs in reverse order and check which part files remain.
+        for (i32 i = 4; i >= 0; --i) {
+            auto* readEv = new TEvDqSpilling::TEvRead(i, true);
+            runtime.Send(new IEventHandle(spillingActor, tester, readEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester);
+            UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
+            TBuffer expected = CreateBlob(20, 'a' + i);
+            AssertEquals(expected, resp->Get()->Blob);
+
+            if (i != 4) {
+                UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i));
+            } else {
+                // do not remove last file
+                UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i));
+            }
+        }
+    }
+
+    // With removeBlobsAfterRead disabled the part-size limit is not applied:
+    // all five blobs must land in part file 0, which also stays in place
+    // while the blobs are read back.
+    Y_UNIT_TEST(SingleFilePart) {
+        TTestActorRuntime runtime;
+        runtime.Initialize();
+
+        // maxTotalSize = 1000, maxFileSize = 100, maxFilePartSize = 25
+        runtime.StartSpillingService(1000, 100, 25);
+        const auto tester = runtime.AllocateEdgeActor();
+        const auto spillingActor = runtime.StartSpillingActor(tester, false);
+
+        runtime.WaitBootstrap();
+
+        const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_";
+
+        // Store five blobs; no part file other than number 0 may appear.
+        for (ui32 i = 0; i < 5; ++i) {
+            auto* writeEv = new TEvDqSpilling::TEvWrite(i, CreateRope(20, 'a' + i));
+            runtime.Send(new IEventHandle(spillingActor, tester, writeEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
+            UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
+
+            UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0));
+            if (i > 0) {
+                UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i));
+            }
+        }
+
+        // Load the blobs in reverse order; part file 0 must survive every read.
+        for (i32 i = 4; i >= 0; --i) {
+            auto* readEv = new TEvDqSpilling::TEvRead(i, true);
+            runtime.Send(new IEventHandle(spillingActor, tester, readEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester);
+            UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId);
+            TBuffer expected = CreateBlob(20, 'a' + i);
+            AssertEquals(expected, resp->Get()->Blob);
+
+            UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0));
+        }
+    }
+
+    // Removing the on-disk file part behind a stored blob must surface as a
+    // TEvError on the next read, carrying the "can't open ..." I/O message.
+    Y_UNIT_TEST(ReadError) {
+        TTestActorRuntime runtime;
+        runtime.Initialize();
+
+        runtime.StartSpillingService();
+        auto tester = runtime.AllocateEdgeActor();
+        auto spillingActor = runtime.StartSpillingActor(tester);
+
+        runtime.WaitBootstrap();
+
+        // The service works in "<root>/node_<nodeId>"; take the node id from the
+        // runtime, as the file-part tests do, instead of hard-coding "node_1".
+        const TString nodeDir = TStringBuilder() << "node_" << runtime.GetNodeId();
+
+        {
+            auto ev = new TEvDqSpilling::TEvWrite(0, CreateRope(20, 'a'));
+            runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
+            UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId);
+        }
+
+        // Pull the file part out from under the service.
+        (runtime.GetSpillingRoot() / nodeDir / "1_test_0").ForceDelete();
+
+        {
+            auto ev = new TEvDqSpilling::TEvRead(0, true);
+            runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester);
+            auto& err = resp->Get()->Message;
+            auto expected = "can't open \"" + runtime.GetSpillingRoot().GetPath() + "/" + nodeDir + "/1_test_0\" with mode RdOnly";
+            UNIT_ASSERT_C(err.Contains("No such file or directory"), err);
+            UNIT_ASSERT_C(err.Contains(expected), err);
+        }
+    }
+
+    // The service is pointed at a root under "/nonexistent". The test expects
+    // it to answer writes and reads with "Service not started" and to serve
+    // the "not started" monitoring page.
+    Y_UNIT_TEST(StartError) {
+        TTestActorRuntime runtime;
+        runtime.Initialize();
+
+        auto spillingService = runtime.StartSpillingService(100, 500, 100, TFsPath("/nonexistent") / runtime.GetSpillingPrefix());
+        const auto tester = runtime.AllocateEdgeActor();
+        const auto spillingActor = runtime.StartSpillingActor(tester);
+
+        runtime.WaitBootstrap();
+
+        // Writing a blob is answered with an error.
+        {
+            auto* writeEv = new TEvDqSpilling::TEvWrite(1, CreateRope(10, 'a'));
+            runtime.Send(new IEventHandle(spillingActor, tester, writeEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester, TDuration::Seconds(1));
+            UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message);
+        }
+
+        // Reading a blob is answered with the same error.
+        {
+            auto* readEv = new TEvDqSpilling::TEvRead(1);
+            runtime.Send(new IEventHandle(spillingActor, tester, readEv));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester, TDuration::Seconds(1));
+            UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message);
+        }
+
+        // The monitoring request goes straight to the service actor.
+        {
+            THttpRequest httpReq(HTTP_METHOD_GET);
+            NMonitoring::TMonService2HttpRequest monReq(nullptr, &httpReq, nullptr, nullptr, "", nullptr);
+
+            auto* monEv = new NMon::TEvHttpInfo(monReq);
+            runtime.Send(new IEventHandle(spillingService, tester, monEv));
+
+            auto resp = runtime.GrabEdgeEvent<NMon::TEvHttpInfoRes>(tester, TDuration::Seconds(1));
+            UNIT_ASSERT_EQUAL("<html><h2>Service is not started due to IO error</h2></html>",
+                              ((NMon::TEvHttpInfoRes*) resp->Get())->Answer);
+        }
+    }
+
+} // suite
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..14ac9583a2
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,80 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-library-yql-dq-actors-spilling-ut)
+target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling
+)
+target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ dq-actors-spilling
+ cpp-testing-unittest
+ cpp-actors-testlib
+ ydb-library-services
+)
+target_link_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ TIMEOUT
+ 180
+)
+target_allocator(ydb-library-yql-dq-actors-spilling-ut
+ system_allocator
+)
+vcs_info(ydb-library-yql-dq-actors-spilling-ut)
diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..6ac287eaed
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,83 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-library-yql-dq-actors-spilling-ut)
+target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling
+)
+target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ dq-actors-spilling
+ cpp-testing-unittest
+ cpp-actors-testlib
+ ydb-library-services
+)
+target_link_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ TIMEOUT
+ 180
+)
+target_allocator(ydb-library-yql-dq-actors-spilling-ut
+ cpp-malloc-jemalloc
+)
+vcs_info(ydb-library-yql-dq-actors-spilling-ut)
diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..0de300a43d
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,85 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-library-yql-dq-actors-spilling-ut)
+target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling
+)
+target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ dq-actors-spilling
+ cpp-testing-unittest
+ cpp-actors-testlib
+ ydb-library-services
+)
+target_link_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ TIMEOUT
+ 180
+)
+target_allocator(ydb-library-yql-dq-actors-spilling-ut
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(ydb-library-yql-dq-actors-spilling-ut)
diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..48680b0618
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,73 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-library-yql-dq-actors-spilling-ut)
+target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling
+)
+target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ dq-actors-spilling
+ cpp-testing-unittest
+ cpp-actors-testlib
+ ydb-library-services
+)
+target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_TARGET
+ ydb-library-yql-dq-actors-spilling-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-library-yql-dq-actors-spilling-ut
+ PROPERTY
+ TIMEOUT
+ 180
+)
+target_allocator(ydb-library-yql-dq-actors-spilling-ut
+ system_allocator
+)
+vcs_info(ydb-library-yql-dq-actors-spilling-ut)
diff --git a/ydb/library/yql/dq/actors/spilling/ut/ya.make b/ydb/library/yql/dq/actors/spilling/ut/ya.make
new file mode 100644
index 0000000000..f528b5d1fe
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/ut/ya.make
@@ -0,0 +1,20 @@
+UNITTEST_FOR(ydb/library/yql/dq/actors/spilling)
+
+FORK_SUBTESTS()
+
+SIZE(MEDIUM)
+TIMEOUT(180)
+
+SRCS(
+ spilling_file_ut.cpp
+)
+
+YQL_LAST_ABI_VERSION()
+
+PEERDIR(
+ library/cpp/testing/unittest
+ library/cpp/actors/testlib
+ ydb/library/services
+)
+
+END()
diff --git a/ydb/library/yql/dq/actors/spilling/ya.make b/ydb/library/yql/dq/actors/spilling/ya.make
new file mode 100644
index 0000000000..c23435ce09
--- /dev/null
+++ b/ydb/library/yql/dq/actors/spilling/ya.make
@@ -0,0 +1,29 @@
+LIBRARY()
+
+SRCS(
+ channel_storage.cpp
+ spilling_counters.cpp
+ spilling_file.cpp
+ spilling.cpp
+)
+
+PEERDIR(
+ ydb/library/services
+ ydb/library/yql/dq/common
+ ydb/library/yql/dq/actors
+ ydb/library/yql/dq/runtime
+ ydb/library/yql/utils
+
+ library/cpp/actors/core
+ library/cpp/actors/util
+ library/cpp/monlib/dynamic_counters
+ library/cpp/monlib/service/pages
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index e63223bb08..08614b07bf 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -158,7 +158,7 @@ struct TEvTaskRunnerCreate
TEvTaskRunnerCreate(
const NDqProto::TDqTask& task,
const TDqTaskRunnerMemoryLimits& memoryLimits,
- const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext()))
+ const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx)
: Task(task)
, MemoryLimits(memoryLimits)
, ExecCtx(execCtx)
diff --git a/ydb/library/yql/dq/actors/ya.make b/ydb/library/yql/dq/actors/ya.make
index 731ece61cd..c07b3cc7b9 100644
--- a/ydb/library/yql/dq/actors/ya.make
+++ b/ydb/library/yql/dq/actors/ya.make
@@ -16,5 +16,6 @@ END()
RECURSE(
compute
protos
+ spilling
task_runner
)
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt
index ee09e6cf14..ddcaa3f12b 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(yql-dq-runtime PUBLIC
yql-dq-expr_nodes
yql-dq-type_ann
common-schema-mkql
+ cpp-actors-util
tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-runtime PRIVATE
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt
index 6f24fe6c02..dfeb295ee7 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt
@@ -33,6 +33,7 @@ target_link_libraries(yql-dq-runtime PUBLIC
yql-dq-expr_nodes
yql-dq-type_ann
common-schema-mkql
+ cpp-actors-util
tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-runtime PRIVATE
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt
index 6f24fe6c02..dfeb295ee7 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(yql-dq-runtime PUBLIC
yql-dq-expr_nodes
yql-dq-type_ann
common-schema-mkql
+ cpp-actors-util
tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-runtime PRIVATE
diff --git a/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt
index ee09e6cf14..ddcaa3f12b 100644
--- a/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt
@@ -32,6 +32,7 @@ target_link_libraries(yql-dq-runtime PUBLIC
yql-dq-expr_nodes
yql-dq-type_ann
common-schema-mkql
+ cpp-actors-util
tools-enum_parser-enum_serialization_runtime
)
target_sources(yql-dq-runtime PRIVATE
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 6dd562a088..4e4fec2bef 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -204,17 +204,13 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu
}
}
-IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContext::CreateOutputConsumer(const TTaskOutput& outputDesc,
+IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContextBase::CreateOutputConsumer(const TTaskOutput& outputDesc,
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext*, const TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const
{
return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs));
}
-IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 /* channelId */) const {
- return {};
-}
-
inline TCollectStatsLevel StatsModeToCollectStatsLevel(NDqProto::EDqStatsMode statsMode) {
if (statsMode >= NDqProto::DQ_STATS_MODE_PROFILE) return TCollectStatsLevel::Profile;
else if (statsMode >= NDqProto::DQ_STATS_MODE_FULL) return TCollectStatsLevel::Full;
@@ -445,7 +441,7 @@ public:
} else {
entry = ticket.GetValueSync();
}
- }
+ }
if (!entry) {
entry = CreateComputationPattern(task, program.GetRaw(), false, canBeCached);
@@ -576,7 +572,7 @@ public:
inputs.clear();
inputs.emplace_back(transform->TransformOutput);
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
- CreateInputUnionValue(std::move(inputs), holderFactory,
+ CreateInputUnionValue(std::move(inputs), holderFactory,
{&inputStats, transform->TransformOutputType}));
} else {
entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 22330a5d15..2c6fb027f7 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -171,15 +171,20 @@ public:
virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0;
};
-class TDqTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext {
+class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext {
public:
IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc,
const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx,
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
TVector<IDqOutput::TPtr>&& outputs) const override;
+};
- IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
+class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContextBase {
+public:
+    IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/) const override {
+        return {}; // always an empty (default-constructed) storage pointer, whatever the channel id
+    }
 };
struct TDqTaskRunnerSettings {
@@ -376,7 +381,7 @@ public:
virtual ui64 GetTaskId() const = 0;
virtual void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits,
- const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) = 0;
+ const IDqTaskRunnerExecutionContext& execCtx) = 0;
virtual ERunStatus Run() = 0;
virtual bool HasEffects() const = 0;
diff --git a/ydb/library/yql/dq/runtime/ya.make b/ydb/library/yql/dq/runtime/ya.make
index c062d8743f..6202096617 100644
--- a/ydb/library/yql/dq/runtime/ya.make
+++ b/ydb/library/yql/dq/runtime/ya.make
@@ -13,6 +13,7 @@ PEERDIR(
ydb/library/yql/dq/type_ann
ydb/library/yql/parser/pg_wrapper/interface
ydb/library/yql/providers/common/schema/mkql
+ library/cpp/actors/util
)
SRCS(
diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt
index a7cb059668..10e86803f2 100644
--- a/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(providers-dq-actors PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-compute
yql-utils-failure_injector
providers-common-metrics
dq-actors-events
diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt
index 74bf95ff13..ae0b2aa77f 100644
--- a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt
@@ -34,6 +34,7 @@ target_link_libraries(providers-dq-actors PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-compute
yql-utils-failure_injector
providers-common-metrics
dq-actors-events
diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt
index 74bf95ff13..ae0b2aa77f 100644
--- a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt
@@ -34,6 +34,7 @@ target_link_libraries(providers-dq-actors PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-compute
yql-utils-failure_injector
providers-common-metrics
dq-actors-events
diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt
index a7cb059668..10e86803f2 100644
--- a/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt
@@ -33,6 +33,7 @@ target_link_libraries(providers-dq-actors PUBLIC
yql-dq-proto
yql-dq-runtime
yql-dq-tasks
+ dq-actors-compute
yql-utils-failure_injector
providers-common-metrics
dq-actors-events
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index a944e60b00..c1cfe959a4 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -40,6 +40,7 @@ IActor* CreateComputeActor(
computeRuntimeSettings.ExtraMemoryAllocationPool = 3;
computeRuntimeSettings.FailOnUndelivery = false;
computeRuntimeSettings.StatsMode = (statsMode != NDqProto::DQ_STATS_MODE_UNSPECIFIED) ? statsMode : NDqProto::DQ_STATS_MODE_FULL;
+ computeRuntimeSettings.UseSpilling = options.UseSpilling;
// clear fake actorids
for (auto& input : *task->MutableInputs()) {
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 915ca8453b..ab8aeb8c93 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -1,8 +1,8 @@
#include "worker_actor.h"
#include <ydb/library/yql/dq/actors/dq.h>
+#include <ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
-
#include <ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h>
#include <ydb/library/yql/providers/dq/runtime/runtime_data.h>
@@ -94,13 +94,15 @@ public:
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
const IDqAsyncIoFactory::TPtr& asyncIoFactory,
TWorkerRuntimeData* runtimeData,
- const TString& traceId)
+ const TString& traceId,
+ bool useSpilling)
: TRichActor<TDqWorker>(&TDqWorker::Handler)
, AsyncIoFactory(asyncIoFactory)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, RuntimeData(runtimeData)
, TraceId(traceId)
, MemoryQuotaManager(new TDummyMemoryQuotaManager)
+ , UseSpilling(useSpilling)
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker created ";
@@ -258,7 +260,12 @@ private:
TDqTaskRunnerMemoryLimits limits; // used for local mode only
limits.ChannelBufferSize = 20_MB;
limits.OutputChunkMaxSize = 2_MB;
- Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits));
+ // NOTE(review): `wakeup` captures `this` and is handed to TaskRunnerActor via execCtx — confirm it is never invoked after this worker is destroyed or from another actor's thread.
+ auto wakeup = [this]{ ResumeExecution(); };
+ std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(
+ TraceId, UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext());
+
+ Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits, execCtx));
}
void OnTaskRunnerCreated(TEvTaskRunnerCreateFinished::TPtr& ev, const TActorContext& ) {
@@ -789,13 +796,15 @@ private:
TVector<Yql::DqsProto::TWorkerInfo> AllWorkers;
IMemoryQuotaManager::TPtr MemoryQuotaManager;
+ const bool UseSpilling;
};
NActors::IActor* CreateWorkerActor(
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const IDqAsyncIoFactory::TPtr& asyncIoFactory)
+ const IDqAsyncIoFactory::TPtr& asyncIoFactory,
+ bool useSpilling)
{
Y_ABORT_UNLESS(taskRunnerActorFactory);
return new TLogWrapReceive(
@@ -803,7 +812,8 @@ NActors::IActor* CreateWorkerActor(
taskRunnerActorFactory,
asyncIoFactory,
runtimeData,
- traceId), traceId);
+ traceId,
+ useSpilling), traceId);
}
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h
index ea1ee7bfdb..1a68754920 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.h
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.h
@@ -25,6 +25,7 @@ namespace NYql::NDqs {
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory);
+ const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory,
+ bool useSpilling);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/actors/ya.make b/ydb/library/yql/providers/dq/actors/ya.make
index 7d3df6fe4b..52fa536894 100644
--- a/ydb/library/yql/providers/dq/actors/ya.make
+++ b/ydb/library/yql/providers/dq/actors/ya.make
@@ -35,6 +35,7 @@ PEERDIR(
ydb/library/yql/dq/proto
ydb/library/yql/dq/runtime
ydb/library/yql/dq/tasks
+ ydb/library/yql/dq/actors/compute
ydb/library/yql/utils/failure_injector
ydb/library/yql/providers/common/metrics
ydb/library/yql/providers/dq/actors/events
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 9fc6789564..464a9382a3 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -76,6 +76,16 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, TaskRunnerStats).Parser([](const TString& v) { return FromString<ETaskRunnerStats>(v); });
REGISTER_SETTING(*this, _SkipRevisionCheck);
REGISTER_SETTING(*this, UseBlockReader);
+ REGISTER_SETTING(*this, SpillingEngine)
+ .Parser([](const TString& v) {
+ return FromString<TDqSettings::ESpillingEngine>(v);
+ })
+ .ValueSetter([this](const TString&, TDqSettings::ESpillingEngine value) {
+ SpillingEngine = value;
+ if (value != TDqSettings::ESpillingEngine::Disable) {
+ EnableDqReplicate = true;
+ }
+ });
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 75cca0876e..d80c25dd89 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -21,6 +21,11 @@ struct TDqSettings {
Profile
};
+ enum class ESpillingEngine {
+ Disable /* "disable" */,
+ File /* "file" */,
+ };
+
struct TDefault {
static constexpr ui32 MaxTasksPerStage = 20U;
static constexpr ui32 MaxTasksPerOperation = 70U;
@@ -48,6 +53,7 @@ struct TDqSettings {
static constexpr bool EnableChannelStats = false;
static constexpr bool ExportStats = false;
static constexpr ETaskRunnerStats TaskRunnerStats = ETaskRunnerStats::Basic;
+ static constexpr ESpillingEngine SpillingEngine = ESpillingEngine::Disable;
};
using TPtr = std::shared_ptr<TDqSettings>;
@@ -115,6 +121,7 @@ struct TDqSettings {
NCommon::TConfSetting<ETaskRunnerStats, false> TaskRunnerStats;
NCommon::TConfSetting<bool, false> _SkipRevisionCheck;
NCommon::TConfSetting<bool, false> UseBlockReader;
+ NCommon::TConfSetting<ESpillingEngine, false> SpillingEngine;
// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
diff --git a/ydb/library/yql/providers/dq/config/config.proto b/ydb/library/yql/providers/dq/config/config.proto
index b5b66eee66..ef6a0fd7e0 100644
--- a/ydb/library/yql/providers/dq/config/config.proto
+++ b/ydb/library/yql/providers/dq/config/config.proto
@@ -66,6 +66,18 @@ message TDqConfig {
repeated TAttr Setting = 1;
}
+ message TSpillingSettings {
+ optional string Root = 1 [default = "./spilling"];
+
+ optional uint64 MaxTotalSize = 2;
+ optional uint64 MaxFileSize = 3;
+ optional uint64 MaxFilePartSize = 4;
+
+ optional uint32 IoThreadPoolWorkersCount = 5 [default = 2];
+ optional uint32 IoThreadPoolQueueSize = 6 [default = 1000];
+ optional bool CleanupOnShutdown = 7;
+ }
+
message TYtBackend {
optional string ClusterName = 1 [default = "hume"];
optional string User = 2; // default -- current user name
@@ -103,6 +115,7 @@ message TDqConfig {
optional TSolomon Solomon = 30;
optional bool CanUseComputeActor = 32 [default = false];
optional bool EnforceJobUtc = 33;
+ optional TSpillingSettings SpillingSettings = 38;
}
repeated TYtBackend YtBackends = 5;
diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt
index efe17d868c..25fa353881 100644
--- a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(providers-dq-local_gateway PUBLIC
yutil
library-yql-utils
dq-actors-compute
+ dq-actors-spilling
providers-dq-provider
dq-api-protos
providers-dq-task_runner
diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt
index 88a991c531..3e3131a9b8 100644
--- a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(providers-dq-local_gateway PUBLIC
yutil
library-yql-utils
dq-actors-compute
+ dq-actors-spilling
providers-dq-provider
dq-api-protos
providers-dq-task_runner
diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt
index 88a991c531..3e3131a9b8 100644
--- a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(providers-dq-local_gateway PUBLIC
yutil
library-yql-utils
dq-actors-compute
+ dq-actors-spilling
providers-dq-provider
dq-api-protos
providers-dq-task_runner
diff --git a/ydb/library/yql/providers/dq/local_gateway/ya.make b/ydb/library/yql/providers/dq/local_gateway/ya.make
index 4b3afaeacb..0eabb17f7f 100644
--- a/ydb/library/yql/providers/dq/local_gateway/ya.make
+++ b/ydb/library/yql/providers/dq/local_gateway/ya.make
@@ -9,6 +9,7 @@ SRCS(
PEERDIR(
ydb/library/yql/utils
ydb/library/yql/dq/actors/compute
+ ydb/library/yql/dq/actors/spilling
ydb/library/yql/providers/dq/provider
ydb/library/yql/providers/dq/api/protos
ydb/library/yql/providers/dq/task_runner
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index fd1f07185f..57f9144cee 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -7,8 +7,8 @@
#include <ydb/library/yql/providers/dq/service/service_node.h>
#include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h>
-
#include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h>
+#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
#include <ydb/library/yql/utils/range_walker.h>
#include <ydb/library/yql/utils/bind_in_range.h>
@@ -17,6 +17,7 @@
#include <util/system/env.h>
#include <util/generic/size_literals.h>
+#include <util/folder/dirut.h>
namespace NYql {
@@ -29,7 +30,8 @@ public:
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
IMetricsRegistryPtr metricsRegistry,
- const std::function<IActor*(void)>& metricsPusherFactory)
+ const std::function<IActor*(void)>& metricsPusherFactory,
+ bool withSpilling)
: MetricsRegistry(metricsRegistry
? metricsRegistry
: CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq))
@@ -57,6 +59,7 @@ public:
threads,
MetricsRegistry);
+ auto lwmGroup = MetricsRegistry->GetSensors()->GetSubgroup("component", "lwm");
auto patternCache = std::make_shared<NKikimr::NMiniKQL::TComputationPatternLRUCache>(200_MB);
NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, patternCache, true);
@@ -68,14 +71,27 @@ public:
{
return factory->Get(task);
});
- lwmOptions.Counters = NDqs::TWorkerManagerCounters(MetricsRegistry->GetSensors()->GetSubgroup("component", "lwm"));
+ lwmOptions.Counters = NDqs::TWorkerManagerCounters(lwmGroup);
lwmOptions.DropTaskCountersOnFinish = false;
+ lwmOptions.UseSpilling = withSpilling;
auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
ServiceNode->AddLocalService(
MakeWorkerManagerActorID(nodeId),
TActorSetupCmd(resman, TMailboxType::Simple, 0));
+ if (withSpilling) {
+ char tempDir[MAX_PATH];
+ if (MakeTempDir(tempDir, nullptr) != 0)
+ ythrow yexception() << "LocalServiceHolder: Can't create temporary directory " << tempDir;
+
+ auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = true}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));
+
+ ServiceNode->AddLocalService(
+ NDq::MakeDqLocalFileSpillingServiceID(nodeId),
+ TActorSetupCmd(spillingActor, TMailboxType::Simple, 0));
+ }
+
auto statsCollector = CreateStatsCollector(1, *ServiceNode->GetSetup(), MetricsRegistry->GetSensors());
auto actorSystem = ServiceNode->StartActorSystem();
@@ -233,7 +249,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort,
NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
IMetricsRegistryPtr metricsRegistry,
- const std::function<IActor*(void)>& metricsPusherFactory)
+ const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling)
{
return MakeHolder<TLocalServiceHolder>(functionRegistry,
compFactory,
@@ -244,13 +260,14 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I
std::move(asyncIoFactory),
threads,
metricsRegistry,
- metricsPusherFactory);
+ metricsPusherFactory,
+ withSpilling);
}
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
- NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
+ bool withSpilling, NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads,
IMetricsRegistryPtr metricsRegistry,
const std::function<IActor*(void)>& metricsPusherFactory)
{
@@ -270,7 +287,8 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio
std::move(asyncIoFactory),
threads,
metricsRegistry,
- metricsPusherFactory),
+ metricsPusherFactory,
+ withSpilling),
CreateDqGateway("[::1]", grpcPort.Addr.GetPort()));
}
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
index b558fbeb7d..91329aab8d 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h
@@ -14,6 +14,7 @@ namespace NYql {
TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories,
+ bool withSpilling,
NDq::IDqAsyncIoFactory::TPtr = nullptr, int threads = 16,
IMetricsRegistryPtr metricsRegistry = {},
const std::function<NActors::IActor*(void)>& metricsPusherFactory = {});
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index ff9be8a613..cb43eab7a9 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -143,7 +143,8 @@ public:
{
auto guard = runner->BindAllocator(State->Settings->MemoryLimit.Get().GetOrElse(0));
- runner->Prepare(runnerSettings, limits);
+ NDq::TDqTaskRunnerExecutionContextDefault execCtx;
+ runner->Prepare(runnerSettings, limits, execCtx);
}
TVector<NDq::TDqSerializedBatch> rows;
@@ -245,7 +246,12 @@ private:
} else {
mode = NDq::EChannelMode::CHANNEL_WIDE_BLOCK;
}
- pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, *pipeline->GetTypeAnnotationContext(), mode), "BuildPhy");
+ pipeline->Add(
+ NDq::CreateDqBuildPhyStagesTransformer(
+ State_->Settings->SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != TDqSettings::ESpillingEngine::Disable,
+ *pipeline->GetTypeAnnotationContext(), mode
+ ),
+ "BuildPhy");
pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
}
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index bf68169309..561e8fcdfb 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -226,7 +226,7 @@ public:
}
_exit(127);
}
-
+
NDqProto::TMeteringStatsResponse GetMeteringStats() {
NDqProto::TMeteringStatsResponse resp;
auto* stats = Runner->GetMeteringStats();
@@ -767,7 +767,9 @@ public:
limits.ChannelBufferSize = DqConfiguration->ChannelBufferSize.Get().GetOrElse(TDqSettings::TDefault::ChannelBufferSize);
limits.OutputChunkMaxSize = DqConfiguration->OutputChunkMaxSize.Get().GetOrElse(TDqSettings::TDefault::OutputChunkMaxSize);
limits.ChunkSizeLimit = DqConfiguration->ChunkSizeLimit.Get().GetOrElse(TDqSettings::TDefault::ChunkSizeLimit);
- Runner->Prepare(task, limits);
+
+ NDq::TDqTaskRunnerExecutionContextDefault execCtx;
+ Runner->Prepare(task, limits, execCtx);
});
result.Save(&output);
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
index a842a362ed..51e888dd6c 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
@@ -116,7 +116,8 @@ public:
NYql::NDqProto::TPrepareResponse Prepare() override {
NYql::NDqProto::TPrepareResponse ret;
- Runner->Prepare(Task, DefaultMemoryLimits());
+ TDqTaskRunnerExecutionContextDefault ctx;
+ Runner->Prepare(Task, DefaultMemoryLimits(), ctx);
return ret;
}
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index 872bc8258c..9b7440ebf4 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -302,7 +302,8 @@ private:
Options.RuntimeData,
traceId,
Options.TaskRunnerActorFactory,
- Options.AsyncIoFactory));
+ Options.AsyncIoFactory,
+ Options.UseSpilling));
}
allocationInfo.WorkerActors.emplace_back(RegisterChild(
actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index 95ecae911e..3b36757089 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -39,6 +39,7 @@ namespace NYql::NDqs {
NActors::TActorId QuoterServiceActorId;
bool ComputeActorOwnsCounters = false;
bool DropTaskCountersOnFinish = true;
+ bool UseSpilling = false;
};
NActors::IActor* CreateLocalWorkerManager(const TLocalWorkerManagerOptions& options);
diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt
index 7a8db69ea2..31b7fb97c1 100644
--- a/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt
@@ -19,6 +19,7 @@ target_link_libraries(worker_node PUBLIC
library-cpp-getopt
cpp-mapreduce-client
dq-actors-compute
+ dq-actors-spilling
yql-dq-comp_nodes
dq-integration-transform
yql-dq-transform
diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt
index 11e2b042f9..ef56db89b7 100644
--- a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt
@@ -19,6 +19,7 @@ target_link_libraries(worker_node PUBLIC
library-cpp-getopt
cpp-mapreduce-client
dq-actors-compute
+ dq-actors-spilling
yql-dq-comp_nodes
dq-integration-transform
yql-dq-transform
diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt
index 17c842dbb4..3739d1ca70 100644
--- a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt
@@ -20,6 +20,7 @@ target_link_libraries(worker_node PUBLIC
library-cpp-getopt
cpp-mapreduce-client
dq-actors-compute
+ dq-actors-spilling
yql-dq-comp_nodes
dq-integration-transform
yql-dq-transform
diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp
index c8d20bed06..1fd2c83144 100644
--- a/ydb/library/yql/tools/dq/worker_node/main.cpp
+++ b/ydb/library/yql/tools/dq/worker_node/main.cpp
@@ -13,6 +13,7 @@
#include <library/cpp/digest/md5/md5.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
#include <ydb/library/yql/providers/dq/service/interconnect_helpers.h>
#include <ydb/library/yql/providers/dq/runtime/file_cache.h>
@@ -60,6 +61,7 @@
#include <util/system/env.h>
#include <util/system/getpid.h>
#include <util/system/fs.h>
+#include <util/folder/dirut.h>
constexpr ui32 THREAD_PER_NODE = 8;
@@ -403,11 +405,24 @@ int main(int argc, char** argv) {
})
: NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory);
lwmOptions.ComputeActorOwnsCounters = true;
+ lwmOptions.UseSpilling = true;
auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
auto workerManagerActorId = actorSystem->Register(resman);
actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId);
+ auto spillingActor = actorSystem->Register(
+ NDq::CreateDqLocalFileSpillingService(
+ NDq::TFileSpillingServiceConfig{
+ .Root = "./spilling",
+ .CleanupOnShutdown = true
+ },
+ MakeIntrusive<NDq::TSpillingCounters>(dqSensors)
+ )
+ );
+
+ actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor);
+
auto endFuture = ShouldContinue.GetFuture();
signal(SIGINT, &OnTerminate);
diff --git a/ydb/library/yql/tools/dq/worker_node/ya.make b/ydb/library/yql/tools/dq/worker_node/ya.make
index 216572850a..51b9f8babc 100644
--- a/ydb/library/yql/tools/dq/worker_node/ya.make
+++ b/ydb/library/yql/tools/dq/worker_node/ya.make
@@ -6,6 +6,7 @@ IF (NOT OS_WINDOWS)
library/cpp/getopt
yt/cpp/mapreduce/client
ydb/library/yql/dq/actors/compute
+ ydb/library/yql/dq/actors/spilling
ydb/library/yql/dq/comp_nodes
ydb/library/yql/dq/integration/transform
ydb/library/yql/dq/transform
diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp
index e730c97dec..e674e5abee 100644
--- a/ydb/library/yql/tools/dqrun/dqrun.cpp
+++ b/ydb/library/yql/tools/dqrun/dqrun.cpp
@@ -787,7 +787,7 @@ int RunMain(int argc, const char* argv[])
size_t maxRetries = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasMaxRetries() ? gatewaysConfig.GetHttpGateway().GetMaxRetries() : 2;
dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories,
- CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads,
+ false/*spilling*/, CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads,
metricsRegistry,
metricsPusherFactory);
}