diff options
author | udovichenko-r <rvu@ydb.tech> | 2023-10-30 12:00:48 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2023-10-30 12:35:14 +0300 |
commit | 733053293601705b5762becc1393afbfb75064e8 (patch) | |
tree | f16847f54b2108d36db9bba410e5176e2026b395 | |
parent | cb5ce0bf4ff3f14eea1992635480f5f04feee6dd (diff) | |
download | ydb-733053293601705b5762becc1393afbfb75064e8.tar.gz |
[dq] Move spilling to yql/dq/actors
YQL-16013
103 files changed, 1779 insertions, 1045 deletions
diff --git a/.mapping.json b/.mapping.json index 44b8d55c91..7de9cea3ec 100644 --- a/.mapping.json +++ b/.mapping.json @@ -7010,6 +7010,16 @@ "ydb/library/yql/dq/actors/protos/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/dq/actors/protos/CMakeLists.txt":"", "ydb/library/yql/dq/actors/protos/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/dq/actors/spilling/CMakeLists.txt":"", + "ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt":"", + "ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt":"", "ydb/library/yql/dq/actors/task_runner/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/dq/actors/task_runner/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/dq/actors/task_runner/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h index 80002d604f..ebee571eb0 100644 --- a/ydb/core/kqp/common/simple/services.h +++ b/ydb/core/kqp/common/simple/services.h @@ -31,11 +31,6 @@ inline NActors::TActorId MakeKqpNodeServiceID(ui32 nodeId) { return NActors::TActorId(nodeId, TStringBuf(name, 12)); } -inline NActors::TActorId MakeKqpLocalFileSpillingServiceID(ui32 nodeId) { - const char name[12] = "kqp_lfspill"; - return NActors::TActorId(nodeId, TStringBuf(name, 12)); -} - inline NActors::TActorId MakeKqpCompileComputationPatternServiceID(ui32 nodeId) { const char name[12] = "kqp_comp_cp"; return NActors::TActorId(nodeId, TStringBuf(name, 12)); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index a555f7c286..db7f86c85d 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -2,7 +2,7 @@ #include "kqp_compute_actor.h" -#include <ydb/core/kqp/runtime/kqp_channel_storage.h> +#include <ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h> #include <ydb/core/kqp/runtime/kqp_tasks_runner.h> @@ -12,14 +12,12 @@ namespace NKqp { using namespace NYql; using namespace NYql::NDq; -class TKqpTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext { +class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext { public: - TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, - const TActorContext& ctx) - : TxId(txId) - , WakeUp(std::move(wakeUp)) - , Ctx(ctx) - , WithSpilling(withSpilling) {} + TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const TActorContext& ctx) + : TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp), ctx) + { + } IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NMiniKQL::TTypeEnvironment& typeEnv, @@ -28,20 +26,6 @@ public: { return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs)); } - - IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override { - if (WithSpilling) { - return CreateKqpChannelStorage(TxId, channelId, WakeUp, Ctx); - } else { - return nullptr; - } - } - -private: - const ui64 TxId; - const IDqChannelStorage::TWakeUpCallback WakeUp; - const TActorContext& Ctx; - const bool WithSpilling; }; } // namespace NKqp diff --git a/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt index 8f457ba4da..b5a08b2bb1 100644 --- a/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/counters/CMakeLists.darwin-x86_64.txt @@ -14,6 +14,8 @@ target_link_libraries(core-kqp-counters PUBLIC ydb-core-base ydb-core-protos core-sys_view-service + dq-actors-spilling + library-yql-minikql ) target_sources(core-kqp-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp diff --git a/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt index f0dbbe779d..3a231a5261 100644 --- a/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/counters/CMakeLists.linux-aarch64.txt @@ -15,6 +15,8 @@ target_link_libraries(core-kqp-counters PUBLIC ydb-core-base ydb-core-protos core-sys_view-service + dq-actors-spilling + library-yql-minikql ) target_sources(core-kqp-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp diff --git a/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt index f0dbbe779d..3a231a5261 100644 --- a/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/counters/CMakeLists.linux-x86_64.txt @@ -15,6 +15,8 @@ target_link_libraries(core-kqp-counters PUBLIC ydb-core-base ydb-core-protos core-sys_view-service + dq-actors-spilling + library-yql-minikql ) target_sources(core-kqp-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp diff --git a/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt index 8f457ba4da..b5a08b2bb1 100644 --- a/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/counters/CMakeLists.windows-x86_64.txt @@ -14,6 +14,8 @@ target_link_libraries(core-kqp-counters PUBLIC ydb-core-base ydb-core-protos core-sys_view-service + dq-actors-spilling + library-yql-minikql ) target_sources(core-kqp-counters PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/counters/kqp_counters.cpp diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 50fedabfa4..d45e4facc1 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -724,7 +724,8 @@ void TKqpCounters::UpdateTxCounters(const TKqpTransactionInfo& txInfo, } TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, const TActorContext* ctx) - : AllocCounters(counters, "kqp") + : NYql::NDq::TSpillingCounters(counters) + , AllocCounters(counters, "kqp") { Counters = counters; KqpGroup = GetServiceCounters(counters, "kqp"); @@ -786,15 +787,6 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co RmMaxSnapshotLatency = KqpGroup->GetCounter("RM/MaxSnapshotLatency", false); RmNodeNumberInSnapshot = KqpGroup->GetCounter("RM/NodeNumberInSnapshot", false); - /* Spilling */ - SpillingWriteBlobs = KqpGroup->GetCounter("Spilling/WriteBlobs", true); - SpillingReadBlobs = KqpGroup->GetCounter("Spilling/ReadBlobs", true); - SpillingStoredBlobs = KqpGroup->GetCounter("Spilling/StoredBlobs", false); - SpillingTotalSpaceUsed = KqpGroup->GetCounter("Spilling/TotalSpaceUsed", false); - SpillingTooBigFileErrors = KqpGroup->GetCounter("Spilling/TooBigFileErrors", true); - SpillingNoSpaceErrors = KqpGroup->GetCounter("Spilling/NoSpaceErrors", true); - SpillingIoErrors = KqpGroup->GetCounter("Spilling/IoErrors", true); - /* Scan queries */ ScanQueryShardDisconnect = KqpGroup->GetCounter("ScanQuery/ShardDisconnect", true); ScanQueryShardResolve = KqpGroup->GetCounter("ScanQuery/ShardResolve", true); diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index 8a8e9da788..f629743445 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -12,6 +12,7 @@ #include <ydb/core/tx/tx_proxy/mon.h> #include <ydb/library/yql/minikql/aligned_page_pool.h> +#include <ydb/library/yql/dq/actors/spilling/spilling_counters.h> #include <util/system/spinlock.h> @@ -246,7 +247,7 @@ private: using TKqpDbCountersPtr = TIntrusivePtr<TKqpDbCounters>; -class TKqpCounters : public TThrRefBase, public TKqpCountersBase { +class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounters { private: struct TTxByKindCounters { NMonitoring::THistogramPtr TotalDuration; @@ -378,15 +379,6 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr RmMaxSnapshotLatency; ::NMonitoring::TDynamicCounters::TCounterPtr RmNodeNumberInSnapshot; - // Spilling counters - ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs; - ::NMonitoring::TDynamicCounters::TCounterPtr SpillingReadBlobs; - ::NMonitoring::TDynamicCounters::TCounterPtr SpillingStoredBlobs; - ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTotalSpaceUsed; - ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTooBigFileErrors; - ::NMonitoring::TDynamicCounters::TCounterPtr SpillingNoSpaceErrors; - ::NMonitoring::TDynamicCounters::TCounterPtr SpillingIoErrors; - // Scan queries counters ::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardDisconnect; ::NMonitoring::TDynamicCounters::TCounterPtr ScanQueryShardResolve; diff --git a/ydb/core/kqp/counters/ya.make b/ydb/core/kqp/counters/ya.make index dd0786c24f..4ce62cb5e1 100644 --- a/ydb/core/kqp/counters/ya.make +++ b/ydb/core/kqp/counters/ya.make @@ -10,6 +10,8 @@ PEERDIR( ydb/core/base ydb/core/protos ydb/core/sys_view/service + ydb/library/yql/dq/actors/spilling + ydb/library/yql/minikql ) END() diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 04aabbd48c..39f224f736 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -66,7 +66,7 @@ TDqTaskRunnerMemoryLimits CreateTaskRunnerMemoryLimits() { return memoryLimits; } -TDqTaskRunnerExecutionContext CreateTaskRunnerExecutionContext() { +TDqTaskRunnerExecutionContextDefault CreateTaskRunnerExecutionContext() { return {}; } diff --git a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt index f8582de4c3..449150439b 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.darwin-x86_64.txt @@ -32,10 +32,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-tx_proxy core-tx-scheme_cache core-tx-schemeshard + ydb-core-mon ydb-library-query_actor providers-common-http_gateway providers-common-proto yql-public-issue + dq-actors-spilling api-protos public-lib-operation_id public-lib-scheme_types diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt index 8469df0747..1b2aa5977e 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-aarch64.txt @@ -33,10 +33,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-tx_proxy core-tx-scheme_cache core-tx-schemeshard + ydb-core-mon ydb-library-query_actor providers-common-http_gateway providers-common-proto yql-public-issue + dq-actors-spilling api-protos public-lib-operation_id public-lib-scheme_types diff --git a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt index 8469df0747..1b2aa5977e 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.linux-x86_64.txt @@ -33,10 +33,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-tx_proxy core-tx-scheme_cache core-tx-schemeshard + ydb-core-mon ydb-library-query_actor providers-common-http_gateway providers-common-proto yql-public-issue + dq-actors-spilling api-protos public-lib-operation_id public-lib-scheme_types diff --git a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt index f8582de4c3..449150439b 100644 --- a/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/proxy_service/CMakeLists.windows-x86_64.txt @@ -32,10 +32,12 @@ target_link_libraries(core-kqp-proxy_service PUBLIC core-tx-tx_proxy core-tx-scheme_cache core-tx-schemeshard + ydb-core-mon ydb-library-query_actor providers-common-http_gateway providers-common-proto yql-public-issue + dq-actors-spilling api-protos public-lib-operation_id public-lib-scheme_types diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index dfac53d9d2..1d55ec7194 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -15,13 +15,14 @@ #include <ydb/core/kqp/compile_service/kqp_compile_service.h> #include <ydb/core/kqp/session_actor/kqp_worker_common.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> -#include <ydb/core/kqp/runtime/kqp_spilling_file.h> -#include <ydb/core/kqp/runtime/kqp_spilling.h> +#include <ydb/library/yql/dq/actors/spilling/spilling_file.h> +#include <ydb/library/yql/dq/actors/spilling/spilling.h> #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/public/lib/operation_id/operation_id.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> +#include <ydb/core/mon/mon.h> #include <ydb/library/yql/utils/actor_log/log.h> #include <ydb/library/yql/core/services/mounts/yql_mounts.h> @@ -211,9 +212,25 @@ public: WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()); if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) { - SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpLocalFileSpillingService(cfg, Counters)); + SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService( + NYql::NDq::TFileSpillingServiceConfig{ + .Root = cfg.GetRoot(), + .MaxTotalSize = cfg.GetMaxTotalSize(), + .MaxFileSize = cfg.GetMaxFileSize(), + .MaxFilePartSize = cfg.GetMaxFilePartSize(), + .IoThreadPoolWorkersCount = cfg.GetIoThreadPool().GetWorkersCount(), + .IoThreadPoolQueueSize = cfg.GetIoThreadPool().GetQueueSize(), + .CleanupOnShutdown = false + }, + Counters)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( - MakeKqpLocalFileSpillingServiceID(SelfId().NodeId()), SpillingService); + NYql::NDq::MakeDqLocalFileSpillingServiceID(SelfId().NodeId()), SpillingService); + + if (NActors::TMon* mon = AppData()->Mon) { + NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); + mon->RegisterActorPage(actorsMonPage, "kqp_spilling_file", "KQP Local File Spilling Service", false, + TlsActivationContext->ExecutorThread.ActorSystem, SpillingService); + } } // Create compile service @@ -1580,7 +1597,7 @@ private: IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, - const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, + const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make index cef6cbd664..e93003bae8 100644 --- a/ydb/core/kqp/proxy_service/ya.make +++ b/ydb/core/kqp/proxy_service/ya.make @@ -24,10 +24,12 @@ PEERDIR( ydb/core/tx/tx_proxy ydb/core/tx/scheme_cache ydb/core/tx/schemeshard + ydb/core/mon ydb/library/query_actor ydb/library/yql/providers/common/http_gateway ydb/library/yql/providers/common/proto ydb/library/yql/public/issue + ydb/library/yql/dq/actors/spilling ydb/public/api/protos ydb/public/lib/operation_id ydb/public/lib/scheme_types diff --git a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt index e8854f07a9..c297a48ab8 100644 --- a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt @@ -7,12 +7,6 @@ add_subdirectory(ut) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(core-kqp-runtime) target_compile_options(core-kqp-runtime PRIVATE @@ -34,13 +28,12 @@ target_link_libraries(core-kqp-runtime PUBLIC minikql-comp_nodes-llvm library-yql-utils dq-actors-protos + dq-actors-spilling yql-dq-common yql-dq-runtime cpp-threading-hot_swap - tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-runtime PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp @@ -51,15 +44,9 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) -generate_enum_serilization(core-kqp-runtime - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h - INCLUDE_HEADERS - ydb/core/kqp/runtime/kqp_spilling.h -) diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt index 8a8cb70240..ba1adf7be3 100644 --- a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt @@ -7,12 +7,6 @@ add_subdirectory(ut) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(core-kqp-runtime) target_compile_options(core-kqp-runtime PRIVATE @@ -35,13 +29,12 @@ target_link_libraries(core-kqp-runtime PUBLIC minikql-comp_nodes-llvm library-yql-utils dq-actors-protos + dq-actors-spilling yql-dq-common yql-dq-runtime cpp-threading-hot_swap - tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-runtime PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp @@ -52,15 +45,9 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) -generate_enum_serilization(core-kqp-runtime - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h - INCLUDE_HEADERS - ydb/core/kqp/runtime/kqp_spilling.h -) diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt index 8a8cb70240..ba1adf7be3 100644 --- a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt @@ -7,12 +7,6 @@ add_subdirectory(ut) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(core-kqp-runtime) target_compile_options(core-kqp-runtime PRIVATE @@ -35,13 +29,12 @@ target_link_libraries(core-kqp-runtime PUBLIC minikql-comp_nodes-llvm library-yql-utils dq-actors-protos + dq-actors-spilling yql-dq-common yql-dq-runtime cpp-threading-hot_swap - tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-runtime PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp @@ -52,15 +45,9 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) -generate_enum_serilization(core-kqp-runtime - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h - INCLUDE_HEADERS - ydb/core/kqp/runtime/kqp_spilling.h -) diff --git a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt index e8854f07a9..c297a48ab8 100644 --- a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt @@ -7,12 +7,6 @@ add_subdirectory(ut) -get_built_tool_path( - TOOL_enum_parser_bin - TOOL_enum_parser_dependency - tools/enum_parser/enum_parser - enum_parser -) add_library(core-kqp-runtime) target_compile_options(core-kqp-runtime PRIVATE @@ -34,13 +28,12 @@ target_link_libraries(core-kqp-runtime PUBLIC minikql-comp_nodes-llvm library-yql-utils dq-actors-protos + dq-actors-spilling yql-dq-common yql-dq-runtime cpp-threading-hot_swap - tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-runtime PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_channel_storage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_compute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_effects.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_output_stream.cpp @@ -51,15 +44,9 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_sequencer_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_meta.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) -generate_enum_serilization(core-kqp-runtime - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling.h - INCLUDE_HEADERS - ydb/core/kqp/runtime/kqp_spilling.h -) diff --git a/ydb/core/kqp/runtime/kqp_channel_storage.cpp b/ydb/core/kqp/runtime/kqp_channel_storage.cpp deleted file mode 100644 index d0e40d8d95..0000000000 --- a/ydb/core/kqp/runtime/kqp_channel_storage.cpp +++ /dev/null @@ -1,246 +0,0 @@ -#include "kqp_channel_storage.h" -#include "kqp_spilling.h" -#include "kqp_spilling_file.h" - -#include <ydb/library/yql/utils/yql_panic.h> - -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/core/log.h> - -#include <util/generic/buffer.h> -#include <util/generic/map.h> -#include <util/generic/set.h> -#include <util/generic/size_literals.h> - - -namespace NKikimr::NKqp { - -using namespace NActors; -using namespace NYql; -using namespace NYql::NDq; - -#define LOG(...) do { if (Y_UNLIKELY(LogFunc)) { LogFunc(__VA_ARGS__); } } while (0) - -#define LOG_D(s) \ - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_I(s) \ - LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_E(s) \ - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_C(s) \ - LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_W(s) \ - LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) -#define LOG_T(s) \ - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) - -namespace { - -constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10; -constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB; - -class TKqpChannelStorageActor : public TActorBootstrapped<TKqpChannelStorageActor> { - using TBase = TActorBootstrapped<TKqpChannelStorageActor>; - -public: - TKqpChannelStorageActor(ui64 txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp) - : TxId(txId) - , ChannelId(channelId) - , WakeUp(std::move(wakeUp)) {} - - void Bootstrap() { - auto spillingActor = CreateKqpLocalFileSpillingActor(TxId, TStringBuilder() << "ChannelId: " << ChannelId, - SelfId(), true); - SpillingActorId = Register(spillingActor); - - Become(&TKqpChannelStorageActor::WorkState); - } - - static constexpr char ActorName[] = "KQP_CHANNEL_STORAGE"; - -protected: - void PassAway() override { - Send(SpillingActorId, new TEvents::TEvPoison); - TBase::PassAway(); - } - -private: - STATEFN(WorkState) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvKqpSpilling::TEvWriteResult, HandleWork); - hFunc(TEvKqpSpilling::TEvReadResult, HandleWork); - hFunc(TEvKqpSpilling::TEvError, HandleWork); - default: - Y_ABORT("TKqpChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", - ev->GetTypeRewrite(), - ev->ToString().data()); - } - } - - void HandleWork(TEvKqpSpilling::TEvWriteResult::TPtr& ev) { - auto& msg = *ev->Get(); - LOG_T("[TEvWriteResult] blobId: " << msg.BlobId); - - auto it = WritingBlobs.find(msg.BlobId); - if (it == WritingBlobs.end()) { - LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId); - - Error = "Internal error"; - - Send(SpillingActorId, new TEvents::TEvPoison); - return; - } - - ui64 size = it->second; - WritingBlobsSize -= size; - WritingBlobs.erase(it); - - StoredBlobsCount++; - StoredBlobsSize += size; - } - - void HandleWork(TEvKqpSpilling::TEvReadResult::TPtr& ev) { - auto& msg = *ev->Get(); - LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); - - if (LoadingBlobs.erase(msg.BlobId) != 1) { - LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); - return; - } - - LoadedBlobs[msg.BlobId].Swap(msg.Blob); - YQL_ENSURE(LoadedBlobs[msg.BlobId].size() != 0); - - if (LoadedBlobs.size() == 1) { - WakeUp(); - } - } - - void HandleWork(TEvKqpSpilling::TEvError::TPtr& ev) { - auto& msg = *ev->Get(); - LOG_D("[TEvError] " << msg.Message); - - Error.ConstructInPlace(msg.Message); - } - -public: - [[nodiscard]] - const TMaybe<TString>& GetError() const { - return Error; - } - - bool IsEmpty() const { - return WritingBlobs.empty() && StoredBlobsCount == 0 && LoadedBlobs.empty(); - } - - bool IsFull() const { - return WritingBlobs.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize > MAX_INFLIGHT_BLOBS_SIZE; - } - - void Put(ui64 blobId, TRope&& blob) { - FailOnError(); - - // TODO: timeout - // TODO: limit inflight events - - ui64 size = blob.size(); - - Send(SpillingActorId, new TEvKqpSpilling::TEvWrite(blobId, std::move(blob))); - - WritingBlobs.emplace(blobId, size); - WritingBlobsSize += size; - } - - bool Get(ui64 blobId, TBuffer& blob) { - FailOnError(); - - auto loadedIt = LoadedBlobs.find(blobId); - if (loadedIt != LoadedBlobs.end()) { - YQL_ENSURE(loadedIt->second.size() != 0); - blob.Swap(loadedIt->second); - LoadedBlobs.erase(loadedIt); - return true; - } - - auto result = LoadingBlobs.emplace(blobId); - if (result.second) { - Send(SpillingActorId, new TEvKqpSpilling::TEvRead(blobId, true)); - } - - return false; - } - - void Terminate() { - PassAway(); - } - -private: - void FailOnError() { - if (Error) { - LOG_E("TxId: " << TxId << ", channelId: " << ChannelId << ", error: " << *Error); - ythrow TDqChannelStorageException() << "TxId: " << TxId << ", channelId: " << ChannelId - << ", error: " << *Error; - } - } - -private: - const ui64 TxId; - const ui64 ChannelId; - IDqChannelStorage::TWakeUpCallback WakeUp; - TActorId SpillingActorId; - - TMap<ui64, ui64> WritingBlobs; // blobId -> blobSize - ui64 WritingBlobsSize = 0; - - ui32 StoredBlobsCount = 0; - ui64 StoredBlobsSize = 0; - - TSet<ui64> LoadingBlobs; - TMap<ui64, TBuffer> LoadedBlobs; - - TMaybe<TString> Error; -}; - - -class TKqpChannelStorage : public IDqChannelStorage { -public: - TKqpChannelStorage(ui64 txId, ui64 channelId, TWakeUpCallback&& wakeUp, const TActorContext& ctx) - { - SelfActor = new TKqpChannelStorageActor(txId, channelId, std::move(wakeUp)); - ctx.RegisterWithSameMailbox(SelfActor); - } - - ~TKqpChannelStorage() { - SelfActor->Terminate(); - } - - bool IsEmpty() const override { - return SelfActor->IsEmpty(); - } - - bool IsFull() const override { - return SelfActor->IsFull(); - } - - void Put(ui64 blobId, TRope&& blob) override { - SelfActor->Put(blobId, std::move(blob)); - } - - bool Get(ui64 blobId, TBuffer& blob) override { - return SelfActor->Get(blobId, blob); - } - -private: - TKqpChannelStorageActor* SelfActor; -}; - -} // anonymous namespace - -IDqChannelStorage::TPtr CreateKqpChannelStorage(ui64 txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, - const TActorContext& ctx) -{ - return new TKqpChannelStorage(txId, channelId, std::move(wakeUp), ctx); -} - -} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/runtime/kqp_spilling_file.h b/ydb/core/kqp/runtime/kqp_spilling_file.h deleted file mode 100644 index 14196a8498..0000000000 --- a/ydb/core/kqp/runtime/kqp_spilling_file.h +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#include <ydb/core/kqp/counters/kqp_counters.h> -#include <ydb/core/protos/config.pb.h> - -#include <library/cpp/actors/core/actor.h> - - -namespace NKikimr::NKqp { - -NActors::IActor* CreateKqpLocalFileSpillingActor(ui64 txId, const TString& details, const NActors::TActorId& client, - bool removeBlobsAfterRead); - -NActors::IActor* CreateKqpLocalFileSpillingService( - const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig& config, - TIntrusivePtr<TKqpCounters> counters); - -} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp b/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp deleted file mode 100644 index 5211e30104..0000000000 --- a/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp +++ /dev/null @@ -1,441 +0,0 @@ -#include "kqp_spilling_file.h" -#include "kqp_spilling.h" - -#include <ydb/core/kqp/common/kqp.h> -#include <ydb/core/testlib/basics/appdata.h> -#include <ydb/core/testlib/basics/runtime.h> - -#include <library/cpp/testing/unittest/registar.h> - -#include <util/system/fs.h> - -namespace NKikimr::NKqp { - -using namespace NActors; -using namespace NKikimr; -using namespace NKikimr::NMiniKQL; -using namespace NKikimr::NKqp; -using namespace NYql; - -namespace { - -TString GetSpillingPrefix() { - static TString str = Sprintf("%s_%d/", "kqp_spilling", (int)getpid()); - return str; -} - -TBuffer CreateBlob(ui32 size, char symbol) { - TBuffer blob(size); - blob.Fill(symbol, size); - return blob; -} - -TRope CreateRope(ui32 size, char symbol, ui32 chunkSize = 7) { - TRope result; - while (size) { - size_t count = std::min(size, chunkSize); - TString str(count, symbol); - result.Insert(result.End(), TRope{str}); - size -= count; - } - return result; -} - -void AssertEquals(const TBuffer& lhs, const TBuffer& rhs) { - TStringBuf l{lhs.data(), lhs.size()}; - TStringBuf r{rhs.data(), rhs.size()}; - UNIT_ASSERT_STRINGS_EQUAL(l, r); -} - -TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters() { - static auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); - return counters; -} - -TActorId StartSpillingService(TTestBasicRuntime& runtime, ui64 maxTotalSize = 1000, ui64 maxFileSize = 500, - ui64 maxFilePartSize = 100, const TString& root = "./" + GetSpillingPrefix()) -{ - Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl; - - NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig config; - config.SetEnable(true); - config.SetRoot(root); - config.SetMaxTotalSize(maxTotalSize); - config.SetMaxFileSize(maxFileSize); - config.SetMaxFilePartSize(maxFilePartSize); - - auto counters = Counters(); - counters->ResetCounters(); - auto kqpCounters = MakeIntrusive<TKqpCounters>(counters); - - auto* spillingService = CreateKqpLocalFileSpillingService(config, kqpCounters); - auto spillingServiceActorId = runtime.Register(spillingService); - runtime.EnableScheduleForActor(spillingServiceActorId); - runtime.RegisterService(MakeKqpLocalFileSpillingServiceID(runtime.GetNodeId()), spillingServiceActorId); - - return spillingServiceActorId; -} - -TActorId StartSpillingActor(TTestBasicRuntime& runtime, const TActorId& client, bool removeBlobsAfterRead = true) { - auto *spillingActor = CreateKqpLocalFileSpillingActor(1, "test", client, removeBlobsAfterRead); - auto spillingActorId = runtime.Register(spillingActor); - runtime.EnableScheduleForActor(spillingActorId); - - return spillingActorId; -} - -void WaitBootstrap(TTestBasicRuntime& runtime) { - TDispatchOptions options; - options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); - UNIT_ASSERT(runtime.DispatchEvents(options)); -} - -void SetupLogs(TTestBasicRuntime& runtime) { - runtime.SetLogPriority(NKikimrServices::KQP_BLOBS_STORAGE, NActors::NLog::PRI_ERROR); -} - -} // anonymous namespace - -Y_UNIT_TEST_SUITE(KqpSpillingFileTests) { - -Y_UNIT_TEST(Simple) { - TTestBasicRuntime runtime{1, false}; - runtime.Initialize(TAppPrepare().Unwrap()); - SetupLogs(runtime); - - auto spillingService = StartSpillingService(runtime); - auto tester = runtime.AllocateEdgeActor(); - auto spillingActor = StartSpillingActor(runtime, tester); - - WaitBootstrap(runtime); - - // put blob 1 - { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1)); - UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); - } - - // put blob 2 - { - auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(11, 'z')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester, TDuration::Seconds(1)); - UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId); - } - - // get blob 1 - { - auto ev = new TEvKqpSpilling::TEvRead(1); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester, TDuration::Seconds(1)); - UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); - - TBuffer expected = CreateBlob(10, 'a'); - AssertEquals(expected, resp->Get()->Blob); - } - - // get blob 2 - { - auto ev = new TEvKqpSpilling::TEvRead(2); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester, TDuration::Seconds(1)); - UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId); - - TBuffer expected = CreateBlob(11, 'z'); - AssertEquals(expected, resp->Get()->Blob); - } - - // terminate - { - runtime.Send(new IEventHandle(spillingActor, tester, new TEvents::TEvPoison)); - - std::atomic<bool> done = false; - runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { - if (event->GetRecipientRewrite() == spillingService) { - if (event->GetTypeRewrite() == 2146435074 /* EvCloseFileResponse */ ) { - done = true; - } - } - return TTestActorRuntimeBase::EEventAction::PROCESS; - }); - - TDispatchOptions options; - options.CustomFinalCondition = [&]() { - return (bool) done; - }; - - runtime.DispatchEvents(options, TDuration::Seconds(1)); - } -} - -Y_UNIT_TEST(Write_TotalSizeLimitExceeded) { - TTestBasicRuntime runtime{1, false}; - runtime.Initialize(TAppPrepare().Unwrap()); - SetupLogs(runtime); - - StartSpillingService(runtime, 100, 1000, 1000); - auto tester = runtime.AllocateEdgeActor(); - auto spillingActor = StartSpillingActor(runtime, tester); - - WaitBootstrap(runtime); - - { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); - UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); - } - - { - auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester); - UNIT_ASSERT_STRINGS_EQUAL("Total size limit exceeded", resp->Get()->Message); - } -} - -Y_UNIT_TEST(Write_FileSizeLimitExceeded) { - TTestBasicRuntime runtime{1, false}; - runtime.Initialize(TAppPrepare().Unwrap()); - SetupLogs(runtime); - - StartSpillingService(runtime, 1000, 100, 1000); - auto tester = runtime.AllocateEdgeActor(); - auto spillingActor = StartSpillingActor(runtime, tester); - - WaitBootstrap(runtime); - - { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(51, 'a')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); - UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); - } - - { - auto ev = new TEvKqpSpilling::TEvWrite(2, CreateRope(50, 'b')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester); - UNIT_ASSERT_STRINGS_EQUAL("File size limit exceeded", resp->Get()->Message); - } -} - -Y_UNIT_TEST(MultipleFileParts) { - TTestBasicRuntime runtime{1, false}; - runtime.Initialize(TAppPrepare().Unwrap()); - SetupLogs(runtime); - - StartSpillingService(runtime, 1000, 100, 25); - auto tester = runtime.AllocateEdgeActor(); - auto spillingActor = StartSpillingActor(runtime, tester); - - WaitBootstrap(runtime); - - const TString filePrefix = TStringBuilder() << NFs::CurrentWorkingDirectory() << "/" << GetSpillingPrefix() << "node_" << runtime.GetNodeId() << "/1_test_"; - - for (ui32 i = 0; i < 5; ++i) { - // Cerr << "---- store blob #" << i << Endl; - auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i)); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); - UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); - - UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i)); - } - - for (i32 i = 4; i >= 0; --i) { - // Cerr << "---- load blob #" << i << Endl; - auto ev = new TEvKqpSpilling::TEvRead(i, true); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester); - UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); - TBuffer expected = CreateBlob(20, 'a' + i); - AssertEquals(expected, resp->Get()->Blob); - - if (i == 4) { - // do not remove last file - UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i)); - } else { - UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i)); - } - } -} - -Y_UNIT_TEST(SingleFilePart) { - TTestBasicRuntime runtime{1, false}; - runtime.Initialize(TAppPrepare().Unwrap()); - SetupLogs(runtime); - - StartSpillingService(runtime, 1000, 100, 25); - auto tester = runtime.AllocateEdgeActor(); - auto spillingActor = StartSpillingActor(runtime, tester, false); - - WaitBootstrap(runtime); - - const TString filePrefix = TStringBuilder() << NFs::CurrentWorkingDirectory() << "/" << GetSpillingPrefix() << "node_" << runtime.GetNodeId() << "/1_test_"; - - for (ui32 i = 0; i < 5; ++i) { - // Cerr << "---- store blob #" << i << Endl; - auto ev = new TEvKqpSpilling::TEvWrite(i, CreateRope(20, 'a' + i)); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); - UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); - - UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0)); - if (i > 0) { - UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i)); - } - } - - for (i32 i = 4; i >= 0; --i) { - // Cerr << "---- load blob #" << i << Endl; - auto ev = new TEvKqpSpilling::TEvRead(i, true); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvReadResult>(tester); - UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); - TBuffer expected = CreateBlob(20, 'a' + i); - AssertEquals(expected, resp->Get()->Blob); - - UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0)); - } -} - -Y_UNIT_TEST(ReadError) { - return; - - TTestBasicRuntime runtime{1, false}; - runtime.Initialize(TAppPrepare().Unwrap()); - SetupLogs(runtime); - - StartSpillingService(runtime); - auto tester = runtime.AllocateEdgeActor(); - auto spillingActor = StartSpillingActor(runtime, tester); - - WaitBootstrap(runtime); - - { - auto ev = new TEvKqpSpilling::TEvWrite(0, CreateRope(20, 'a')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvWriteResult>(tester); - UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId); - } - - ::unlink((NFs::CurrentWorkingDirectory() + GetSpillingPrefix() + "node_1/1_test_0").c_str()); - - { - auto ev = new TEvKqpSpilling::TEvRead(0, true); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester); - auto& err = resp->Get()->Message; - auto expected = "can't open \"" + GetSpillingPrefix() + "node_1/1_test_0\" with mode RdOnly"; - UNIT_ASSERT_C(err.Contains("No such file or directory"), err); - UNIT_ASSERT_C(err.Contains(expected), err); - } -} - - -struct THttpRequest : NMonitoring::IHttpRequest { - HTTP_METHOD Method; - TCgiParameters CgiParameters; - THttpHeaders HttpHeaders; - - THttpRequest(HTTP_METHOD method) - : Method(method) - {} - - ~THttpRequest() {} - - const char* GetURI() const override { - return ""; - } - - const char* GetPath() const override { - return ""; - } - - const TCgiParameters& GetParams() const override { - return CgiParameters; - } - - const TCgiParameters& GetPostParams() const override { - return CgiParameters; - } - - TStringBuf GetPostContent() const override { - return TString(); - } - - HTTP_METHOD GetMethod() const override { - return Method; - } - - const THttpHeaders& GetHeaders() const override { - return HttpHeaders; - } - - TString GetRemoteAddr() const override { - return TString(); - } -}; - -Y_UNIT_TEST(StartError) { - TTestBasicRuntime runtime{1, false}; - runtime.Initialize(TAppPrepare().Unwrap()); - SetupLogs(runtime); - - auto spillingService = StartSpillingService(runtime, 100, 500, 100, "/nonexistent/" + GetSpillingPrefix()); - auto tester = runtime.AllocateEdgeActor(); - auto spillingActor = StartSpillingActor(runtime, tester); - - WaitBootstrap(runtime); - - // put blob 1 - { - auto ev = new TEvKqpSpilling::TEvWrite(1, CreateRope(10, 'a')); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester, TDuration::Seconds(1)); - UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message); - } - - // get blob 1 - { - auto ev = new TEvKqpSpilling::TEvRead(1); - runtime.Send(new IEventHandle(spillingActor, tester, ev)); - - auto resp = runtime.GrabEdgeEvent<TEvKqpSpilling::TEvError>(tester, TDuration::Seconds(1)); - UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message); - } - - // mon - { - THttpRequest httpReq(HTTP_METHOD_GET); - NMonitoring::TMonService2HttpRequest monReq(nullptr, &httpReq, nullptr, nullptr, "", nullptr); - - runtime.Send(new IEventHandle(spillingService, tester, new NMon::TEvHttpInfo(monReq))); - - auto resp = runtime.GrabEdgeEvent<NMon::TEvHttpInfoRes>(tester, TDuration::Seconds(1)); - UNIT_ASSERT_EQUAL("<html><h2>Service is not started due to IO error</h2></html>", - ((NMon::TEvHttpInfoRes*) resp->Get())->Answer); - } -} - -} // suite - -} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt index 896a8cd357..c4329c5beb 100644 --- a/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/runtime/ut/CMakeLists.darwin-x86_64.txt @@ -32,7 +32,6 @@ target_link_options(ydb-core-kqp-runtime-ut PRIVATE CoreFoundation ) target_sources(ydb-core-kqp-runtime-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp ) set_property( diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt index e197bb662a..099a1c281f 100644 --- a/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/runtime/ut/CMakeLists.linux-aarch64.txt @@ -35,7 +35,6 @@ target_link_options(ydb-core-kqp-runtime-ut PRIVATE -ldl ) target_sources(ydb-core-kqp-runtime-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp ) set_property( diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt index 3c64538479..ebf4ee2a3b 100644 --- a/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/runtime/ut/CMakeLists.linux-x86_64.txt @@ -36,7 +36,6 @@ target_link_options(ydb-core-kqp-runtime-ut PRIVATE -ldl ) target_sources(ydb-core-kqp-runtime-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp ) set_property( diff --git a/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt index 81ecb94af8..5ac0f20fa5 100644 --- a/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/runtime/ut/CMakeLists.windows-x86_64.txt @@ -25,7 +25,6 @@ target_link_libraries(ydb-core-kqp-runtime-ut PUBLIC udf-service-exception_policy ) target_sources(ydb-core-kqp-runtime-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data_ut.cpp ) set_property( diff --git a/ydb/core/kqp/runtime/ut/ya.make b/ydb/core/kqp/runtime/ut/ya.make index 9279dbdc98..c2bc69362d 100644 --- a/ydb/core/kqp/runtime/ut/ya.make +++ b/ydb/core/kqp/runtime/ut/ya.make @@ -6,7 +6,6 @@ SIZE(MEDIUM) TIMEOUT(180) SRCS( - kqp_spilling_file_ut.cpp kqp_scan_data_ut.cpp ) diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make index 3f41b86b27..9bee3453d3 100644 --- a/ydb/core/kqp/runtime/ya.make +++ b/ydb/core/kqp/runtime/ya.make @@ -1,7 +1,6 @@ LIBRARY() SRCS( - kqp_channel_storage.cpp kqp_compute.cpp kqp_effects.cpp kqp_output_stream.cpp @@ -13,7 +12,6 @@ SRCS( kqp_sequencer_actor.cpp kqp_sequencer_factory.cpp kqp_scan_data_meta.cpp - kqp_spilling_file.cpp kqp_stream_lookup_actor.cpp kqp_stream_lookup_actor.h kqp_stream_lookup_factory.cpp @@ -38,6 +36,7 @@ PEERDIR( ydb/library/yql/minikql/comp_nodes/llvm ydb/library/yql/utils ydb/library/yql/dq/actors/protos + ydb/library/yql/dq/actors/spilling ydb/library/yql/dq/common ydb/library/yql/dq/runtime library/cpp/threading/hot_swap @@ -45,8 +44,6 @@ PEERDIR( YQL_LAST_ABI_VERSION() -GENERATE_ENUM_SERIALIZATION(kqp_spilling.h) - END() RECURSE_FOR_TESTS( diff --git a/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt index 1747265bae..9017d23dd4 100644 --- a/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/dq/actors/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(compute) add_subdirectory(protos) +add_subdirectory(spilling) add_subdirectory(task_runner) get_built_tool_path( TOOL_grpc_cpp_bin diff --git a/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt index af14a84926..efbd60eba8 100644 --- a/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/dq/actors/CMakeLists.linux-aarch64.txt @@ -8,6 +8,7 @@ add_subdirectory(compute) add_subdirectory(protos) +add_subdirectory(spilling) add_subdirectory(task_runner) get_built_tool_path( TOOL_grpc_cpp_bin diff --git a/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt index af14a84926..efbd60eba8 100644 --- a/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/dq/actors/CMakeLists.linux-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(compute) add_subdirectory(protos) +add_subdirectory(spilling) add_subdirectory(task_runner) get_built_tool_path( TOOL_grpc_cpp_bin diff --git a/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt index 1747265bae..9017d23dd4 100644 --- a/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/dq/actors/CMakeLists.windows-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(compute) add_subdirectory(protos) +add_subdirectory(spilling) add_subdirectory(task_runner) get_built_tool_path( TOOL_grpc_cpp_bin diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt index 7b0917ad05..e9b39cdfbe 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.darwin-x86_64.txt @@ -23,11 +23,13 @@ target_link_libraries(dq-actors-compute PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-spilling minikql-comp_nodes-llvm yql-public-issue core-quoter-public ) target_sources(dq-actors-compute PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt index 0fed5ab4c0..b6596cf87f 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-aarch64.txt @@ -24,11 +24,13 @@ target_link_libraries(dq-actors-compute PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-spilling minikql-comp_nodes-llvm yql-public-issue core-quoter-public ) target_sources(dq-actors-compute PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt index 0fed5ab4c0..b6596cf87f 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.linux-x86_64.txt @@ -24,11 +24,13 @@ target_link_libraries(dq-actors-compute PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-spilling minikql-comp_nodes-llvm yql-public-issue core-quoter-public ) target_sources(dq-actors-compute PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt index 7b0917ad05..e9b39cdfbe 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.windows-x86_64.txt @@ -23,11 +23,13 @@ target_link_libraries(dq-actors-compute PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-spilling minikql-comp_nodes-llvm yql-public-issue core-quoter-public ) target_sources(dq-actors-compute PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 5a24705ba3..c382470556 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -1,5 +1,6 @@ #include "dq_compute_actor.h" #include "dq_async_compute_actor.h" +#include "dq_task_runner_exec_ctx.h" #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h> @@ -82,8 +83,9 @@ public: Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>); - // TODO: - std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext()); + auto wakeup = [this]{ ContinueExecute(); }; + std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>( + TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext()); Send(TaskRunnerActorId, new NTaskRunnerActor::TEvTaskRunnerCreate( diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index f8d2105aab..c1a8db880c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -1,5 +1,6 @@ #include "dq_compute_actor_impl.h" #include "dq_compute_actor.h" +#include "dq_task_runner_exec_ctx.h" #include <ydb/library/yql/dq/common/dq_common.h> @@ -55,7 +56,9 @@ public: auto taskRunner = TaskRunnerFactory(Task, logger); SetTaskRunner(taskRunner); - PrepareTaskRunner(); + auto wakeup = [this]{ ContinueExecute(); }; + TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext()); + PrepareTaskRunner(execCtx); ContinueExecute(); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 8c433ee435..cac8935d01 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1529,7 +1529,7 @@ protected: TaskRunner = taskRunner; } - void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) { + void PrepareTaskRunner(const IDqTaskRunnerExecutionContext& execCtx) { YQL_ENSURE(TaskRunner); auto guard = TaskRunner->BindAllocator(MemoryQuota->GetMkqlMemoryLimit()); diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp new file mode 100644 index 0000000000..08265d3f3e --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp @@ -0,0 +1,26 @@ +#include "dq_task_runner_exec_ctx.h" + +#include <ydb/library/yql/dq/actors/spilling/channel_storage.h> + + +namespace NYql { +namespace NDq { + +TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx) + : TxId_(txId) + , WakeUp_(std::move(wakeUp)) + , Ctx_(ctx) + , WithSpilling_(withSpilling) +{ +} + +IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const { + if (WithSpilling_) { + return CreateDqChannelStorage(TxId_, channelId, WakeUp_, Ctx_); + } else { + return nullptr; + } +} + +} // namespace NDq +} // namespace NYql diff --git a/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h new file mode 100644 index 0000000000..e364dda0d2 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h @@ -0,0 +1,25 @@ +#pragma once + +#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> +#include <ydb/library/yql/dq/common/dq_common.h> +#include <library/cpp/actors/core/actor.h> + + +namespace NYql { +namespace NDq { + +class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase { +public: + TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, const NActors::TActorContext& ctx); + + IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override; + +private: + const TTxId TxId_; + const IDqChannelStorage::TWakeUpCallback WakeUp_; + const NActors::TActorContext& Ctx_; + const bool WithSpilling_; +}; + +} // namespace NDq +} // namespace NYql diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make index 486920dadf..183fb0eb85 100644 --- a/ydb/library/yql/dq/actors/compute/ya.make +++ b/ydb/library/yql/dq/actors/compute/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + dq_task_runner_exec_ctx.cpp dq_async_compute_actor.cpp dq_compute_actor_async_io_factory.cpp dq_compute_actor_channels.cpp @@ -24,6 +25,7 @@ PEERDIR( ydb/library/yql/dq/proto ydb/library/yql/dq/runtime ydb/library/yql/dq/tasks + ydb/library/yql/dq/actors/spilling ydb/library/yql/minikql/comp_nodes/llvm ydb/library/yql/public/issue ydb/core/quoter/public diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h index 11b0225233..d097e6bd34 100644 --- a/ydb/library/yql/dq/actors/dq_events_ids.h +++ b/ydb/library/yql/dq/actors/dq_events_ids.h @@ -58,5 +58,15 @@ struct TDqComputeEvents { static_assert(EvEnd < EventSpaceBegin((TDqEvents::ES_DQ_COMPUTE + 1))); }; +struct TDqSpillingEvents { + enum EDqSpillingEvents { + EvWrite = EventSpaceBegin(TDqEvents::ES_DQ_COMPUTE) + 100, + EvWriteResult, + EvRead, + EvReadResult, + EvError, + }; +}; + } // namespace NDq } // namespace NYql diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..5c8ffc401e --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(dq-actors-spilling) +target_compile_options(dq-actors-spilling PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dq-actors-spilling PUBLIC + contrib-libs-cxxsupp + yutil + ydb-library-services + yql-dq-common + yql-dq-actors + yql-dq-runtime + library-yql-utils + cpp-actors-core + cpp-actors-util + cpp-monlib-dynamic_counters + monlib-service-pages +) +target_sources(dq-actors-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp +) diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..94acf3d36a --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-aarch64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(dq-actors-spilling) +target_compile_options(dq-actors-spilling PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dq-actors-spilling PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-library-services + yql-dq-common + yql-dq-actors + yql-dq-runtime + library-yql-utils + cpp-actors-core + cpp-actors-util + cpp-monlib-dynamic_counters + monlib-service-pages +) +target_sources(dq-actors-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp +) diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..94acf3d36a --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.linux-x86_64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(dq-actors-spilling) +target_compile_options(dq-actors-spilling PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dq-actors-spilling PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-library-services + yql-dq-common + yql-dq-actors + yql-dq-runtime + library-yql-utils + cpp-actors-core + cpp-actors-util + cpp-monlib-dynamic_counters + monlib-service-pages +) +target_sources(dq-actors-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp +) diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..5c8ffc401e --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/CMakeLists.windows-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(dq-actors-spilling) +target_compile_options(dq-actors-spilling PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dq-actors-spilling PUBLIC + contrib-libs-cxxsupp + yutil + ydb-library-services + yql-dq-common + yql-dq-actors + yql-dq-runtime + library-yql-utils + cpp-actors-core + cpp-actors-util + cpp-monlib-dynamic_counters + monlib-service-pages +) +target_sources(dq-actors-spilling PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/channel_storage.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling.cpp +) diff --git a/ydb/library/yql/dq/actors/spilling/channel_storage.cpp b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp new file mode 100644 index 0000000000..4f5a55a23e --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.cpp @@ -0,0 +1,242 @@ +#include "channel_storage.h" +#include "spilling.h" +#include "spilling_file.h" + +#include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/services/services.pb.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +#include <util/generic/buffer.h> +#include <util/generic/map.h> +#include <util/generic/set.h> +#include <util/generic/size_literals.h> + + +namespace NYql::NDq { + +using namespace NActors; + +#define LOG_D(s) \ + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) +#define LOG_I(s) \ + LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) +#define LOG_E(s) \ + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) +#define LOG_C(s) \ + LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) +#define LOG_W(s) \ + LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s) +#define LOG_T(s) \ + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s) + +namespace { + +constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10; +constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB; + +class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor> { + using TBase = TActorBootstrapped<TDqChannelStorageActor>; + +public: + TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp) + : TxId_(txId) + , ChannelId_(channelId) + , WakeUp_(std::move(wakeUp)) {} + + void Bootstrap() { + auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_, + SelfId(), true); + SpillingActorId_ = Register(spillingActor); + + Become(&TDqChannelStorageActor::WorkState); + } + + static constexpr char ActorName[] = "DQ_CHANNEL_STORAGE"; + +protected: + void PassAway() override { + Send(SpillingActorId_, new TEvents::TEvPoison); + TBase::PassAway(); + } + +private: + STATEFN(WorkState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvDqSpilling::TEvWriteResult, HandleWork); + hFunc(TEvDqSpilling::TEvReadResult, HandleWork); + hFunc(TEvDqSpilling::TEvError, HandleWork); + default: + Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s", + ev->GetTypeRewrite(), + ev->ToString().data()); + } + } + + void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_T("[TEvWriteResult] blobId: " << msg.BlobId); + + auto it = WritingBlobs_.find(msg.BlobId); + if (it == WritingBlobs_.end()) { + LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId); + + Error_ = "Internal error"; + + Send(SpillingActorId_, new TEvents::TEvPoison); + return; + } + + ui64 size = it->second; + WritingBlobsSize_ -= size; + WritingBlobs_.erase(it); + + StoredBlobsCount_++; + StoredBlobsSize_ += size; + } + + void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); + + if (LoadingBlobs_.erase(msg.BlobId) != 1) { + LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size()); + return; + } + + LoadedBlobs_[msg.BlobId].Swap(msg.Blob); + YQL_ENSURE(LoadedBlobs_[msg.BlobId].size() != 0); + + if (LoadedBlobs_.size() == 1) { + WakeUp_(); + } + } + + void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) { + auto& msg = *ev->Get(); + LOG_D("[TEvError] " << msg.Message); + + Error_.ConstructInPlace(msg.Message); + } + +public: + [[nodiscard]] + const TMaybe<TString>& GetError() const { + return Error_; + } + + bool IsEmpty() const { + return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty(); + } + + bool IsFull() const { + return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE; + } + + void Put(ui64 blobId, TRope&& blob) { + FailOnError(); + + // TODO: timeout + // TODO: limit inflight events + + ui64 size = blob.size(); + + Send(SpillingActorId_, new TEvDqSpilling::TEvWrite(blobId, std::move(blob))); + + WritingBlobs_.emplace(blobId, size); + WritingBlobsSize_ += size; + } + + bool Get(ui64 blobId, TBuffer& blob) { + FailOnError(); + + auto loadedIt = LoadedBlobs_.find(blobId); + if (loadedIt != LoadedBlobs_.end()) { + YQL_ENSURE(loadedIt->second.size() != 0); + blob.Swap(loadedIt->second); + LoadedBlobs_.erase(loadedIt); + return true; + } + + auto result = LoadingBlobs_.emplace(blobId); + if (result.second) { + Send(SpillingActorId_, new TEvDqSpilling::TEvRead(blobId, true)); + } + + return false; + } + + void Terminate() { + PassAway(); + } + +private: + void FailOnError() { + if (Error_) { + LOG_E("Error: " << *Error_); + ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_ + << ", error: " << *Error_; + } + } + +private: + const TTxId TxId_; + const ui64 ChannelId_; + IDqChannelStorage::TWakeUpCallback WakeUp_; + TActorId SpillingActorId_; + + TMap<ui64, ui64> WritingBlobs_; // blobId -> blobSize + ui64 WritingBlobsSize_ = 0; + + ui32 StoredBlobsCount_ = 0; + ui64 StoredBlobsSize_ = 0; + + TSet<ui64> LoadingBlobs_; + TMap<ui64, TBuffer> LoadedBlobs_; + + TMaybe<TString> Error_; +}; + + +class TDqChannelStorage : public IDqChannelStorage { +public: + TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, const TActorContext& ctx) { + SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp)); + ctx.RegisterWithSameMailbox(SelfActor_); + } + + ~TDqChannelStorage() { + SelfActor_->Terminate(); + } + + bool IsEmpty() const override { + return SelfActor_->IsEmpty(); + } + + bool IsFull() const override { + return SelfActor_->IsFull(); + } + + void Put(ui64 blobId, TRope&& blob) override { + SelfActor_->Put(blobId, std::move(blob)); + } + + bool Get(ui64 blobId, TBuffer& blob) override { + return SelfActor_->Get(blobId, blob); + } + +private: + TDqChannelStorageActor* SelfActor_; +}; + +} // anonymous namespace + +IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback wakeUp, + const TActorContext& ctx) +{ + return new TDqChannelStorage(txId, channelId, std::move(wakeUp), ctx); +} + +} // namespace NYql::NDq diff --git a/ydb/core/kqp/runtime/kqp_channel_storage.h b/ydb/library/yql/dq/actors/spilling/channel_storage.h index ec149722b8..f0d6905f02 100644 --- a/ydb/core/kqp/runtime/kqp_channel_storage.h +++ b/ydb/library/yql/dq/actors/spilling/channel_storage.h @@ -4,9 +4,9 @@ #include <ydb/library/yql/dq/runtime/dq_channel_storage.h> #include <library/cpp/actors/core/actor.h> -namespace NKikimr::NKqp { +namespace NYql::NDq { -NYql::NDq::IDqChannelStorage::TPtr CreateKqpChannelStorage(ui64 txId, ui64 channelId, +NYql::NDq::IDqChannelStorage::TPtr CreateDqChannelStorage(TTxId txId, ui64 channelId, NYql::NDq::IDqChannelStorage::TWakeUpCallback wakeUpCb, const NActors::TActorContext& ctx); -} // namespace NKikimr::NKqp +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spilling.cpp b/ydb/library/yql/dq/actors/spilling/spilling.cpp new file mode 100644 index 0000000000..9c8ef0f0df --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/spilling.cpp @@ -0,0 +1 @@ +#include "spilling.h" diff --git a/ydb/core/kqp/runtime/kqp_spilling.h b/ydb/library/yql/dq/actors/spilling/spilling.h index 8cc9636946..b9d43de0a7 100644 --- a/ydb/core/kqp/runtime/kqp_spilling.h +++ b/ydb/library/yql/dq/actors/spilling/spilling.h @@ -1,16 +1,17 @@ #pragma once -#include <ydb/core/kqp/common/kqp_event_ids.h> -#include <ydb/core/protos/config.pb.h> +#include <ydb/library/yql/dq/actors/dq_events_ids.h> #include <library/cpp/actors/util/rope.h> +#include <library/cpp/actors/core/event_local.h> +#include <util/datetime/base.h> #include <util/generic/buffer.h> -namespace NKikimr::NKqp { +namespace NYql::NDq { -struct TEvKqpSpilling { - struct TEvWrite : public TEventLocal<TEvWrite, TKqpSpillingEvents::EvWrite> { +struct TEvDqSpilling { + struct TEvWrite : public NActors::TEventLocal<TEvWrite, TDqSpillingEvents::EvWrite> { ui64 BlobId; TRope Blob; TMaybe<TDuration> Timeout; @@ -19,14 +20,14 @@ struct TEvKqpSpilling { : BlobId(blobId), Blob(std::move(blob)), Timeout(timeout) {} }; - struct TEvWriteResult : public TEventLocal<TEvWriteResult, TKqpSpillingEvents::EvWriteResult> { + struct TEvWriteResult : public NActors::TEventLocal<TEvWriteResult, TDqSpillingEvents::EvWriteResult> { ui64 BlobId; TEvWriteResult(ui64 blobId) : BlobId(blobId) {} }; - struct TEvRead : public TEventLocal<TEvRead, TKqpSpillingEvents::EvRead> { + struct TEvRead : public NActors::TEventLocal<TEvRead, TDqSpillingEvents::EvRead> { ui64 BlobId; bool RemoveBlob; TMaybe<TDuration> Timeout; @@ -35,7 +36,7 @@ struct TEvKqpSpilling { : BlobId(blobId), RemoveBlob(removeBlob), Timeout(timeout) {} }; - struct TEvReadResult : public TEventLocal<TEvReadResult, TKqpSpillingEvents::EvReadResult> { + struct TEvReadResult : public NActors::TEventLocal<TEvReadResult, TDqSpillingEvents::EvReadResult> { ui64 BlobId; TBuffer Blob; @@ -43,7 +44,7 @@ struct TEvKqpSpilling { : BlobId(blobId), Blob(std::move(blob)) {} }; - struct TEvError : public TEventLocal<TEvError, TKqpSpillingEvents::EvError> { + struct TEvError : public NActors::TEventLocal<TEvError, TDqSpillingEvents::EvError> { TString Message; TEvError(const TString& message) @@ -51,4 +52,4 @@ struct TEvKqpSpilling { }; }; -} // namespace NKikimr::NKqp +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp new file mode 100644 index 0000000000..d3023385c1 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp @@ -0,0 +1,15 @@ +#include "spilling_counters.h" + +namespace NYql::NDq { + +TSpillingCounters::TSpillingCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { + SpillingWriteBlobs = counters->GetCounter("Spilling/WriteBlobs", true); + SpillingReadBlobs = counters->GetCounter("Spilling/ReadBlobs", true); + SpillingStoredBlobs = counters->GetCounter("Spilling/StoredBlobs", false); + SpillingTotalSpaceUsed = counters->GetCounter("Spilling/TotalSpaceUsed", false); + SpillingTooBigFileErrors = counters->GetCounter("Spilling/TooBigFileErrors", true); + SpillingNoSpaceErrors = counters->GetCounter("Spilling/NoSpaceErrors", true); + SpillingIoErrors = counters->GetCounter("Spilling/IoErrors", true); +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.h b/ydb/library/yql/dq/actors/spilling/spilling_counters.h new file mode 100644 index 0000000000..4122c13e7e --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.h @@ -0,0 +1,22 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/ptr.h> + +namespace NYql::NDq { + +struct TSpillingCounters : public TThrRefBase { + + TSpillingCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters); + + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingWriteBlobs; + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingReadBlobs; + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingStoredBlobs; + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTotalSpaceUsed; + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTooBigFileErrors; + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingNoSpaceErrors; + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingIoErrors; +}; + +} // namespace NYql::NDq diff --git a/ydb/core/kqp/runtime/kqp_spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp index 190ac2ac4f..b2f7c80603 100644 --- a/ydb/core/kqp/runtime/kqp_spilling_file.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp @@ -1,23 +1,20 @@ -#include "kqp_spilling.h" -#include "kqp_spilling_file.h" - -#include <ydb/core/actorlib_impl/long_timer.h> -#include <ydb/core/base/appdata.h> -#include <ydb/core/kqp/common/kqp.h> -#include <ydb/core/mon/mon.h> +#include "spilling.h" +#include "spilling_file.h" +#include <ydb/library/services/services.pb.h> #include <ydb/library/yql/utils/yql_panic.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/events.h> #include <library/cpp/monlib/service/pages/templates.h> #include <util/folder/path.h> #include <util/stream/file.h> #include <util/thread/pool.h> -namespace NKikimr::NKqp { +namespace NYql::NDq { using namespace NActors; @@ -26,31 +23,31 @@ namespace { // Read, write, and execute by owner only constexpr int DIR_MODE = S_IRWXU; -#define LOG_D(s) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s) -#define LOG_I(s) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s) -#define LOG_E(s) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s) -#define LOG_C(s) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_BLOBS_STORAGE, s) +#define LOG_D(s) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_I(s) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_E(s) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) +#define LOG_C(s) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, s) -#define A_LOG_D(s) LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_BLOBS_STORAGE, s) -#define A_LOG_E(s) LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_BLOBS_STORAGE, s) +#define A_LOG_D(s) LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_COMPUTE, s) +#define A_LOG_E(s) LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_COMPUTE, s) #define REMOVE_FILES 1 // Local File Storage Events -struct TEvKqpSpillingLocalFile { +struct TEvDqSpillingLocalFile { enum EEv { - EvOpenFile = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvOpenFile = EventSpaceBegin(TEvents::ES_PRIVATE), EvCloseFile, LastEvent = EvCloseFile }; struct TEvOpenFile : public TEventLocal<TEvOpenFile, EvOpenFile> { - ui64 TxId; + TTxId TxId; TString Description; // for viewer & logs only bool RemoveBlobsAfterRead; - TEvOpenFile(ui64 txId, const TString& description, bool removeBlobsAfterRead) + TEvOpenFile(TTxId txId, const TString& description, bool removeBlobsAfterRead) : TxId(txId), Description(description), RemoveBlobsAfterRead(removeBlobsAfterRead) {} }; @@ -66,101 +63,102 @@ struct TEvKqpSpillingLocalFile { // It is a simple proxy between client and spilling service with one feature -- // it provides human-readable description for logs and viewer. -class TKqpLocalFileSpillingActor : public TActorBootstrapped<TKqpLocalFileSpillingActor> { +class TDqLocalFileSpillingActor : public TActorBootstrapped<TDqLocalFileSpillingActor> { public: - TKqpLocalFileSpillingActor(ui64 txId, const TString& details, const TActorId& client, bool removeBlobsAfterRead) - : TxId(txId) - , Details(details) - , ClientActorId(client) - , RemoveBlobsAfterRead(removeBlobsAfterRead) {} + TDqLocalFileSpillingActor(TTxId txId, const TString& details, const TActorId& client, bool removeBlobsAfterRead) + : TxId_(txId) + , Details_(details) + , ClientActorId_(client) + , RemoveBlobsAfterRead_(removeBlobsAfterRead) + {} void Bootstrap() { - ServiceActorId = MakeKqpLocalFileSpillingServiceID(SelfId().NodeId()); - YQL_ENSURE(ServiceActorId); + ServiceActorId_ = MakeDqLocalFileSpillingServiceID(SelfId().NodeId()); + YQL_ENSURE(ServiceActorId_); - LOG_D("Register LocalFileSpillingActor " << SelfId() << " at service " << ServiceActorId); - Send(ServiceActorId, new TEvKqpSpillingLocalFile::TEvOpenFile(TxId, Details, RemoveBlobsAfterRead)); + LOG_D("Register LocalFileSpillingActor " << SelfId() << " at service " << ServiceActorId_); + Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvOpenFile(TxId_, Details_, RemoveBlobsAfterRead_)); - Become(&TKqpLocalFileSpillingActor::WorkState); + Become(&TDqLocalFileSpillingActor::WorkState); } - static constexpr char ActorName[] = "KQP_LOCAL_FILE_SPILLING"; + static constexpr char ActorName[] = "DQ_LOCAL_FILE_SPILLING"; private: STRICT_STFUNC(WorkState, - hFunc(TEvKqpSpilling::TEvWrite, HandleWork) - hFunc(TEvKqpSpilling::TEvWriteResult, HandleWork) - hFunc(TEvKqpSpilling::TEvRead, HandleWork) - hFunc(TEvKqpSpilling::TEvReadResult, HandleWork) - hFunc(TEvKqpSpilling::TEvError, HandleWork) + hFunc(TEvDqSpilling::TEvWrite, HandleWork) + hFunc(TEvDqSpilling::TEvWriteResult, HandleWork) + hFunc(TEvDqSpilling::TEvRead, HandleWork) + hFunc(TEvDqSpilling::TEvReadResult, HandleWork) + hFunc(TEvDqSpilling::TEvError, HandleWork) hFunc(TEvents::TEvPoison, HandleWork) ); - void HandleWork(TEvKqpSpilling::TEvWrite::TPtr& ev) { + void HandleWork(TEvDqSpilling::TEvWrite::TPtr& ev) { ValidateSender(ev->Sender); - Send(ServiceActorId, ev->Release().Release()); + Send(ServiceActorId_, ev->Release().Release()); } - void HandleWork(TEvKqpSpilling::TEvWriteResult::TPtr& ev) { - if (!Send(ClientActorId, ev->Release().Release())) { + void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) { + if (!Send(ClientActorId_, ev->Release().Release())) { ClientLost(); } } - void HandleWork(TEvKqpSpilling::TEvRead::TPtr& ev) { + void HandleWork(TEvDqSpilling::TEvRead::TPtr& ev) { ValidateSender(ev->Sender); - Send(ServiceActorId, ev->Release().Release()); + Send(ServiceActorId_, ev->Release().Release()); } - void HandleWork(TEvKqpSpilling::TEvReadResult::TPtr& ev) { - if (!Send(ClientActorId, ev->Release().Release())) { + void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) { + if (!Send(ClientActorId_, ev->Release().Release())) { ClientLost(); } } - void HandleWork(TEvKqpSpilling::TEvError::TPtr& ev) { - Send(ClientActorId, ev->Release().Release()); + void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) { + Send(ClientActorId_, ev->Release().Release()); } void HandleWork(TEvents::TEvPoison::TPtr& ev) { ValidateSender(ev->Sender); - Send(ServiceActorId, new TEvKqpSpillingLocalFile::TEvCloseFile); + Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile); PassAway(); } private: void ValidateSender(const TActorId& sender) { - YQL_ENSURE(ClientActorId == sender, "" << ClientActorId << " != " << sender); + YQL_ENSURE(ClientActorId_ == sender, "" << ClientActorId_ << " != " << sender); } void ClientLost() { - Send(ServiceActorId, new TEvKqpSpillingLocalFile::TEvCloseFile("Client lost")); + Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile("Client lost")); PassAway(); } private: - const ui64 TxId; - const TString Details; - const TActorId ClientActorId; - const bool RemoveBlobsAfterRead; - TActorId ServiceActorId; + const TTxId TxId_; + const TString Details_; + const TActorId ClientActorId_; + const bool RemoveBlobsAfterRead_; + TActorId ServiceActorId_; }; -class TKqpLocalFileSpillingService : public TActorBootstrapped<TKqpLocalFileSpillingService> { +class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpillingService> { private: struct TEvPrivate { enum EEv { - EvCloseFileResponse = TEvKqpSpillingLocalFile::EEv::LastEvent + 1, + EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1, EvWriteFileResponse, EvReadFileResponse, LastEvent }; - static_assert(EEv::LastEvent - EventSpaceBegin(TKikimrEvents::ES_PRIVATE) < 16); + static_assert(EEv::LastEvent - EventSpaceBegin(TEvents::ES_PRIVATE) < 16); struct TEvCloseFileResponse : public TEventLocal<TEvCloseFileResponse, EvCloseFileResponse> { TActorId Client; @@ -192,58 +190,54 @@ private: using TFilesIt = __yhashtable_iterator<std::pair<const TActorId, TFileDesc>>; public: - TKqpLocalFileSpillingService(const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig& config, - TIntrusivePtr<TKqpCounters> counters) - : Config(config) - , Counters(counters) + TDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, + TIntrusivePtr<TSpillingCounters> counters) + : Config_(config) + , Counters_(counters) { - IoThreadPool = CreateThreadPool(Config.GetIoThreadPool().GetWorkersCount(), - Config.GetIoThreadPool().GetQueueSize(), IThreadPool::TParams().SetThreadNamePrefix("KqpSpilling")); + IoThreadPool_ = CreateThreadPool(Config_.IoThreadPoolWorkersCount, + Config_.IoThreadPoolQueueSize, IThreadPool::TParams().SetThreadNamePrefix("DqSpilling")); } void Bootstrap() { - Root = Config.GetRoot(); - Root /= (TStringBuilder() << "node_" << SelfId().NodeId()); + Root_ = Config_.Root; + Root_ /= (TStringBuilder() << "node_" << SelfId().NodeId()); - LOG_I("Init KQP local file spilling service at " << Root << ", actor: " << SelfId()); + LOG_I("Init DQ local file spilling service at " << Root_ << ", actor: " << SelfId()); try { - if (Root.IsSymlink()) { - throw TIoException() << Root << " is a symlink, can not start Spilling Service"; + if (Root_.IsSymlink()) { + throw TIoException() << Root_ << " is a symlink, can not start Spilling Service"; } - Root.ForceDelete(); - Root.MkDirs(DIR_MODE); + Root_.ForceDelete(); + Root_.MkDirs(DIR_MODE); } catch (...) { LOG_E(CurrentExceptionMessage()); - Become(&TKqpLocalFileSpillingService::BrokenState); + Become(&TDqLocalFileSpillingService::BrokenState); return; } - NActors::TMon* mon = AppData()->Mon; - if (mon) { - NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors"); - mon->RegisterActorPage(actorsMonPage, "kqp_spilling_file", "KQP Local File Spilling Service", false, - TlsActivationContext->ExecutorThread.ActorSystem, SelfId()); - } - - Become(&TKqpLocalFileSpillingService::WorkState); + Become(&TDqLocalFileSpillingService::WorkState); } - static constexpr char ActorName[] = "KQP_LOCAL_FILE_SPILLING_SERVICE"; + static constexpr char ActorName[] = "DQ_LOCAL_FILE_SPILLING_SERVICE"; protected: void PassAway() override { - IoThreadPool->Stop(); + IoThreadPool_->Stop(); IActor::PassAway(); + if (Config_.CleanupOnShutdown) { + Root_.ForceDelete(); + } } private: STATEFN(BrokenState) { switch (ev->GetTypeRewrite()) { - case TEvKqpSpillingLocalFile::TEvOpenFile::EventType: - case TEvKqpSpillingLocalFile::TEvCloseFile::EventType: - case TEvKqpSpilling::TEvWrite::EventType: - case TEvKqpSpilling::TEvRead::EventType: { + case TEvDqSpillingLocalFile::TEvOpenFile::EventType: + case TEvDqSpillingLocalFile::TEvCloseFile::EventType: + case TEvDqSpilling::TEvWrite::EventType: + case TEvDqSpilling::TEvRead::EventType: { HandleBroken(ev->Sender); break; } @@ -256,7 +250,7 @@ private: void HandleBroken(const TActorId& from) { LOG_E("Service is broken, send error to client " << from); - Send(from, new TEvKqpSpilling::TEvError("Service not started")); + Send(from, new TEvDqSpilling::TEvError("Service not started")); } void HandleBroken(NMon::TEvHttpInfo::TPtr& ev) { @@ -265,43 +259,43 @@ private: private: STRICT_STFUNC(WorkState, - hFunc(TEvKqpSpillingLocalFile::TEvOpenFile, HandleWork) - hFunc(TEvKqpSpillingLocalFile::TEvCloseFile, HandleWork) + hFunc(TEvDqSpillingLocalFile::TEvOpenFile, HandleWork) + hFunc(TEvDqSpillingLocalFile::TEvCloseFile, HandleWork) hFunc(TEvPrivate::TEvCloseFileResponse, HandleWork) - hFunc(TEvKqpSpilling::TEvWrite, HandleWork) + hFunc(TEvDqSpilling::TEvWrite, HandleWork) hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork) - hFunc(TEvKqpSpilling::TEvRead, HandleWork) + hFunc(TEvDqSpilling::TEvRead, HandleWork) hFunc(TEvPrivate::TEvReadFileResponse, HandleWork) hFunc(NMon::TEvHttpInfo, HandleWork) cFunc(TEvents::TEvPoison::EventType, PassAway) ); - void HandleWork(TEvKqpSpillingLocalFile::TEvOpenFile::TPtr& ev) { + void HandleWork(TEvDqSpillingLocalFile::TEvOpenFile::TPtr& ev) { auto& msg = *ev->Get(); LOG_D("[OpenFile] TxId: " << msg.TxId << ", desc: " << msg.Description << ", from: " << ev->Sender << ", removeBlobsAfterRead: " << msg.RemoveBlobsAfterRead); - auto it = Files.find(ev->Sender); - if (it != Files.end()) { + auto it = Files_.find(ev->Sender); + if (it != Files_.end()) { LOG_E("[OpenFile] Can not open file: already exists. TxId: " << msg.TxId << ", desc: " << msg.Description); - Send(ev->Sender, new TEvKqpSpilling::TEvError("File already exists")); + Send(ev->Sender, new TEvDqSpilling::TEvError("File already exists")); return; } - auto& fd = Files[ev->Sender]; + auto& fd = Files_[ev->Sender]; fd.TxId = msg.TxId; fd.Description = std::move(msg.Description); fd.RemoveBlobsAfterRead = msg.RemoveBlobsAfterRead; fd.OpenAt = TInstant::Now(); } - void HandleWork(TEvKqpSpillingLocalFile::TEvCloseFile::TPtr& ev) { + void HandleWork(TEvDqSpillingLocalFile::TEvCloseFile::TPtr& ev) { auto& msg = *ev->Get(); LOG_D("[CloseFile] from: " << ev->Sender << ", error: " << msg.Error); - auto it = Files.find(ev->Sender); - if (it == Files.end()) { + auto it = Files_.find(ev->Sender); + if (it == Files_.end()) { LOG_E("[CloseFile] Can not close file: not found. From: " << ev->Sender << ", error: " << msg.Error); return; } @@ -342,8 +336,8 @@ private: auto& msg = *ev->Get(); LOG_D("[CloseFileResponse] from: " << msg.Client); - auto it = Files.find(msg.Client); - if (it == Files.end()) { + auto it = Files_.find(msg.Client); + if (it == Files_.end()) { LOG_E("[CloseFileResponse] Can not find file from: " << msg.Client); return; } @@ -353,22 +347,22 @@ private: blobs += fp.Blobs.size(); } - Counters->SpillingStoredBlobs->Sub(blobs); - Counters->SpillingTotalSpaceUsed->Sub(it->second.TotalSize); + Counters_->SpillingStoredBlobs->Sub(blobs); + Counters_->SpillingTotalSpaceUsed->Sub(it->second.TotalSize); MoveFileToClosed(it); } - void HandleWork(TEvKqpSpilling::TEvWrite::TPtr& ev) { + void HandleWork(TEvDqSpilling::TEvWrite::TPtr& ev) { auto& msg = *ev->Get(); LOG_D("[Write] from: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size()); - auto it = Files.find(ev->Sender); - if (it == Files.end()) { + auto it = Files_.find(ev->Sender); + if (it == Files_.end()) { LOG_E("[Write] File not found. " << "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size()); - Send(ev->Sender, new TEvKqpSpilling::TEvError("File not found")); + Send(ev->Sender, new TEvDqSpilling::TEvError("File not found")); return; } @@ -378,37 +372,37 @@ private: LOG_E("[Write] File already closed. " << "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size()); - Send(ev->Sender, new TEvKqpSpilling::TEvError("File already closed")); + Send(ev->Sender, new TEvDqSpilling::TEvError("File already closed")); return; } - if (fd.TotalSize + msg.Blob.size() > Config.GetMaxFileSize()) { + if (Config_.MaxFileSize && fd.TotalSize + msg.Blob.size() > Config_.MaxFileSize) { LOG_E("[Write] File size limit exceeded. " << "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size()); - Send(ev->Sender, new TEvKqpSpilling::TEvError("File size limit exceeded")); + Send(ev->Sender, new TEvDqSpilling::TEvError("File size limit exceeded")); - Counters->SpillingTooBigFileErrors->Inc(); + Counters_->SpillingTooBigFileErrors->Inc(); return; } - if (TotalSize + msg.Blob.size() > Config.GetMaxTotalSize()) { + if (Config_.MaxTotalSize && TotalSize_ + msg.Blob.size() > Config_.MaxTotalSize) { LOG_E("[Write] Total size limit exceeded. " << "From: " << ev->Sender << ", blobId: " << msg.BlobId << ", bytes: " << msg.Blob.size()); - Send(ev->Sender, new TEvKqpSpilling::TEvError("Total size limit exceeded")); + Send(ev->Sender, new TEvDqSpilling::TEvError("Total size limit exceeded")); - Counters->SpillingNoSpaceErrors->Inc(); + Counters_->SpillingNoSpaceErrors->Inc(); return; } fd.TotalSize += msg.Blob.size(); - TotalSize += msg.Blob.size(); + TotalSize_ += msg.Blob.size(); TFileDesc::TFilePart* fp = fd.PartsList.empty() ? nullptr : &fd.PartsList.back(); bool newFile = false; - if (!fp || (fd.RemoveBlobsAfterRead && (fp->Size + msg.Blob.size() > Config.GetMaxFilePartSize()))) { + if (!fp || (fd.RemoveBlobsAfterRead && (Config_.MaxFilePartSize && fp->Size + msg.Blob.size() > Config_.MaxFilePartSize))) { if (!fd.PartsList.empty()) { fd.PartsList.back().Last = false; } @@ -418,7 +412,7 @@ private: fd.Parts.emplace(msg.BlobId, fp); auto fname = TStringBuilder() << fd.TxId << "_" << fd.Description << "_" << fd.NextPartListIndex++; - fp->FileName = (Root / fname).GetPath(); + fp->FileName = (Root_ / fname).GetPath(); LOG_D("[Write] create new FilePart " << fp->FileName); newFile = true; @@ -448,12 +442,12 @@ private: auto& msg = *ev->Get(); LOG_D("[WriteFileResponse] from: " << msg.Client << ", blobId: " << msg.BlobId << ", error: " << msg.Error); - auto it = Files.find(msg.Client); - if (it == Files.end()) { + auto it = Files_.find(msg.Client); + if (it == Files_.end()) { LOG_E("[WriteFileResponse] Can not write file: not found. " << "From: " << msg.Client << ", blobId: " << msg.BlobId << ", error: " << msg.Error); - Send(ev->Sender, new TEvKqpSpilling::TEvError("Internal error")); + Send(ev->Sender, new TEvDqSpilling::TEvError("Internal error")); return; } @@ -472,8 +466,8 @@ private: fd.TotalWriteBytes += blobDesc.Size; - Counters->SpillingStoredBlobs->Inc(); - Counters->SpillingTotalSpaceUsed->Add(blobDesc.Size); + Counters_->SpillingStoredBlobs->Inc(); + Counters_->SpillingTotalSpaceUsed->Add(blobDesc.Size); if (msg.NewFileHandle) { fp->FileHandle.Swap(msg.NewFileHandle); @@ -484,32 +478,32 @@ private: fd.Error = "File part not found"; } - Counters->SpillingIoErrors->Inc(); + Counters_->SpillingIoErrors->Inc(); } if (fd.Error) { - Send(msg.Client, new TEvKqpSpilling::TEvError(*fd.Error)); + Send(msg.Client, new TEvDqSpilling::TEvError(*fd.Error)); fd.Ops.clear(); CloseFile(it, fd.Error); return; } - Counters->SpillingWriteBlobs->Inc(); + Counters_->SpillingWriteBlobs->Inc(); - Send(msg.Client, new TEvKqpSpilling::TEvWriteResult(msg.BlobId)); + Send(msg.Client, new TEvDqSpilling::TEvWriteResult(msg.BlobId)); RunNextOp(fd); } - void HandleWork(TEvKqpSpilling::TEvRead::TPtr& ev) { + void HandleWork(TEvDqSpilling::TEvRead::TPtr& ev) { auto& msg = *ev->Get(); LOG_D("[Read] from: " << ev->Sender << ", blobId: " << msg.BlobId); - auto it = Files.find(ev->Sender); - if (it == Files.end()) { + auto it = Files_.find(ev->Sender); + if (it == Files_.end()) { LOG_E("[Read] Can not read file: not found. From: " << ev->Sender << ", blobId: " << msg.BlobId); - Send(ev->Sender, new TEvKqpSpilling::TEvError("File not found")); + Send(ev->Sender, new TEvDqSpilling::TEvError("File not found")); return; } @@ -518,7 +512,7 @@ private: if (fd.CloseAt) { LOG_E("[Read] Can not read file: closed. From: " << ev->Sender << ", blobId: " << msg.BlobId); - Send(ev->Sender, new TEvKqpSpilling::TEvError("Closed")); + Send(ev->Sender, new TEvDqSpilling::TEvError("Closed")); return; } @@ -526,7 +520,7 @@ private: if (partIt == fd.Parts.end()) { LOG_E("[Read] Can not read file: part not found. From: " << ev->Sender << ", blobId: " << msg.BlobId); - Send(ev->Sender, new TEvKqpSpilling::TEvError("File part not found")); + Send(ev->Sender, new TEvDqSpilling::TEvError("File part not found")); fd.Ops.clear(); TMaybe<TString> err = "Part not found"; @@ -540,7 +534,7 @@ private: if (blobIt == fp->Blobs.end()) { LOG_E("[Read] Can not read file: blob not found in the part. From: " << ev->Sender << ", blobId: " << msg.BlobId); - Send(ev->Sender, new TEvKqpSpilling::TEvError("Blob not found in the file part")); + Send(ev->Sender, new TEvDqSpilling::TEvError("Blob not found in the file part")); fd.Ops.clear(); TMaybe<TString> err = "Blob not found in the file part"; @@ -581,12 +575,12 @@ private: LOG_D("[ReadFileResponse] from: " << msg.Client << ", blobId: " << msg.BlobId << ", removed: " << msg.Removed << ", error: " << msg.Error); - auto it = Files.find(msg.Client); - if (it == Files.end()) { + auto it = Files_.find(msg.Client); + if (it == Files_.end()) { LOG_E("[ReadFileResponse] Can not read file: not found. " << "From: " << msg.Client << ", blobId: " << msg.BlobId << ", error: " << msg.Error); - Send(ev->Sender, new TEvKqpSpilling::TEvError("Internal error")); + Send(ev->Sender, new TEvDqSpilling::TEvError("Internal error")); return; } @@ -606,10 +600,10 @@ private: if (msg.Removed) { fd.TotalSize -= fp->Size; - TotalSize -= fp->Size; + TotalSize_ -= fp->Size; - Counters->SpillingTotalSpaceUsed->Sub(fp->Size); - Counters->SpillingStoredBlobs->Sub(fp->Blobs.size()); + Counters_->SpillingTotalSpaceUsed->Sub(fp->Size); + Counters_->SpillingStoredBlobs->Sub(fp->Blobs.size()); fd.Parts.erase(msg.BlobId); fd.PartsList.remove_if([fp](const auto& x) { return &x == fp; }); @@ -621,40 +615,40 @@ private: } if (fd.Error) { - Send(msg.Client, new TEvKqpSpilling::TEvError(*fd.Error)); + Send(msg.Client, new TEvDqSpilling::TEvError(*fd.Error)); fd.Ops.clear(); CloseFile(it, fd.Error); return; } - Counters->SpillingReadBlobs->Inc(); + Counters_->SpillingReadBlobs->Inc(); - Send(msg.Client, new TEvKqpSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob))); + Send(msg.Client, new TEvDqSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob))); RunNextOp(fd); } void HandleWork(NMon::TEvHttpInfo::TPtr& ev) { TStringStream s; - TMap<ui64, TVector<const TFileDesc*>> byTx; - for (const auto& fd : Files) { + TMap<TTxId, TVector<const TFileDesc*>> byTx; + for (const auto& fd : Files_) { byTx[fd.second.TxId].push_back(&fd.second); } HTML(s) { TAG(TH2) { s << "Configuration"; } PRE() { - s << " - Root: " << Config.GetRoot() << Endl; - s << " - MaxTotalSize: " << Config.GetMaxTotalSize() << Endl; - s << " - MaxFileSize: " << Config.GetMaxFileSize() << Endl; - s << " - MaxFilePartSize: " << Config.GetMaxFilePartSize() << Endl; - s << " - IO thread pool, workers: " << Config.GetIoThreadPool().GetWorkersCount() - << ", queue: " << Config.GetIoThreadPool().GetQueueSize() << Endl; + s << " - Root: " << Config_.Root << Endl; + s << " - MaxTotalSize: " << Config_.MaxTotalSize << Endl; + s << " - MaxFileSize: " << Config_.MaxFileSize << Endl; + s << " - MaxFilePartSize: " << Config_.MaxFilePartSize << Endl; + s << " - IO thread pool, workers: " << Config_.IoThreadPoolWorkersCount + << ", queue: " << Config_.IoThreadPoolQueueSize << Endl; } TAG(TH2) { s << "Active files"; } - PRE() { s << "Used space: " << TotalSize << Endl; } + PRE() { s << "Used space: " << TotalSize_ << Endl; } for (const auto& tx : byTx) { TAG(TH2) { s << "Transaction " << tx.first; } @@ -692,7 +686,7 @@ private: TAG(TH2) { s << "Last closed files"; } UL() { - for (auto it = ClosedFiles.rbegin(); it != ClosedFiles.rend(); ++it) { + for (auto it = ClosedFiles_.rbegin(); it != ClosedFiles_.rend(); ++it) { auto& fd = *it; LI() { s << "Transaction: " << fd.TxId << ", " << fd.Description; } PRE() { @@ -720,7 +714,7 @@ private: } else { fd.HasActiveOp = true; // TODO: retry if fails - IoThreadPool->SafeAddAndOwn(std::move(op)); + IoThreadPool_->SafeAddAndOwn(std::move(op)); } } @@ -736,12 +730,12 @@ private: } void MoveFileToClosed(TFilesIt it) { - TotalSize -= it->second.TotalSize; - ClosedFiles.emplace_back(TClosedFileDesc(std::move(it->second))); - while (ClosedFiles.size() > 100) { - ClosedFiles.pop_front(); + TotalSize_ -= it->second.TotalSize; + ClosedFiles_.emplace_back(TClosedFileDesc(std::move(it->second))); + while (ClosedFiles_.size() > 100) { + ClosedFiles_.pop_front(); } - Files.erase(it); + Files_.erase(it); } private: @@ -870,12 +864,8 @@ private: }; private: - const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig Config; - TFsPath Root; - TIntrusivePtr<TKqpCounters> Counters; - struct TFileDesc { - ui64 TxId = 0; + TTxId TxId; TString Description; bool RemoveBlobsAfterRead = false; TInstant OpenAt; @@ -919,7 +909,7 @@ private: }; struct TClosedFileDesc { - ui64 TxId; + TTxId TxId; TString Description; bool RemoveBlobsAfterRead; TInstant OpenAt; @@ -940,28 +930,32 @@ private: , WaitTime(fd.TotalWaitTime) , WriteBytes(fd.TotalWriteBytes) , ReadBytes(fd.TotalReadBytes) - , Error(std::move(fd.Error)) {} + , Error(std::move(fd.Error)) + {} }; - THolder<IThreadPool> IoThreadPool; - THashMap<TActorId, TFileDesc> Files; - TList<const TClosedFileDesc> ClosedFiles; - ui64 TotalSize = 0; +private: + const TFileSpillingServiceConfig Config_; + TFsPath Root_; + TIntrusivePtr<TSpillingCounters> Counters_; + + THolder<IThreadPool> IoThreadPool_; + THashMap<TActorId, TFileDesc> Files_; + TList<const TClosedFileDesc> ClosedFiles_; + ui64 TotalSize_ = 0; }; } // anonymous namespace -IActor* CreateKqpLocalFileSpillingActor(ui64 txId, const TString& details, const TActorId& client, +IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const TActorId& client, bool removeBlobsAfterRead) { - return new TKqpLocalFileSpillingActor(txId, details, client, removeBlobsAfterRead); + return new TDqLocalFileSpillingActor(txId, details, client, removeBlobsAfterRead); } -IActor* CreateKqpLocalFileSpillingService( - const NKikimrConfig::TTableServiceConfig::TSpillingServiceConfig::TLocalFileConfig& config, - TIntrusivePtr<TKqpCounters> counters) +IActor* CreateDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, TIntrusivePtr<TSpillingCounters> counters) { - return new TKqpLocalFileSpillingService(config, counters); + return new TDqLocalFileSpillingService(config, counters); } -} // namespace NKikimr::NKqp +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.h b/ydb/library/yql/dq/actors/spilling/spilling_file.h new file mode 100644 index 0000000000..c2173221eb --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/spilling_file.h @@ -0,0 +1,33 @@ +#pragma once + +#include "spilling_counters.h" + +#include <ydb/library/yql/dq/common/dq_common.h> +#include <library/cpp/actors/core/actor.h> + +#include <util/system/types.h> +#include <util/generic/strbuf.h> + +namespace NYql::NDq { + +struct TFileSpillingServiceConfig { + TString Root; + ui64 MaxTotalSize = 0; + ui64 MaxFileSize = 0; + ui64 MaxFilePartSize = 0; + + ui32 IoThreadPoolWorkersCount = 2; + ui32 IoThreadPoolQueueSize = 1000; + bool CleanupOnShutdown = false; +}; + +inline NActors::TActorId MakeDqLocalFileSpillingServiceID(ui32 nodeId) { + const char name[12] = "dq_lfspill"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + +NActors::IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const NActors::TActorId& client, bool removeBlobsAfterRead); + +NActors::IActor* CreateDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, TIntrusivePtr<TSpillingCounters> counters); + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp new file mode 100644 index 0000000000..ff496b5f48 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp @@ -0,0 +1,456 @@ +#include "spilling_file.h" +#include "spilling.h" + +#include <ydb/library/services/services.pb.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/actors/testlib/test_runtime.h> + +#include <util/system/fs.h> +#include <util/generic/string.h> +#include <util/folder/path.h> + +namespace NYql::NDq { + +using namespace NActors; + +namespace { + +class TTestActorRuntime: public TTestActorRuntimeBase { +public: + void InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) override { + node->LogSettings->Append( + NKikimrServices::EServiceKikimr_MIN, + NKikimrServices::EServiceKikimr_MAX, + NKikimrServices::EServiceKikimr_Name + ); + TTestActorRuntimeBase::InitNodeImpl(node, nodeIndex); + } + + ~TTestActorRuntime() { + if (SpillingRoot_ && SpillingRoot_.Exists()) { + SpillingRoot_.ForceDelete(); + } + } + + void Initialize() override { + TTestActorRuntimeBase::Initialize(); + SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::PRI_ERROR); + } + + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters() { + static auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); + return counters; + } + + static TString GetSpillingPrefix() { + static TString str = Sprintf("%s_%d", "dq_spilling", (int)getpid()); + return str; + } + + TActorId StartSpillingService(ui64 maxTotalSize = 1000, ui64 maxFileSize = 500, + ui64 maxFilePartSize = 100, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix()) + { + SpillingRoot_ = root; + + auto config = TFileSpillingServiceConfig{ + .Root = root.GetPath(), + .MaxTotalSize = maxTotalSize, + .MaxFileSize = maxFileSize, + .MaxFilePartSize = maxFilePartSize + }; + + auto counters = Counters(); + counters->ResetCounters(); + + auto spillingService = CreateDqLocalFileSpillingService(config, MakeIntrusive<TSpillingCounters>(counters)); + auto spillingServiceActorId = Register(spillingService); + EnableScheduleForActor(spillingServiceActorId); + RegisterService(MakeDqLocalFileSpillingServiceID(GetNodeId()), spillingServiceActorId); + + return spillingServiceActorId; + } + + TActorId StartSpillingActor(const TActorId& client, bool removeBlobsAfterRead = true) { + auto spillingActor = CreateDqLocalFileSpillingActor(1ul, "test", client, removeBlobsAfterRead); + auto spillingActorId = Register(spillingActor); + EnableScheduleForActor(spillingActorId); + + return spillingActorId; + } + + void WaitBootstrap() { + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvents::TSystem::Bootstrap, 1); + UNIT_ASSERT(DispatchEvents(options)); + } + + const TFsPath& GetSpillingRoot() const { + return SpillingRoot_; + } + +private: + TFsPath SpillingRoot_; +}; + +TBuffer CreateBlob(ui32 size, char symbol) { + TBuffer blob(size); + blob.Fill(symbol, size); + return blob; +} + +TRope CreateRope(ui32 size, char symbol, ui32 chunkSize = 7) { + TRope result; + while (size) { + size_t count = std::min(size, chunkSize); + TString str(count, symbol); + result.Insert(result.End(), TRope{str}); + size -= count; + } + return result; +} + +void AssertEquals(const TBuffer& lhs, const TBuffer& rhs) { + TStringBuf l{lhs.data(), lhs.size()}; + TStringBuf r{rhs.data(), rhs.size()}; + UNIT_ASSERT_STRINGS_EQUAL(l, r); +} + + +struct THttpRequest : NMonitoring::IHttpRequest { + HTTP_METHOD Method; + TCgiParameters CgiParameters; + THttpHeaders HttpHeaders; + + THttpRequest(HTTP_METHOD method) + : Method(method) + {} + + ~THttpRequest() {} + + const char* GetURI() const override { + return ""; + } + + const char* GetPath() const override { + return ""; + } + + const TCgiParameters& GetParams() const override { + return CgiParameters; + } + + const TCgiParameters& GetPostParams() const override { + return CgiParameters; + } + + TStringBuf GetPostContent() const override { + return TString(); + } + + HTTP_METHOD GetMethod() const override { + return Method; + } + + const THttpHeaders& GetHeaders() const override { + return HttpHeaders; + } + + TString GetRemoteAddr() const override { + return TString(); + } +}; + +} // anonymous namespace + +Y_UNIT_TEST_SUITE(DqSpillingFileTests) { + + Y_UNIT_TEST(Simple) { + TTestActorRuntime runtime; + runtime.Initialize(); + + auto spillingService = runtime.StartSpillingService(); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester); + + runtime.WaitBootstrap(); + + // put blob 1 + { + auto ev = new TEvDqSpilling::TEvWrite(1, CreateRope(10, 'a')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); + } + + // put blob 2 + { + auto ev = new TEvDqSpilling::TEvWrite(2, CreateRope(11, 'z')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId); + } + + // get blob 1 + { + auto ev = new TEvDqSpilling::TEvRead(1); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); + + TBuffer expected = CreateBlob(10, 'a'); + AssertEquals(expected, resp->Get()->Blob); + } + + // get blob 2 + { + auto ev = new TEvDqSpilling::TEvRead(2); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(2, resp->Get()->BlobId); + + TBuffer expected = CreateBlob(11, 'z'); + AssertEquals(expected, resp->Get()->Blob); + } + + // terminate + { + runtime.Send(new IEventHandle(spillingActor, tester, new TEvents::TEvPoison)); + + std::atomic<bool> done = false; + runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { + if (event->GetRecipientRewrite() == spillingService) { + if (event->GetTypeRewrite() == 2146435074 /* EvCloseFileResponse */ ) { + done = true; + } + } + return TTestActorRuntimeBase::EEventAction::PROCESS; + }); + + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return (bool) done; + }; + + runtime.DispatchEvents(options, TDuration::Seconds(1)); + } + } + + Y_UNIT_TEST(Write_TotalSizeLimitExceeded) { + TTestActorRuntime runtime; + runtime.Initialize(); + + runtime.StartSpillingService(100, 1000, 1000); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester); + + runtime.WaitBootstrap(); + + { + auto ev = new TEvDqSpilling::TEvWrite(1, CreateRope(51, 'a')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester); + UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); + } + + { + auto ev = new TEvDqSpilling::TEvWrite(2, CreateRope(50, 'b')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester); + UNIT_ASSERT_STRINGS_EQUAL("Total size limit exceeded", resp->Get()->Message); + } + } + + Y_UNIT_TEST(Write_FileSizeLimitExceeded) { + TTestActorRuntime runtime; + runtime.Initialize(); + + runtime.StartSpillingService(1000, 100, 1000); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester); + + runtime.WaitBootstrap(); + + { + auto ev = new TEvDqSpilling::TEvWrite(1, CreateRope(51, 'a')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester); + UNIT_ASSERT_VALUES_EQUAL(1, resp->Get()->BlobId); + } + + { + auto ev = new TEvDqSpilling::TEvWrite(2, CreateRope(50, 'b')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester); + UNIT_ASSERT_STRINGS_EQUAL("File size limit exceeded", resp->Get()->Message); + } + } + + Y_UNIT_TEST(MultipleFileParts) { + TTestActorRuntime runtime; + runtime.Initialize(); + + runtime.StartSpillingService(1000, 100, 25); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester); + + runtime.WaitBootstrap(); + + const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_"; + + for (ui32 i = 0; i < 5; ++i) { + // Cerr << "---- store blob #" << i << Endl; + auto ev = new TEvDqSpilling::TEvWrite(i, CreateRope(20, 'a' + i)); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester); + UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); + + UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i)); + } + + for (i32 i = 4; i >= 0; --i) { + // Cerr << "---- load blob #" << i << Endl; + auto ev = new TEvDqSpilling::TEvRead(i, true); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester); + UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); + TBuffer expected = CreateBlob(20, 'a' + i); + AssertEquals(expected, resp->Get()->Blob); + + if (i == 4) { + // do not remove last file + UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << i)); + } else { + UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i)); + } + } + } + + Y_UNIT_TEST(SingleFilePart) { + TTestActorRuntime runtime; + runtime.Initialize(); + + runtime.StartSpillingService(1000, 100, 25); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester, false); + + runtime.WaitBootstrap(); + + const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_"; + + for (ui32 i = 0; i < 5; ++i) { + // Cerr << "---- store blob #" << i << Endl; + auto ev = new TEvDqSpilling::TEvWrite(i, CreateRope(20, 'a' + i)); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester); + UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); + + UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0)); + if (i > 0) { + UNIT_ASSERT(!NFs::Exists(TStringBuilder() << filePrefix << i)); + } + } + + for (i32 i = 4; i >= 0; --i) { + // Cerr << "---- load blob #" << i << Endl; + auto ev = new TEvDqSpilling::TEvRead(i, true); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester); + UNIT_ASSERT_VALUES_EQUAL(i, resp->Get()->BlobId); + TBuffer expected = CreateBlob(20, 'a' + i); + AssertEquals(expected, resp->Get()->Blob); + + UNIT_ASSERT(NFs::Exists(TStringBuilder() << filePrefix << 0)); + } + } + + Y_UNIT_TEST(ReadError) { + //return; + + TTestActorRuntime runtime; + runtime.Initialize(); + + runtime.StartSpillingService(); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester); + + runtime.WaitBootstrap(); + + { + auto ev = new TEvDqSpilling::TEvWrite(0, CreateRope(20, 'a')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester); + UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId); + } + + (runtime.GetSpillingRoot() / "node_1" / "1_test_0").ForceDelete(); + + { + auto ev = new TEvDqSpilling::TEvRead(0, true); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester); + auto& err = resp->Get()->Message; + auto expected = "can't open \"" + runtime.GetSpillingRoot().GetPath() + "/node_1/1_test_0\" with mode RdOnly"; + UNIT_ASSERT_C(err.Contains("No such file or directory"), err); + UNIT_ASSERT_C(err.Contains(expected), err); + } + } + + Y_UNIT_TEST(StartError) { + TTestActorRuntime runtime; + runtime.Initialize(); + + auto spillingService = runtime.StartSpillingService(100, 500, 100, TFsPath("/nonexistent") / runtime.GetSpillingPrefix()); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester); + + runtime.WaitBootstrap(); + + // put blob 1 + { + auto ev = new TEvDqSpilling::TEvWrite(1, CreateRope(10, 'a')); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester, TDuration::Seconds(1)); + UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message); + } + + // get blob 1 + { + auto ev = new TEvDqSpilling::TEvRead(1); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester, TDuration::Seconds(1)); + UNIT_ASSERT_EQUAL("Service not started", resp->Get()->Message); + } + + // mon + { + THttpRequest httpReq(HTTP_METHOD_GET); + NMonitoring::TMonService2HttpRequest monReq(nullptr, &httpReq, nullptr, nullptr, "", nullptr); + + runtime.Send(new IEventHandle(spillingService, tester, new NMon::TEvHttpInfo(monReq))); + + auto resp = runtime.GrabEdgeEvent<NMon::TEvHttpInfoRes>(tester, TDuration::Seconds(1)); + UNIT_ASSERT_EQUAL("<html><h2>Service is not started due to IO error</h2></html>", + ((NMon::TEvHttpInfoRes*) resp->Get())->Answer); + } + } + +} // suite + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..14ac9583a2 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,80 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-dq-actors-spilling-ut) +target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling +) +target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + dq-actors-spilling + cpp-testing-unittest + cpp-actors-testlib + ydb-library-services +) +target_link_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +) +set_property( + TARGET + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-library-yql-dq-actors-spilling-ut + TEST_TARGET + ydb-library-yql-dq-actors-spilling-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + TIMEOUT + 180 +) +target_allocator(ydb-library-yql-dq-actors-spilling-ut + system_allocator +) +vcs_info(ydb-library-yql-dq-actors-spilling-ut) diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..6ac287eaed --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,83 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-dq-actors-spilling-ut) +target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling +) +target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + dq-actors-spilling + cpp-testing-unittest + cpp-actors-testlib + ydb-library-services +) +target_link_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +) +set_property( + TARGET + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-library-yql-dq-actors-spilling-ut + TEST_TARGET + ydb-library-yql-dq-actors-spilling-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + TIMEOUT + 180 +) +target_allocator(ydb-library-yql-dq-actors-spilling-ut + cpp-malloc-jemalloc +) +vcs_info(ydb-library-yql-dq-actors-spilling-ut) diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..0de300a43d --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,85 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-dq-actors-spilling-ut) +target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling +) +target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + dq-actors-spilling + cpp-testing-unittest + cpp-actors-testlib + ydb-library-services +) +target_link_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +) +set_property( + TARGET + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-library-yql-dq-actors-spilling-ut + TEST_TARGET + ydb-library-yql-dq-actors-spilling-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + TIMEOUT + 180 +) +target_allocator(ydb-library-yql-dq-actors-spilling-ut + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-library-yql-dq-actors-spilling-ut) diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..48680b0618 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,73 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-library-yql-dq-actors-spilling-ut) +target_compile_options(ydb-library-yql-dq-actors-spilling-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling +) +target_link_libraries(ydb-library-yql-dq-actors-spilling-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + dq-actors-spilling + cpp-testing-unittest + cpp-actors-testlib + ydb-library-services +) +target_sources(ydb-library-yql-dq-actors-spilling-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +) +set_property( + TARGET + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-library-yql-dq-actors-spilling-ut + TEST_TARGET + ydb-library-yql-dq-actors-spilling-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-library-yql-dq-actors-spilling-ut + PROPERTY + TIMEOUT + 180 +) +target_allocator(ydb-library-yql-dq-actors-spilling-ut + system_allocator +) +vcs_info(ydb-library-yql-dq-actors-spilling-ut) diff --git a/ydb/library/yql/dq/actors/spilling/ut/ya.make b/ydb/library/yql/dq/actors/spilling/ut/ya.make new file mode 100644 index 0000000000..f528b5d1fe --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/ut/ya.make @@ -0,0 +1,20 @@ +UNITTEST_FOR(ydb/library/yql/dq/actors/spilling) + +FORK_SUBTESTS() + +SIZE(MEDIUM) +TIMEOUT(180) + +SRCS( + spilling_file_ut.cpp +) + +YQL_LAST_ABI_VERSION() + +PEERDIR( + library/cpp/testing/unittest + library/cpp/actors/testlib + ydb/library/services +) + +END() diff --git a/ydb/library/yql/dq/actors/spilling/ya.make b/ydb/library/yql/dq/actors/spilling/ya.make new file mode 100644 index 0000000000..c23435ce09 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/ya.make @@ -0,0 +1,29 @@ +LIBRARY() + +SRCS( + channel_storage.cpp + spilling_counters.cpp + spilling_file.cpp + spilling.cpp +) + +PEERDIR( + ydb/library/services + ydb/library/yql/dq/common + ydb/library/yql/dq/actors + ydb/library/yql/dq/runtime + ydb/library/yql/utils + + library/cpp/actors/core + library/cpp/actors/util + library/cpp/monlib/dynamic_counters + library/cpp/monlib/service/pages +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index e63223bb08..08614b07bf 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -158,7 +158,7 @@ struct TEvTaskRunnerCreate TEvTaskRunnerCreate( const NDqProto::TDqTask& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext())) + const std::shared_ptr<IDqTaskRunnerExecutionContext>& execCtx) : Task(task) , MemoryLimits(memoryLimits) , ExecCtx(execCtx) diff --git a/ydb/library/yql/dq/actors/ya.make b/ydb/library/yql/dq/actors/ya.make index 731ece61cd..c07b3cc7b9 100644 --- a/ydb/library/yql/dq/actors/ya.make +++ b/ydb/library/yql/dq/actors/ya.make @@ -16,5 +16,6 @@ END() RECURSE( compute protos + spilling task_runner ) diff --git a/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt index ee09e6cf14..ddcaa3f12b 100644 --- a/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/dq/runtime/CMakeLists.darwin-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(yql-dq-runtime PUBLIC yql-dq-expr_nodes yql-dq-type_ann common-schema-mkql + cpp-actors-util tools-enum_parser-enum_serialization_runtime ) target_sources(yql-dq-runtime PRIVATE diff --git a/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt b/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt index 6f24fe6c02..dfeb295ee7 100644 --- a/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/dq/runtime/CMakeLists.linux-aarch64.txt @@ -33,6 +33,7 @@ target_link_libraries(yql-dq-runtime PUBLIC yql-dq-expr_nodes yql-dq-type_ann common-schema-mkql + cpp-actors-util tools-enum_parser-enum_serialization_runtime ) target_sources(yql-dq-runtime PRIVATE diff --git a/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt b/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt index 6f24fe6c02..dfeb295ee7 100644 --- a/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/dq/runtime/CMakeLists.linux-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(yql-dq-runtime PUBLIC yql-dq-expr_nodes yql-dq-type_ann common-schema-mkql + cpp-actors-util tools-enum_parser-enum_serialization_runtime ) target_sources(yql-dq-runtime PRIVATE diff --git a/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt b/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt index ee09e6cf14..ddcaa3f12b 100644 --- a/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/dq/runtime/CMakeLists.windows-x86_64.txt @@ -32,6 +32,7 @@ target_link_libraries(yql-dq-runtime PUBLIC yql-dq-expr_nodes yql-dq-type_ann common-schema-mkql + cpp-actors-util tools-enum_parser-enum_serialization_runtime ) target_sources(yql-dq-runtime PRIVATE diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 6dd562a088..4e4fec2bef 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -204,17 +204,13 @@ IDqOutputConsumer::TPtr DqBuildOutputConsumer(const NDqProto::TTaskOutput& outpu } } -IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContext::CreateOutputConsumer(const TTaskOutput& outputDesc, +IDqOutputConsumer::TPtr TDqTaskRunnerExecutionContextBase::CreateOutputConsumer(const TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext*, const TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const { return DqBuildOutputConsumer(outputDesc, type, typeEnv, holderFactory, std::move(outputs)); } -IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 /* channelId */) const { - return {}; -} - inline TCollectStatsLevel StatsModeToCollectStatsLevel(NDqProto::EDqStatsMode statsMode) { if (statsMode >= NDqProto::DQ_STATS_MODE_PROFILE) return TCollectStatsLevel::Profile; else if (statsMode >= NDqProto::DQ_STATS_MODE_FULL) return TCollectStatsLevel::Full; @@ -445,7 +441,7 @@ public: } else { entry = ticket.GetValueSync(); } - } + } if (!entry) { entry = CreateComputationPattern(task, program.GetRaw(), false, canBeCached); @@ -576,7 +572,7 @@ public: inputs.clear(); inputs.emplace_back(transform->TransformOutput); entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), - CreateInputUnionValue(std::move(inputs), holderFactory, + CreateInputUnionValue(std::move(inputs), holderFactory, {&inputStats, transform->TransformOutputType})); } else { entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 22330a5d15..2c6fb027f7 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -171,15 +171,20 @@ public: virtual IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const = 0; }; -class TDqTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext { +class TDqTaskRunnerExecutionContextBase : public IDqTaskRunnerExecutionContext { public: IDqOutputConsumer::TPtr CreateOutputConsumer(const NDqProto::TTaskOutput& outputDesc, const NKikimr::NMiniKQL::TType* type, NUdf::IApplyContext* applyCtx, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, TVector<IDqOutput::TPtr>&& outputs) const override; +}; - IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override; +class TDqTaskRunnerExecutionContextDefault : public TDqTaskRunnerExecutionContextBase { +public: + IDqChannelStorage::TPtr CreateChannelStorage(ui64 /*channelId*/) const override { + return {}; + }; }; struct TDqTaskRunnerSettings { @@ -376,7 +381,7 @@ public: virtual ui64 GetTaskId() const = 0; virtual void Prepare(const TDqTaskSettings& task, const TDqTaskRunnerMemoryLimits& memoryLimits, - const IDqTaskRunnerExecutionContext& execCtx = TDqTaskRunnerExecutionContext()) = 0; + const IDqTaskRunnerExecutionContext& execCtx) = 0; virtual ERunStatus Run() = 0; virtual bool HasEffects() const = 0; diff --git a/ydb/library/yql/dq/runtime/ya.make b/ydb/library/yql/dq/runtime/ya.make index c062d8743f..6202096617 100644 --- a/ydb/library/yql/dq/runtime/ya.make +++ b/ydb/library/yql/dq/runtime/ya.make @@ -13,6 +13,7 @@ PEERDIR( ydb/library/yql/dq/type_ann ydb/library/yql/parser/pg_wrapper/interface ydb/library/yql/providers/common/schema/mkql + library/cpp/actors/util ) SRCS( diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt index a7cb059668..10e86803f2 100644 --- a/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/dq/actors/CMakeLists.darwin-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(providers-dq-actors PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-compute yql-utils-failure_injector providers-common-metrics dq-actors-events diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt index 74bf95ff13..ae0b2aa77f 100644 --- a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-aarch64.txt @@ -34,6 +34,7 @@ target_link_libraries(providers-dq-actors PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-compute yql-utils-failure_injector providers-common-metrics dq-actors-events diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt index 74bf95ff13..ae0b2aa77f 100644 --- a/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/dq/actors/CMakeLists.linux-x86_64.txt @@ -34,6 +34,7 @@ target_link_libraries(providers-dq-actors PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-compute yql-utils-failure_injector providers-common-metrics dq-actors-events diff --git a/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt index a7cb059668..10e86803f2 100644 --- a/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/dq/actors/CMakeLists.windows-x86_64.txt @@ -33,6 +33,7 @@ target_link_libraries(providers-dq-actors PUBLIC yql-dq-proto yql-dq-runtime yql-dq-tasks + dq-actors-compute yql-utils-failure_injector providers-common-metrics dq-actors-events diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index a944e60b00..c1cfe959a4 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -40,6 +40,7 @@ IActor* CreateComputeActor( computeRuntimeSettings.ExtraMemoryAllocationPool = 3; computeRuntimeSettings.FailOnUndelivery = false; computeRuntimeSettings.StatsMode = (statsMode != NDqProto::DQ_STATS_MODE_UNSPECIFIED) ? statsMode : NDqProto::DQ_STATS_MODE_FULL; + computeRuntimeSettings.UseSpilling = options.UseSpilling; // clear fake actorids for (auto& input : *task->MutableInputs()) { diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 915ca8453b..ab8aeb8c93 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -1,8 +1,8 @@ #include "worker_actor.h" #include <ydb/library/yql/dq/actors/dq.h> +#include <ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h> #include <ydb/library/yql/providers/dq/common/yql_dq_common.h> - #include <ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h> #include <ydb/library/yql/providers/dq/runtime/runtime_data.h> @@ -94,13 +94,15 @@ public: const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const IDqAsyncIoFactory::TPtr& asyncIoFactory, TWorkerRuntimeData* runtimeData, - const TString& traceId) + const TString& traceId, + bool useSpilling) : TRichActor<TDqWorker>(&TDqWorker::Handler) , AsyncIoFactory(asyncIoFactory) , TaskRunnerActorFactory(taskRunnerActorFactory) , RuntimeData(runtimeData) , TraceId(traceId) , MemoryQuotaManager(new TDummyMemoryQuotaManager) + , UseSpilling(useSpilling) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); YQL_CLOG(DEBUG, ProviderDq) << "TDqWorker created "; @@ -258,7 +260,12 @@ private: TDqTaskRunnerMemoryLimits limits; // used for local mode only limits.ChannelBufferSize = 20_MB; limits.OutputChunkMaxSize = 2_MB; - Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits)); + + auto wakeup = [this]{ ResumeExecution(); }; + std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>( + TraceId, UseSpilling, std::move(wakeup), TlsActivationContext->AsActorContext()); + + Send(TaskRunnerActor, new TEvTaskRunnerCreate(std::move(ev->Get()->Record.GetTask()), limits, execCtx)); } void OnTaskRunnerCreated(TEvTaskRunnerCreateFinished::TPtr& ev, const TActorContext& ) { @@ -789,13 +796,15 @@ private: TVector<Yql::DqsProto::TWorkerInfo> AllWorkers; IMemoryQuotaManager::TPtr MemoryQuotaManager; + const bool UseSpilling; }; NActors::IActor* CreateWorkerActor( TWorkerRuntimeData* runtimeData, const TString& traceId, const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const IDqAsyncIoFactory::TPtr& asyncIoFactory) + const IDqAsyncIoFactory::TPtr& asyncIoFactory, + bool useSpilling) { Y_ABORT_UNLESS(taskRunnerActorFactory); return new TLogWrapReceive( @@ -803,7 +812,8 @@ NActors::IActor* CreateWorkerActor( taskRunnerActorFactory, asyncIoFactory, runtimeData, - traceId), traceId); + traceId, + useSpilling), traceId); } } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h index ea1ee7bfdb..1a68754920 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.h +++ b/ydb/library/yql/providers/dq/actors/worker_actor.h @@ -25,6 +25,7 @@ namespace NYql::NDqs { TWorkerRuntimeData* runtimeData, const TString& traceId, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory); + const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory, + bool useSpilling); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/actors/ya.make b/ydb/library/yql/providers/dq/actors/ya.make index 7d3df6fe4b..52fa536894 100644 --- a/ydb/library/yql/providers/dq/actors/ya.make +++ b/ydb/library/yql/providers/dq/actors/ya.make @@ -35,6 +35,7 @@ PEERDIR( ydb/library/yql/dq/proto ydb/library/yql/dq/runtime ydb/library/yql/dq/tasks + ydb/library/yql/dq/actors/compute ydb/library/yql/utils/failure_injector ydb/library/yql/providers/common/metrics ydb/library/yql/providers/dq/actors/events diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 9fc6789564..464a9382a3 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -76,6 +76,16 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, TaskRunnerStats).Parser([](const TString& v) { return FromString<ETaskRunnerStats>(v); }); REGISTER_SETTING(*this, _SkipRevisionCheck); REGISTER_SETTING(*this, UseBlockReader); + REGISTER_SETTING(*this, SpillingEngine) + .Parser([](const TString& v) { + return FromString<TDqSettings::ESpillingEngine>(v); + }) + .ValueSetter([this](const TString&, TDqSettings::ESpillingEngine value) { + SpillingEngine = value; + if (value != TDqSettings::ESpillingEngine::Disable) { + EnableDqReplicate = true; + } + }); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 75cca0876e..d80c25dd89 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -21,6 +21,11 @@ struct TDqSettings { Profile }; + enum class ESpillingEngine { + Disable /* "disable" */, + File /* "file" */, + }; + struct TDefault { static constexpr ui32 MaxTasksPerStage = 20U; static constexpr ui32 MaxTasksPerOperation = 70U; @@ -48,6 +53,7 @@ struct TDqSettings { static constexpr bool EnableChannelStats = false; static constexpr bool ExportStats = false; static constexpr ETaskRunnerStats TaskRunnerStats = ETaskRunnerStats::Basic; + static constexpr ESpillingEngine SpillingEngine = ESpillingEngine::Disable; }; using TPtr = std::shared_ptr<TDqSettings>; @@ -115,6 +121,7 @@ struct TDqSettings { NCommon::TConfSetting<ETaskRunnerStats, false> TaskRunnerStats; NCommon::TConfSetting<bool, false> _SkipRevisionCheck; NCommon::TConfSetting<bool, false> UseBlockReader; + NCommon::TConfSetting<ESpillingEngine, false> SpillingEngine; // This options will be passed to executor_actor and worker_actor template <typename TProtoConfig> diff --git a/ydb/library/yql/providers/dq/config/config.proto b/ydb/library/yql/providers/dq/config/config.proto index b5b66eee66..ef6a0fd7e0 100644 --- a/ydb/library/yql/providers/dq/config/config.proto +++ b/ydb/library/yql/providers/dq/config/config.proto @@ -66,6 +66,18 @@ message TDqConfig { repeated TAttr Setting = 1; } + message TSpillingSettings { + optional string Root = 1 [default = "./spilling"]; + + optional uint64 MaxTotalSize = 2; + optional uint64 MaxFileSize = 3; + optional uint64 MaxFilePartSize = 4; + + optional uint32 IoThreadPoolWorkersCount = 5 [default = 2]; + optional uint32 IoThreadPoolQueueSize = 6 [default = 1000]; + optional bool CleanupOnShutdown = 7; + } + message TYtBackend { optional string ClusterName = 1 [default = "hume"]; optional string User = 2; // default -- current user name @@ -103,6 +115,7 @@ message TDqConfig { optional TSolomon Solomon = 30; optional bool CanUseComputeActor = 32 [default = false]; optional bool EnforceJobUtc = 33; + optional TSpillingSettings SpillingSettings = 38; } repeated TYtBackend YtBackends = 5; diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt index efe17d868c..25fa353881 100644 --- a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(providers-dq-local_gateway PUBLIC yutil library-yql-utils dq-actors-compute + dq-actors-spilling providers-dq-provider dq-api-protos providers-dq-task_runner diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt index 88a991c531..3e3131a9b8 100644 --- a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(providers-dq-local_gateway PUBLIC yutil library-yql-utils dq-actors-compute + dq-actors-spilling providers-dq-provider dq-api-protos providers-dq-task_runner diff --git a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt index 88a991c531..3e3131a9b8 100644 --- a/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/dq/local_gateway/CMakeLists.linux-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(providers-dq-local_gateway PUBLIC yutil library-yql-utils dq-actors-compute + dq-actors-spilling providers-dq-provider dq-api-protos providers-dq-task_runner diff --git a/ydb/library/yql/providers/dq/local_gateway/ya.make b/ydb/library/yql/providers/dq/local_gateway/ya.make index 4b3afaeacb..0eabb17f7f 100644 --- a/ydb/library/yql/providers/dq/local_gateway/ya.make +++ b/ydb/library/yql/providers/dq/local_gateway/ya.make @@ -9,6 +9,7 @@ SRCS( PEERDIR( ydb/library/yql/utils ydb/library/yql/dq/actors/compute + ydb/library/yql/dq/actors/spilling ydb/library/yql/providers/dq/provider ydb/library/yql/providers/dq/api/protos ydb/library/yql/providers/dq/task_runner diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index fd1f07185f..57f9144cee 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -7,8 +7,8 @@ #include <ydb/library/yql/providers/dq/service/service_node.h> #include <ydb/library/yql/providers/dq/stats_collector/pool_stats_collector.h> - #include <ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h> +#include <ydb/library/yql/dq/actors/spilling/spilling_file.h> #include <ydb/library/yql/utils/range_walker.h> #include <ydb/library/yql/utils/bind_in_range.h> @@ -17,6 +17,7 @@ #include <util/system/env.h> #include <util/generic/size_literals.h> +#include <util/folder/dirut.h> namespace NYql { @@ -29,7 +30,8 @@ public: TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads, IMetricsRegistryPtr metricsRegistry, - const std::function<IActor*(void)>& metricsPusherFactory) + const std::function<IActor*(void)>& metricsPusherFactory, + bool withSpilling) : MetricsRegistry(metricsRegistry ? metricsRegistry : CreateMetricsRegistry(GetSensorsGroupFor(NSensorComponent::kDq)) @@ -57,6 +59,7 @@ public: threads, MetricsRegistry); + auto lwmGroup = MetricsRegistry->GetSensors()->GetSubgroup("component", "lwm"); auto patternCache = std::make_shared<NKikimr::NMiniKQL::TComputationPatternLRUCache>(200_MB); NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Factory = NTaskRunnerProxy::CreateFactory(functionRegistry, compFactory, taskTransformFactory, patternCache, true); @@ -68,14 +71,27 @@ public: { return factory->Get(task); }); - lwmOptions.Counters = NDqs::TWorkerManagerCounters(MetricsRegistry->GetSensors()->GetSubgroup("component", "lwm")); + lwmOptions.Counters = NDqs::TWorkerManagerCounters(lwmGroup); lwmOptions.DropTaskCountersOnFinish = false; + lwmOptions.UseSpilling = withSpilling; auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); ServiceNode->AddLocalService( MakeWorkerManagerActorID(nodeId), TActorSetupCmd(resman, TMailboxType::Simple, 0)); + if (withSpilling) { + char tempDir[MAX_PATH]; + if (MakeTempDir(tempDir, nullptr) != 0) + ythrow yexception() << "LocalServiceHolder: Can't create temporary directory " << tempDir; + + auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = true}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup)); + + ServiceNode->AddLocalService( + NDq::MakeDqLocalFileSpillingServiceID(nodeId), + TActorSetupCmd(spillingActor, TMailboxType::Simple, 0)); + } + auto statsCollector = CreateStatsCollector(1, *ServiceNode->GetSetup(), MetricsRegistry->GetSensors()); auto actorSystem = ServiceNode->StartActorSystem(); @@ -233,7 +249,7 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I NBus::TBindResult interconnectPort, NBus::TBindResult grpcPort, NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads, IMetricsRegistryPtr metricsRegistry, - const std::function<IActor*(void)>& metricsPusherFactory) + const std::function<IActor*(void)>& metricsPusherFactory, bool withSpilling) { return MakeHolder<TLocalServiceHolder>(functionRegistry, compFactory, @@ -244,13 +260,14 @@ THolder<TLocalServiceHolder> CreateLocalServiceHolder(const NKikimr::NMiniKQL::I std::move(asyncIoFactory), threads, metricsRegistry, - metricsPusherFactory); + metricsPusherFactory, + withSpilling); } TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, - NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads, + bool withSpilling, NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, int threads, IMetricsRegistryPtr metricsRegistry, const std::function<IActor*(void)>& metricsPusherFactory) { @@ -270,7 +287,8 @@ TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctio std::move(asyncIoFactory), threads, metricsRegistry, - metricsPusherFactory), + metricsPusherFactory, + withSpilling), CreateDqGateway("[::1]", grpcPort.Addr.GetPort())); } diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h index b558fbeb7d..91329aab8d 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.h @@ -14,6 +14,7 @@ namespace NYql { TIntrusivePtr<IDqGateway> CreateLocalDqGateway(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, TTaskTransformFactory taskTransformFactory, const TDqTaskPreprocessorFactoryCollection& dqTaskPreprocessorFactories, + bool withSpilling, NDq::IDqAsyncIoFactory::TPtr = nullptr, int threads = 16, IMetricsRegistryPtr metricsRegistry = {}, const std::function<NActors::IActor*(void)>& metricsPusherFactory = {}); diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index ff9be8a613..cb43eab7a9 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -143,7 +143,8 @@ public: { auto guard = runner->BindAllocator(State->Settings->MemoryLimit.Get().GetOrElse(0)); - runner->Prepare(runnerSettings, limits); + NDq::TDqTaskRunnerExecutionContextDefault execCtx; + runner->Prepare(runnerSettings, limits, execCtx); } TVector<NDq::TDqSerializedBatch> rows; @@ -245,7 +246,12 @@ private: } else { mode = NDq::EChannelMode::CHANNEL_WIDE_BLOCK; } - pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false, *pipeline->GetTypeAnnotationContext(), mode), "BuildPhy"); + pipeline->Add( + NDq::CreateDqBuildPhyStagesTransformer( + State_->Settings->SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != TDqSettings::ESpillingEngine::Disable, + *pipeline->GetTypeAnnotationContext(), mode + ), + "BuildPhy"); pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables"); } diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index bf68169309..561e8fcdfb 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -226,7 +226,7 @@ public: } _exit(127); } - + NDqProto::TMeteringStatsResponse GetMeteringStats() { NDqProto::TMeteringStatsResponse resp; auto* stats = Runner->GetMeteringStats(); @@ -767,7 +767,9 @@ public: limits.ChannelBufferSize = DqConfiguration->ChannelBufferSize.Get().GetOrElse(TDqSettings::TDefault::ChannelBufferSize); limits.OutputChunkMaxSize = DqConfiguration->OutputChunkMaxSize.Get().GetOrElse(TDqSettings::TDefault::OutputChunkMaxSize); limits.ChunkSizeLimit = DqConfiguration->ChunkSizeLimit.Get().GetOrElse(TDqSettings::TDefault::ChunkSizeLimit); - Runner->Prepare(task, limits); + + NDq::TDqTaskRunnerExecutionContextDefault execCtx; + Runner->Prepare(task, limits, execCtx); }); result.Save(&output); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp index a842a362ed..51e888dd6c 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp @@ -116,7 +116,8 @@ public: NYql::NDqProto::TPrepareResponse Prepare() override { NYql::NDqProto::TPrepareResponse ret; - Runner->Prepare(Task, DefaultMemoryLimits()); + TDqTaskRunnerExecutionContextDefault ctx; + Runner->Prepare(Task, DefaultMemoryLimits(), ctx); return ret; } diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index 872bc8258c..9b7440ebf4 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -302,7 +302,8 @@ private: Options.RuntimeData, traceId, Options.TaskRunnerActorFactory, - Options.AsyncIoFactory)); + Options.AsyncIoFactory, + Options.UseSpilling)); } allocationInfo.WorkerActors.emplace_back(RegisterChild( actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 95ecae911e..3b36757089 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -39,6 +39,7 @@ namespace NYql::NDqs { NActors::TActorId QuoterServiceActorId; bool ComputeActorOwnsCounters = false; bool DropTaskCountersOnFinish = true; + bool UseSpilling = false; }; NActors::IActor* CreateLocalWorkerManager(const TLocalWorkerManagerOptions& options); diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt index 7a8db69ea2..31b7fb97c1 100644 --- a/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.darwin-x86_64.txt @@ -19,6 +19,7 @@ target_link_libraries(worker_node PUBLIC library-cpp-getopt cpp-mapreduce-client dq-actors-compute + dq-actors-spilling yql-dq-comp_nodes dq-integration-transform yql-dq-transform diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt index 11e2b042f9..ef56db89b7 100644 --- a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-aarch64.txt @@ -19,6 +19,7 @@ target_link_libraries(worker_node PUBLIC library-cpp-getopt cpp-mapreduce-client dq-actors-compute + dq-actors-spilling yql-dq-comp_nodes dq-integration-transform yql-dq-transform diff --git a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt index 17c842dbb4..3739d1ca70 100644 --- a/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/tools/dq/worker_node/CMakeLists.linux-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(worker_node PUBLIC library-cpp-getopt cpp-mapreduce-client dq-actors-compute + dq-actors-spilling yql-dq-comp_nodes dq-integration-transform yql-dq-transform diff --git a/ydb/library/yql/tools/dq/worker_node/main.cpp b/ydb/library/yql/tools/dq/worker_node/main.cpp index c8d20bed06..1fd2c83144 100644 --- a/ydb/library/yql/tools/dq/worker_node/main.cpp +++ b/ydb/library/yql/tools/dq/worker_node/main.cpp @@ -13,6 +13,7 @@ #include <library/cpp/digest/md5/md5.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> +#include <ydb/library/yql/dq/actors/spilling/spilling_file.h> #include <ydb/library/yql/providers/dq/service/interconnect_helpers.h> #include <ydb/library/yql/providers/dq/runtime/file_cache.h> @@ -60,6 +61,7 @@ #include <util/system/env.h> #include <util/system/getpid.h> #include <util/system/fs.h> +#include <util/folder/dirut.h> constexpr ui32 THREAD_PER_NODE = 8; @@ -403,11 +405,24 @@ int main(int argc, char** argv) { }) : NTaskRunnerActor::CreateTaskRunnerActorFactory(lwmOptions.Factory, lwmOptions.TaskRunnerInvokerFactory); lwmOptions.ComputeActorOwnsCounters = true; + lwmOptions.UseSpilling = true; auto resman = NDqs::CreateLocalWorkerManager(lwmOptions); auto workerManagerActorId = actorSystem->Register(resman); actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId); + auto spillingActor = actorSystem->Register( + NDq::CreateDqLocalFileSpillingService( + NDq::TFileSpillingServiceConfig{ + .Root = "./spilling", + .CleanupOnShutdown = true + }, + MakeIntrusive<NDq::TSpillingCounters>(dqSensors) + ) + ); + + actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor); + auto endFuture = ShouldContinue.GetFuture(); signal(SIGINT, &OnTerminate); diff --git a/ydb/library/yql/tools/dq/worker_node/ya.make b/ydb/library/yql/tools/dq/worker_node/ya.make index 216572850a..51b9f8babc 100644 --- a/ydb/library/yql/tools/dq/worker_node/ya.make +++ b/ydb/library/yql/tools/dq/worker_node/ya.make @@ -6,6 +6,7 @@ IF (NOT OS_WINDOWS) library/cpp/getopt yt/cpp/mapreduce/client ydb/library/yql/dq/actors/compute + ydb/library/yql/dq/actors/spilling ydb/library/yql/dq/comp_nodes ydb/library/yql/dq/integration/transform ydb/library/yql/dq/transform diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index e730c97dec..e674e5abee 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -787,7 +787,7 @@ int RunMain(int argc, const char* argv[]) size_t maxRetries = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasMaxRetries() ? gatewaysConfig.GetHttpGateway().GetMaxRetries() : 2; dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, - CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads, + false/*spilling*/, CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads, metricsRegistry, metricsPusherFactory); } |