diff options
author | uzhastik <uzhas@ydb.tech> | 2024-06-18 12:47:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-18 12:47:32 +0300 |
commit | dba296f900921846d3e18083e75caab61cf88cc0 (patch) | |
tree | 41653c63389f83facfe5ae2f31e3e452ff3cfaf6 | |
parent | 0bf02f135dcff9f368ac1ccffd090124753fd564 (diff) | |
download | ydb-dba296f900921846d3e18083e75caab61cf88cc0.tar.gz |
initial version for reading from solomon (#5571)
38 files changed, 1221 insertions, 82 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index cd610279b8..1c00eca6c7 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -1,7 +1,7 @@ #include "kqp_pure_compute_actor.h" +#include <ydb/core/base/appdata.h> #include <ydb/core/base/feature_flags.h> - namespace NKikimr { namespace NKqp { @@ -15,7 +15,7 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) - : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings) + : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings) , ComputeCtx(settings.StatsMode) , FederatedQuerySetup(federatedQuerySetup) { @@ -42,7 +42,7 @@ void TKqpComputeActor::DoBootstrap() { TDqTaskRunnerContext execCtx; - execCtx.FuncRegistry = AppData()->FunctionRegistry; + execCtx.FuncRegistry = TBase::FunctionRegistry; execCtx.RandomProvider = TAppData::RandomProvider.Get(); execCtx.TimeProvider = TAppData::TimeProvider.Get(); execCtx.ComputeCtx = &ComputeCtx; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index c2876e8ee2..b144be2b09 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -27,7 +27,7 @@ 
TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId IDqAsyncIoFactory::TPtr asyncIoFactory, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena) - : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, + : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena)) , ComputeCtx(settings.StatsMode) { @@ -180,7 +180,7 @@ void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) { void TKqpScanComputeActor::DoBootstrap() { CA_LOG_D("EVLOGKQP START"); NDq::TDqTaskRunnerContext execCtx; - execCtx.FuncRegistry = AppData()->FunctionRegistry; + execCtx.FuncRegistry = TBase::FunctionRegistry; execCtx.ComputeCtx = &ComputeCtx; execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx, std::nullopt); execCtx.RandomProvider = TAppData::RandomProvider.Get(); diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 0208b17f74..945c50cdc1 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -75,13 +75,13 @@ public: static constexpr bool HasAsyncTaskRunner = true; TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task, - IDqAsyncIoFactory::TPtr asyncIoFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const ::NMonitoring::TDynamicCounterPtr& taskCounters, const TActorId& quoterServiceActorId, bool ownCounters) - : 
TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters) + : TBase(executerId, txId, task, std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters) , TaskRunnerActorFactory(taskRunnerActorFactory) , ReadyToCheckpointFlag(false) , SentStatsRequest(false) @@ -1164,7 +1164,7 @@ private: IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask* task, - IDqAsyncIoFactory::TPtr asyncIoFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters, @@ -1172,7 +1172,7 @@ IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, bool ownCounters) { return new TDqAsyncComputeActor(executerId, txId, task, std::move(asyncIoFactory), - settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId, ownCounters); + functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId, ownCounters); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h index 4492c2a656..726b329ec5 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h @@ -18,7 +18,7 @@ namespace NYql { namespace NDq { NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task, - IDqAsyncIoFactory::TPtr asyncIoFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, 
const TComputeMemoryLimits& memoryLimits, const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr, diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 6f73c08bd7..8851f301f7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -35,10 +35,11 @@ public: TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory, ::NMonitoring::TDynamicCounterPtr taskCounters) - : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, true, false, taskCounters) + : TBase(executerId, txId, task, std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, true, false, taskCounters) , TaskRunnerFactory(taskRunnerFactory) { InitializeTask(); @@ -74,12 +75,13 @@ private: IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory, ::NMonitoring::TDynamicCounterPtr taskCounters) { return new TDqComputeActor(executerId, txId, task, std::move(asyncIoFactory), - settings, memoryLimits, taskRunnerFactory, taskCounters); + functionRegistry, settings, memoryLimits, taskRunnerFactory, taskCounters); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index e338445ab7..062c090f2f 100644 --- 
a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -382,7 +382,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& NDqProto::TDqTaskStats* protoTask, TCollectStatsLevel level); NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task, - IDqAsyncIoFactory::TPtr asyncIoFactory, + IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory, ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h index 0a3c18445e..5e64edf2ab 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -4,6 +4,8 @@ #include "dq_compute_actor_metrics.h" #include "dq_compute_actor_watermarks.h" +#include <ydb/library/yql/minikql/mkql_program_builder.h> + //must be included the last #include "dq_compute_actor_log.h" @@ -22,6 +24,7 @@ struct TComputeActorAsyncInputHelper { const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED; const NKikimr::NMiniKQL::TType* ValueType = nullptr; TMaybe<TInstant> PendingWatermark = Nothing(); + TMaybe<NKikimr::NMiniKQL::TProgramBuilder> ProgramBuilder; public: TComputeActorAsyncInputHelper( const TString& logPrefix, @@ -122,4 +125,3 @@ public: }; } //namespace NYql::NDq - diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 85a60599bd..f0d59f7142 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ 
b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -266,6 +266,7 @@ public: const NActors::TActorId& ComputeActorId; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; ::NMonitoring::TDynamicCounterPtr TaskCounters; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; IMemoryQuotaManager::TPtr MemoryQuotaManager; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 3af5f52f87..df6878de92 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -161,6 +161,7 @@ public: protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false, const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr, @@ -174,6 +175,7 @@ protected: , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0) , AsyncIoFactory(std::move(asyncIoFactory)) + , FunctionRegistry(functionRegistry) , CheckpointingMode(GetTaskCheckpointingMode(Task)) , State(Task.GetCreateSuspended() ? 
NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) @@ -1256,6 +1258,7 @@ protected: const auto& inputDesc = Task.GetInputs(inputIndex); Y_ABORT_UNLESS(inputDesc.HasSource()); source.Type = inputDesc.GetSource().GetType(); + source.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry); const auto& settings = Task.GetSourceSettings(); Y_ABORT_UNLESS(settings.empty() || inputIndex < settings.size()); CA_LOG_D("Create source for input " << inputIndex << " " << inputDesc); @@ -1273,6 +1276,7 @@ protected: .ComputeActorId = this->SelfId(), .TypeEnv = typeEnv, .HolderFactory = holderFactory, + .ProgramBuilder = *source.ProgramBuilder, .TaskCounters = TaskCounters, .Alloc = Alloc, .MemoryQuotaManager = MemoryLimits.MemoryQuotaManager, @@ -1856,6 +1860,7 @@ protected: TComputeMemoryLimits MemoryLimits; const bool CanAllocateExtraMemory = false; const IDqAsyncIoFactory::TPtr AsyncIoFactory; + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; const NDqProto::ECheckpointingMode CheckpointingMode; TDqComputeActorChannels* Channels = nullptr; TDqComputeActorCheckpoints* Checkpoints = nullptr; diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 820ff26887..8a776d83fb 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -369,4 +369,3 @@ protected: }; } //namespace NYql::NDq - diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make index b0d50b84ea..3cec159c24 100644 --- a/ydb/library/yql/dq/actors/compute/ya.make +++ b/ydb/library/yql/dq/actors/compute/ya.make @@ -26,6 +26,7 @@ PEERDIR( ydb/library/yql/dq/runtime ydb/library/yql/dq/tasks ydb/library/yql/dq/actors/spilling + ydb/library/yql/minikql ydb/library/yql/minikql/comp_nodes 
ydb/library/yql/public/issue ydb/core/quoter/public diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index bf5d38965a..c95e6ad476 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -28,12 +28,12 @@ struct TTaskRunnerEvents { EvOutputChannelDataRequest, EvOutputChannelData, - + EvInputChannelData, EvInputChannelDataAck, - + // EvContinueRun -> TaskRunner->Run() -> TEvTaskRunFinished - EvContinueRun, + EvContinueRun, EvRunFinished, EvSourceDataAck, diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index f870334303..ac7f47a232 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -67,6 +67,7 @@ IActor* CreateComputeActor( operationId, task, options.AsyncIoFactory, + options.FunctionRegistry, computeRuntimeSettings, memoryLimits, taskRunnerFactory, @@ -77,6 +78,7 @@ IActor* CreateComputeActor( operationId, task, options.AsyncIoFactory, + options.FunctionRegistry, computeRuntimeSettings, memoryLimits, taskRunnerActorFactory, diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 59e28aa949..88371368e8 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -11,6 +11,7 @@ #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> #include <ydb/library/actors/core/event_pb.h> #include <ydb/library/actors/core/hfunc.h> @@ -56,6 +57,7 @@ struct TSourceInfo { bool PushStarted = false; bool Finished = false; NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr; + std::optional<NKikimr::NMiniKQL::TProgramBuilder> ProgramBuilder; }; struct TSinkInfo { @@ -93,10 +95,12 @@ 
public: explicit TDqWorker( const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, const IDqAsyncIoFactory::TPtr& asyncIoFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, TWorkerRuntimeData* runtimeData, const TString& traceId) : TRichActor<TDqWorker>(&TDqWorker::Handler) , AsyncIoFactory(asyncIoFactory) + , FunctionRegistry(functionRegistry) , TaskRunnerActorFactory(taskRunnerActorFactory) , RuntimeData(runtimeData) , TraceId(traceId) @@ -294,6 +298,7 @@ private: if (input.HasSource()) { auto& source = SourcesMap[inputId]; source.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv); + source.ProgramBuilder.emplace(*source.TypeEnv, *FunctionRegistry); std::tie(source.Source, source.Actor) = AsyncIoFactory->CreateDqSource( IDqAsyncIoFactory::TSourceArguments { @@ -307,6 +312,7 @@ private: .ComputeActorId = SelfId(), .TypeEnv = typeEnv, .HolderFactory = holderFactory, + .ProgramBuilder = *source.ProgramBuilder, .MemoryQuotaManager = MemoryQuotaManager }); RegisterLocalChild(source.Actor); @@ -769,6 +775,7 @@ private: /*_________________________________________________________*/ IDqAsyncIoFactory::TPtr AsyncIoFactory; + const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr; TActorId TaskRunnerActor; @@ -808,13 +815,15 @@ NActors::IActor* CreateWorkerActor( TWorkerRuntimeData* runtimeData, const TString& traceId, const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const IDqAsyncIoFactory::TPtr& asyncIoFactory) + const IDqAsyncIoFactory::TPtr& asyncIoFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) { Y_ABORT_UNLESS(taskRunnerActorFactory); return new TLogWrapReceive( new TDqWorker( taskRunnerActorFactory, asyncIoFactory, + functionRegistry, runtimeData, traceId), traceId); } diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h 
b/ydb/library/yql/providers/dq/actors/worker_actor.h index 438578fd97..4e06e3a829 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.h +++ b/ydb/library/yql/providers/dq/actors/worker_actor.h @@ -25,6 +25,7 @@ namespace NYql::NDqs { TWorkerRuntimeData* runtimeData, const TString& traceId, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, - const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory); + const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index fca8b7a426..1161237803 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -392,7 +392,8 @@ private: Options.RuntimeData, traceId, Options.TaskRunnerActorFactory, - Options.AsyncIoFactory)); + Options.AsyncIoFactory, + Options.FunctionRegistry)); } allocationInfo.WorkerActors.ActorIds.emplace_back(RegisterChild( actor.Release(), createComputeActor ? 
NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.cpp new file mode 100644 index 0000000000..c027deb05b --- /dev/null +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.cpp @@ -0,0 +1,454 @@ +#include "dq_solomon_read_actor.h" + +#include <ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> +#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> +#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h> + +#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h> +#include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_string_util.h> + +#include <ydb/library/yql/public/udf/udf_data_type.h> + +#include <ydb/library/yql/utils/actor_log/log.h> +#include <ydb/library/yql/utils/actors/http_sender_actor.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/url_builder.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/event_local.h> +#include <ydb/library/actors/core/events.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/library/actors/core/log.h> +#include <ydb/library/actors/http/http_proxy.h> +#include <library/cpp/json/json_reader.h> + + +#include <util/generic/algorithm.h> +#include <util/generic/hash.h> +#include <util/system/compiler.h> + +#define SINK_LOG_T(s) \ + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define SINK_LOG_D(s) \ + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define SINK_LOG_I(s) \ + LOG_INFO_S(*NActors::TlsActivationContext, 
NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define SINK_LOG_W(s) \ + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define SINK_LOG_N(s) \ + LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define SINK_LOG_E(s) \ + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define SINK_LOG_C(s) \ + LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) +#define SINK_LOG(prio, s) \ + LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) + +namespace NYql::NDq { + +using namespace NActors; +using namespace NLog; +using namespace NKikimr::NMiniKQL; + +namespace { + +enum ESystemColumn{ + SC_KIND = 0, + SC_LABELS, + SC_VALUE, + SC_TYPE, + SC_TS +}; + +struct TDqSolomonReadParams { + NSo::NProto::TDqSolomonSource Source; +}; + +TString GetReadSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const ::NYql::NSo::NProto::ESolomonClusterType& type) { + TUrlBuilder builder((useSsl ? "https://" : "http://") + endpoint); + + switch (type) { + case NSo::NProto::ESolomonClusterType::CT_SOLOMON: { + builder.AddPathComponent("api"); + builder.AddPathComponent("v2"); + builder.AddPathComponent("projects"); + builder.AddPathComponent(project); + builder.AddPathComponent("sensors"); + builder.AddPathComponent("data"); + break; + } + case NSo::NProto::ESolomonClusterType::CT_MONITORING: { + [[fallthrough]]; + } + default: + Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(type)); + } + + return builder.Build(); +} + +auto RetryPolicy = NYql::NDq::THttpSenderRetryPolicy::GetExponentialBackoffPolicy( + [](const NHttp::TEvHttpProxy::TEvHttpIncomingResponse* resp){ + if (!resp || !resp->Response) { + // Connection wasn't established. Should retry. 
+ return ERetryErrorClass::ShortRetry; + } + + if (resp->Response->Status == "401") { + return ERetryErrorClass::NoRetry; + } + + return ERetryErrorClass::ShortRetry; + }); + +} // namespace + + +class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadActor>, public IDqComputeActorAsyncInput { +public: + static constexpr char ActorName[] = "DQ_SOLOMON_READ_ACTOR"; + + TDqSolomonReadActor( + ui64 inputIndex, + TCollectStatsLevel statsLevel, + const TTxId& txId, + const NActors::TActorId& computeActorId, + const THolderFactory& holderFactory, + NKikimr::NMiniKQL::TProgramBuilder& programBuilder, + TDqSolomonReadParams&& readParams, + const ::NMonitoring::TDynamicCounterPtr& counters, + std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider + ) + : InputIndex(inputIndex) + , TxId(txId) + , ComputeActorId(computeActorId) + , HolderFactory(holderFactory) + , ProgramBuilder(programBuilder) + , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", Solomon source. ") + , ReadParams(std::move(readParams)) + , Url(GetUrl()) + , CredentialsProvider(credentialsProvider) + { + Y_UNUSED(counters); + SINK_LOG_D("Init"); + IngressStats.Level = statsLevel; + + auto stringType = ProgramBuilder.NewDataType(NYql::NUdf::TDataType<char*>::Id); + DictType = ProgramBuilder.NewDictType(stringType, stringType, false); + + FillSystemColumnPositionindex(); + } + + void FillSystemColumnPositionindex() { + std::vector<TString> names(ReadParams.Source.GetSystemColumns().begin(), ReadParams.Source.GetSystemColumns().end()); + names.insert(names.end(), ReadParams.Source.GetLabelNames().begin(), ReadParams.Source.GetLabelNames().end()); + std::sort(names.begin(), names.end()); + size_t index = 0; + for (auto& n : names) { + Index[n] = index++; + } + } + + void Bootstrap() { + Become(&TDqSolomonReadActor::StateFunc); + RequestMetrics(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvHttpBase::TEvSendResult, Handle); + ) + + i64 GetAsyncInputData(TUnboxedValueBatch& buffer, 
TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { + Y_UNUSED(freeSpace); + YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported"); + + for (auto j : Batch) { + auto& a = j["vector"].GetArray(); + for (auto& aa : a) { + auto& series = aa["timeseries"]; + auto& kind = series["kind"]; + auto& type = series["type"]; + auto& labels = series["labels"]; + auto dictValueBuilder = HolderFactory.NewDict(DictType, 0); + for (auto& [k, v]: labels.GetMap()) { + dictValueBuilder->Add(NKikimr::NMiniKQL::MakeString(k), NKikimr::NMiniKQL::MakeString(v.GetString())); + } + auto dictValue = dictValueBuilder->Build(); + + auto& timestamps = series["timestamps"].GetArray(); + auto& values = series["values"].GetArray(); + + for (size_t i = 0; i < timestamps.size(); ++i){ + NUdf::TUnboxedValue* items = nullptr; + auto value = HolderFactory.CreateDirectArrayHolder(ReadParams.Source.GetSystemColumns().size() + ReadParams.Source.GetLabelNames().size(), items); + if (auto it = Index.find(SOLOMON_SCHEME_KIND); it != Index.end()) { + items[it->second] = NKikimr::NMiniKQL::MakeString(kind.GetString()); + } + + if (auto it = Index.find(SOLOMON_SCHEME_LABELS); it != Index.end()) { + items[it->second] = dictValue; + } + + if (auto it = Index.find(SOLOMON_SCHEME_VALUE); it != Index.end()) { + items[it->second] = NUdf::TUnboxedValuePod(values[i].GetDouble()); + } + + if (auto it = Index.find(SOLOMON_SCHEME_TYPE); it != Index.end()) { + items[it->second] = NKikimr::NMiniKQL::MakeString(type.GetString()); + } + + if (auto it = Index.find(SOLOMON_SCHEME_TS); it != Index.end()) { + // convert ms to sec + items[it->second] = NUdf::TUnboxedValuePod((ui64)timestamps[i].GetUInteger() / 1000); + } + + for (const auto& c : ReadParams.Source.GetLabelNames()) { + auto& v = items[Index[c]]; + auto it = labels.GetMap().find(c); + if (it != labels.GetMap().end()) { + v = NKikimr::NMiniKQL::MakeString(it->second.GetString()); + } else { + // empty string + v = NKikimr::NMiniKQL::MakeString(""); + } + 
} + + buffer.push_back(value); + } + } + } + + finished = !Batch.empty(); + Batch.clear(); + return 0; + } + + void SaveState(const NDqProto::TCheckpoint&, TSourceState&) final {} + void LoadState(const TSourceState&) override { } + void CommitState(const NDqProto::TCheckpoint&) override { } + + ui64 GetInputIndex() const override { + return InputIndex; + } + + const TDqAsyncStats& GetIngressStats() const override { + return IngressStats; + } + +private: + // IActor & IDqComputeActorAsyncInput + void PassAway() override { // Is called from Compute Actor + if (HttpProxyId) { + Send(HttpProxyId, new TEvents::TEvPoison()); + } + + TActor<TDqSolomonReadActor>::PassAway(); + } + +private: + TSourceState BuildState() { return {}; } + + void NotifyComputeActorWithData() { + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + } + + TString GetUrl() const { + return GetReadSolomonUrl(ReadParams.Source.GetEndpoint(), + ReadParams.Source.GetUseSsl(), + ReadParams.Source.GetProject(), + ReadParams.Source.GetClusterType()); + } + + NHttp::THttpOutgoingRequestPtr BuildSolomonRequest(TStringBuf data) { + NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestPost(Url); + FillAuth(httpRequest); + httpRequest->Set<&NHttp::THttpRequest::ContentType>("application/json"); + httpRequest->Set<&NHttp::THttpRequest::Body>(data); + return httpRequest; + } + + void FillAuth(NHttp::THttpOutgoingRequestPtr& httpRequest) { + const TString authorizationHeader = "Authorization"; + const TString authToken = CredentialsProvider->GetAuthInfo(); + + switch (ReadParams.Source.GetClusterType()) { + case NSo::NProto::ESolomonClusterType::CT_SOLOMON: + httpRequest->Set(authorizationHeader, "OAuth " + authToken); + break; + case NSo::NProto::ESolomonClusterType::CT_MONITORING: + httpRequest->Set(authorizationHeader, "Bearer " + authToken); + break; + default: + Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(ReadParams.Source.GetClusterType())); 
+ } + } + + void RequestMetrics() { + if (Y_UNLIKELY(!HttpProxyId)) { + HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance())); + } + const auto& source = ReadParams.Source; + const auto& ds = source.GetDownsampling(); + + NJsonWriter::TBuf w; + w.BeginObject() + .UnsafeWriteKey("from").WriteString(TInstant::Seconds(source.GetFrom()).ToString()) + .UnsafeWriteKey("to").WriteString(TInstant::Seconds(source.GetTo()).ToString()) + .UnsafeWriteKey("program").WriteString(source.GetProgram()) + .UnsafeWriteKey("downsampling") + .BeginObject() + .UnsafeWriteKey("disabled").WriteBool(ds.GetDisabled()) + .UnsafeWriteKey("aggregation").WriteString(ds.GetAggregation()) + .UnsafeWriteKey("fill").WriteString(ds.GetFill()) + .UnsafeWriteKey("gridMillis").WriteLongLong(ds.GetGridMs()) + .EndObject() + .EndObject(); + + // const TString body = R"({ + // "downsampling": { + // "aggregation": "MAX", + // "disabled": true, + // "fill": "PREVIOUS", + // "gridMillis": 3600000, + // "ignoreMinStepMillis": true, + // "maxPoints": 500 + // }, + // "forceCluster": "", + // "from": "2023-12-08T14:40:39Z", + // "program": "{execpool=User,activity=YQ_STORAGE_PROXY,sensor=ActorsAliveByActivity}", + // "to": "2023-12-08T14:45:39Z" + // })"; + + // const TStringBuf body = w.Str(); + //Cerr << "EX: Sending request: " << body << Endl; + const NHttp::THttpOutgoingRequestPtr httpRequest = BuildSolomonRequest(w.Str()); + + //const size_t bodySize = body.size(); + const TActorId httpSenderId = Register(CreateHttpSenderActor(SelfId(), HttpProxyId, RetryPolicy)); + ui8 cookie = 0; + Send(httpSenderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest), /*flags=*/0, cookie); + //SINK_LOG_T("Sent read to solomon, body size: " << bodySize); + //Cerr << "EX: RequestMetrics" << Endl; + } + + void Handle(TEvHttpBase::TEvSendResult::TPtr& ev) { + //Cerr << "EX: Handle(TEvHttpBase::TEvSendResult::TPtr& ev)" << Endl; + const auto* res = ev->Get(); + const 
TString& error = res->HttpIncomingResponse->Get()->GetError(); + + //Cerr << "EX: Handle(TEvHttpBase::TEvSendResult::TPtr& ev), error: " << error << Endl; + if (!error.empty() || (res->HttpIncomingResponse->Get()->Response && res->HttpIncomingResponse->Get()->Response->Status != "200")) { + TStringBuilder errorBuilder; + errorBuilder << "Error while sending request to monitoring api: " << error; + const auto& response = res->HttpIncomingResponse->Get()->Response; + if (response) { + errorBuilder << " " << response->GetObfuscatedData(); + } + + TIssues issues { TIssue(errorBuilder) }; + SINK_LOG_W("Got " << (res->IsTerminal ? "terminal " : "") << "error response[" << ev->Cookie << "] from solomon: " << issues.ToOneLineString()); + //Cerr << "Got " << (res->IsTerminal ? "terminal " : "") << "error response[" << ev->Cookie << "] from solomon: " << issues.ToOneLineString(); + return; + } + + HandleSuccessSolomonResponse(*res->HttpIncomingResponse->Get(), ev->Cookie); + } + + void HandleSuccessSolomonResponse(const NHttp::TEvHttpProxy::TEvHttpIncomingResponse& response, ui64 cookie) { + Y_UNUSED(cookie); + //SINK_LOG_E("Solomon response[" << cookie << "]: " << response.Response->GetObfuscatedData()); + //Cerr << "EX:" << response.Response->Body << Endl; + NJson::TJsonValue json; + if (!NJson::ReadJsonTree(response.Response->Body, &json, false)) { + // todo: improve + Y_ABORT_UNLESS(false, "Failed to parse json response"); + return; + } + + Batch.push_back(json); + NotifyComputeActorWithData(); + } + +private: + const ui64 InputIndex; + TDqAsyncStats IngressStats; + const TTxId TxId; + const NActors::TActorId ComputeActorId; + const THolderFactory& HolderFactory; + NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; + const TString LogPrefix; + const TDqSolomonReadParams ReadParams; + const TString Url; + TActorId HttpProxyId; + + TString SourceId; + std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider; + TType* DictType = nullptr; + std::vector<size_t> 
SystemColumnPositionIndex; + THashMap<TString, size_t> Index; + std::vector<NJson::TJsonValue> Batch; +}; + +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor( + NYql::NSo::NProto::TDqSolomonSource&& settings, + ui64 inputIndex, + TCollectStatsLevel statsLevel, + const TTxId& txId, + const NActors::TActorId& computeActorId, + const THolderFactory& holderFactory, + NKikimr::NMiniKQL::TProgramBuilder& programBuilder, + const THashMap<TString, TString>& secureParams, + const ::NMonitoring::TDynamicCounterPtr& counters, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) +{ + const TString& tokenName = settings.GetToken().GetName(); + const TString token = secureParams.Value(tokenName, TString()); + + TDqSolomonReadParams params { + .Source = std::move(settings), + }; + + auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); + auto credentialsProvider = credentialsProviderFactory->CreateProvider(); + + TDqSolomonReadActor* actor = new TDqSolomonReadActor( + inputIndex, + statsLevel, + txId, + computeActorId, + holderFactory, + programBuilder, + std::move(params), + counters, + credentialsProvider); + return {actor, actor}; +} + +void RegisterDQSolomonReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { + factory.RegisterSource<NSo::NProto::TDqSolomonSource>("SolomonSource", + [credentialsFactory]( + NYql::NSo::NProto::TDqSolomonSource&& settings, + IDqAsyncIoFactory::TSourceArguments&& args) + { + auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); + + return CreateDqSolomonReadActor( + std::move(settings), + args.InputIndex, + args.StatsLevel, + args.TxId, + args.ComputeActorId, + args.HolderFactory, + args.ProgramBuilder, + args.SecureParams, + counters, + credentialsFactory); + }); +} + +} diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h 
b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h new file mode 100644 index 0000000000..d74e2e7679 --- /dev/null +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h @@ -0,0 +1,29 @@ +#pragma once + +#include <ydb/library/yql/utils/actors/http_sender.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/events.h> +#include <ydb/library/actors/http/http_proxy.h> + +#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h> + +namespace NYql::NDq { + +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor( + NYql::NSo::NProto::TDqSolomonShard&& settings, + ui64 inputIndex, + TCollectStatsLevel statsLevel, + const TTxId& txId, + const THashMap<TString, TString>& secureParams, + const ::NMonitoring::TDynamicCounterPtr& counters, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); + +void RegisterDQSolomonReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); + +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/solomon/async_io/ya.make b/ydb/library/yql/providers/solomon/async_io/ya.make index 4c803283c6..d5c465a1aa 100644 --- a/ydb/library/yql/providers/solomon/async_io/ya.make +++ b/ydb/library/yql/providers/solomon/async_io/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + dq_solomon_read_actor.cpp dq_solomon_write_actor.cpp metrics_encoder.cpp ) @@ -8,13 +9,13 @@ SRCS( PEERDIR( library/cpp/json/easy_parse library/cpp/monlib/encode/json - ydb/library/yql/minikql/computation + 
ydb/library/yql/dq/actors/compute ydb/library/yql/providers/common/token_accessor/client + ydb/library/yql/providers/solomon/proto + ydb/library/yql/providers/solomon/scheme ydb/library/yql/public/types ydb/library/yql/public/udf ydb/library/yql/utils/log - ydb/library/yql/dq/actors/compute - ydb/library/yql/providers/solomon/proto ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json index d064daa1b4..6af3982a91 100644 --- a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json +++ b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json @@ -44,13 +44,43 @@ ] }, { - "Name": "TSoReadShardMeta", + "Name": "TSoSourceSettings", "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "SoReadShardMeta!"}, + "Match": {"Type": "Callable", "Name": "SoSourceSettings"}, + "Children": [ + {"Index": 0, "Name": "Token", "Type": "TCoSecureParam"}, + {"Index": 1, "Name": "RowType", "Type": "TExprBase"}, + {"Index": 2, "Name": "SystemColumns", "Type": "TCoAtomList"}, + {"Index": 3, "Name": "LabelNames", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "From", "Type": "TCoAtom"}, + {"Index": 5, "Name": "To", "Type": "TCoAtom"}, + {"Index": 6, "Name": "Program", "Type": "TCoAtom"}, + {"Index": 7, "Name": "DownsamplingDisabled", "Type": "TCoBool"}, + {"Index": 8, "Name": "DownsamplingAggregation", "Type": "TCoAtom"}, + {"Index": 9, "Name": "DownsamplingFill", "Type": "TCoAtom"}, + {"Index": 10, "Name": "DownsamplingGridSec", "Type": "TCoUint32"} + ] + }, + { + "Name": "TSoObject", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "SoObject"}, + "Children": [ + {"Index": 0, "Name": "Settings", "Type": "TExprBase"} + ] + }, + { + "Name": "TSoReadObject", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "SoReadObject!"}, "Children": [ {"Index": 0, "Name": "World", "Type": "TExprBase"}, 
{"Index": 1, "Name": "DataSource", "Type": "TSoDataSource"}, - {"Index": 2, "Name": "Shard", "Type": "TCoAtom"} + {"Index": 2, "Name": "Object", "Type": "TSoObject"}, + {"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"}, + {"Index": 5, "Name": "RowType", "Type": "TExprBase"}, + {"Index": 6, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true} ] }, { @@ -75,16 +105,6 @@ {"Index": 3, "Name": "Service", "Type": "TCoAtom"}, {"Index": 4, "Name": "Token", "Type": "TCoSecureParam", "Optional": true} ] - }, - { - "Name": "TDqSoShardSink", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "DqSoShardSink"}, - "Children": [ - {"Index": 0, "Name": "Shard", "Type": "TSoShard"}, - {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"}, - {"Index": 2, "Name": "Token", "Type": "TCoSecureParam", "Optional": true} - ] } ] } diff --git a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto index 3bb9d19d51..018c530372 100644 --- a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto +++ b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto @@ -39,3 +39,30 @@ message TDqSolomonShard { string ServiceAccount = 40; TToken Token = 41; } + +message TDownsampling { + bool Disabled = 1; + string Aggregation = 2; + string Fill = 3; + int64 GridMs = 4; +} + +message TDqSolomonSource { + ESolomonClusterType ClusterType = 1; + string Endpoint = 2; + bool UseSsl = 3; + string ServiceAccount = 4; + TToken Token = 5; + + string Project = 6; + oneof p { + string Selectors = 7; + string Program = 8; + } + // seconds since Epoch + int64 From = 9; + int64 To = 10; + TDownsampling Downsampling = 11; + repeated string SystemColumns = 12; + repeated string LabelNames = 13; +} diff --git a/ydb/library/yql/providers/solomon/provider/ya.make b/ydb/library/yql/providers/solomon/provider/ya.make index 
2ca90b8ac0..77ebe2161b 100644 --- a/ydb/library/yql/providers/solomon/provider/ya.make +++ b/ydb/library/yql/providers/solomon/provider/ya.make @@ -11,6 +11,7 @@ SRCS( yql_solomon_dq_integration.cpp yql_solomon_io_discovery.cpp yql_solomon_load_meta.cpp + yql_solomon_mkql_compiler.cpp yql_solomon_physical_optimize.cpp yql_solomon_provider.cpp ) @@ -19,6 +20,7 @@ PEERDIR( ydb/library/actors/protos ydb/library/yql/dq/expr_nodes ydb/library/yql/dq/integration + ydb/library/yql/dq/opt ydb/library/yql/providers/common/config ydb/library/yql/providers/common/proto ydb/library/yql/providers/common/provider @@ -27,7 +29,7 @@ PEERDIR( ydb/library/yql/providers/result/expr_nodes ydb/library/yql/providers/solomon/expr_nodes ydb/library/yql/providers/solomon/proto - ydb/library/yql/dq/opt + ydb/library/yql/providers/solomon/scheme ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp index fe00223f7c..f9e979bef4 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp @@ -6,6 +6,7 @@ using namespace NCommon; TSolomonConfiguration::TSolomonConfiguration() { + REGISTER_SETTING(*this, _EnableReading); } TSolomonSettings::TConstPtr TSolomonConfiguration::Snapshot() const { diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h index 6f1d695d0b..e8f14aeca5 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h @@ -9,6 +9,7 @@ namespace NYql { struct TSolomonSettings { using TConstPtr = std::shared_ptr<const TSolomonSettings>; + NCommon::TConfSetting<bool, false> _EnableReading; }; struct TSolomonConfiguration diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp 
b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp index 31ceb1701c..98aa9e47cf 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp @@ -1,4 +1,5 @@ #include "yql_solomon_provider_impl.h" +#include "yql_solomon_dq_integration.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h> @@ -24,6 +25,7 @@ public: , LoadMetaDataTransformer_(CreateSolomonLoadTableMetadataTransformer(State_)) , TypeAnnotationTransformer_(CreateSolomonDataSourceTypeAnnotationTransformer(State_)) , ExecutionTransformer_(CreateSolomonDataSourceExecTransformer(State_)) + , DqIntegration_(CreateSolomonDqIntegration(State_)) { } @@ -35,13 +37,13 @@ public: return *ConfigurationTransformer_; } -// IGraphTransformer& GetIODiscoveryTransformer() override { -// return *IODiscoveryTransformer_; -// } + IGraphTransformer& GetIODiscoveryTransformer() override { + return *IODiscoveryTransformer_; + } -// IGraphTransformer& GetLoadTableMetadataTransformer() override { -// return *LoadMetaDataTransformer_; -// } + IGraphTransformer& GetLoadTableMetadataTransformer() override { + return *LoadMetaDataTransformer_; + } IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override { Y_UNUSED(instantOnly); @@ -81,9 +83,14 @@ public: } bool CanPullResult(const TExprNode& node, TSyncMap& syncList, bool& canRef) override { - Y_UNUSED(node); Y_UNUSED(syncList); canRef = false; + if (node.IsCallable(TCoRight::CallableName())) { + const auto input = node.Child(0); + if (input->IsCallable(TSoReadObject::CallableName())) { + return true; + } + } return false; } @@ -93,6 +100,29 @@ public: return node; } + bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override { + Y_UNUSED(compact); + + for (auto& child : node.Children()) { + 
children.push_back(child.Get()); + } + + if (TMaybeNode<TSoReadObject>(&node)) { + return true; + } + return false; + } + + ui32 GetInputs(const TExprNode& node, TVector<TPinInfo>&, bool withLimits) override { + Y_UNUSED(node); + Y_UNUSED(withLimits); + return 0; + } + + IDqIntegration* GetDqIntegration() override { + return DqIntegration_.Get(); + } + private: TSolomonState::TPtr State_; @@ -101,6 +131,7 @@ private: THolder<IGraphTransformer> LoadMetaDataTransformer_; THolder<TVisitorTransformerBase> TypeAnnotationTransformer_; THolder<TExecTransformerBase> ExecutionTransformer_; + const THolder<IDqIntegration> DqIntegration_; }; TIntrusivePtr<IDataProvider> CreateSolomonDataSource(TSolomonState::TPtr state) { diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp index f6a0ce8003..c33e1574ed 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp @@ -1,17 +1,176 @@ #include "yql_solomon_provider_impl.h" +#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h> +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> namespace NYql { using namespace NNodes; +namespace { + +bool ValidateDatetimeFormat(TStringBuf settingName, const TExprNode& settingValue, TExprContext& ctx) { + TInstant unused; + if (!TInstant::TryParseIso8601(settingValue.Content(), unused)) { + ctx.AddError(TIssue(ctx.GetPosition(settingValue.Pos()), TStringBuilder() << settingName << " must be correct datetime, e.g. 
2010-03-27T21:27:00Z, but has " << settingValue.Content())); + return false; + } + return true; +} + +} + class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { public: - TSolomonDataSourceTypeAnnotationTransformer(TSolomonState::TPtr state) + explicit TSolomonDataSourceTypeAnnotationTransformer(TSolomonState::TPtr state) : TVisitorTransformerBase(true) , State_(state) { + using TSelf = TSolomonDataSourceTypeAnnotationTransformer; + AddHandler({TSoReadObject::CallableName()}, Hndl(&TSelf::HandleRead)); + AddHandler({TSoObject::CallableName()}, Hndl(&TSelf::HandleSoObject)); + AddHandler({TSoSourceSettings::CallableName()}, Hndl(&TSelf::HandleSoSourceSettings)); + } + + TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 11U, ctx)) { + return TStatus::Error; + } + + if (!TCoSecureParam::Match(input->Child(TSoSourceSettings::idx_Token))) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TSoSourceSettings::idx_Token)->Pos()), TStringBuilder() << "Expected " << TCoSecureParam::CallableName())); + return TStatus::Error; + } + + const auto& rowType = *input->Child(TSoSourceSettings::idx_RowType); + if (!EnsureType(rowType, ctx)) { + return TStatus::Error; + } + + auto& systemColumns = *input->Child(TSoSourceSettings::idx_SystemColumns); + if (!EnsureTupleOfAtoms(systemColumns, ctx)) { + return TStatus::Error; + } + + auto& labelNames = *input->Child(TSoSourceSettings::idx_LabelNames); + if (!EnsureTupleOfAtoms(labelNames, ctx)) { + return TStatus::Error; + } + + auto& from = *input->Child(TSoSourceSettings::idx_From); + if (!EnsureAtom(from, ctx) || !ValidateDatetimeFormat("from"sv, from, ctx)) { + return TStatus::Error; + } + + auto& to = *input->Child(TSoSourceSettings::idx_To); + if (!EnsureAtom(to, ctx) || !ValidateDatetimeFormat("to"sv, to, ctx)) { + return TStatus::Error; + } + + auto& program = *input->Child(TSoSourceSettings::idx_Program); + if (!EnsureAtom(program, 
ctx)) { + return TStatus::Error; + } + + if (program.Content().empty()) { + ctx.AddError(TIssue(ctx.GetPosition(program.Pos()), "program must be specified")); + return TStatus::Error; + } + + auto& downsamplingDisabled = *input->Child(TSoSourceSettings::idx_DownsamplingDisabled); + if (!downsamplingDisabled.IsCallable("Bool")) { + ctx.AddError(TIssue(ctx.GetPosition(downsamplingDisabled.Pos()), "downsampling.disabled must be bool")); + return TStatus::Error; + } + + auto& downsamplingAggregation = *input->Child(TSoSourceSettings::idx_DownsamplingAggregation); + if (!EnsureAtom(downsamplingAggregation, ctx)) { + return TStatus::Error; + } + + auto& downsamplingFill = *input->Child(TSoSourceSettings::idx_DownsamplingFill); + if (!EnsureAtom(downsamplingFill, ctx)) { + return TStatus::Error; + } + + auto& downsamplingGridSec = *input->Child(TSoSourceSettings::idx_DownsamplingGridSec); + if (!downsamplingGridSec.IsCallable("Uint32")) { + ctx.AddError(TIssue(ctx.GetPosition(downsamplingGridSec.Pos()), "downsampling.grid_interval must be uint32 in seconds")); + return TStatus::Error; + } + + const auto type = rowType.GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + input->SetTypeAnn(ctx.MakeType<TStreamExprType>(type)); + return TStatus::Ok; + } + + TStatus HandleSoObject(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 1U, ctx)) { + return TStatus::Error; + } + + // todo: check settings + input->SetTypeAnn(ctx.MakeType<TUnitExprType>()); + return TStatus::Ok; + } + + TStatus HandleRead(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureMinMaxArgsCount(*input, 6U, 7U, ctx)) { + return TStatus::Error; + } + + if (!EnsureWorldType(*input->Child(TSoReadObject::idx_World), ctx)) { + return TStatus::Error; + } + + if (!EnsureSpecificDataSource(*input->Child(TSoReadObject::idx_DataSource), SolomonProviderName, ctx)) { + return TStatus::Error; + } + + auto& systemColumns = *input->Child(TSoReadObject::idx_SystemColumns); + if 
(!EnsureTupleOfAtoms(systemColumns, ctx)) { + return TStatus::Error; + } + + auto& labelNames = *input->Child(TSoReadObject::idx_LabelNames); + if (!EnsureTupleOfAtoms(labelNames, ctx)) { + return TStatus::Error; + } + + const auto& rowType = *input->Child(TSoReadObject::idx_RowType); + if (!EnsureType(rowType, ctx)) { + return TStatus::Error; + } + + if (input->ChildrenSize() > TSoReadObject::idx_ColumnOrder) { + auto& order = *input->Child(TSoReadObject::idx_ColumnOrder); + if (!EnsureTupleOfAtoms(order, ctx)) { + return TStatus::Error; + } + TVector<TString> columnOrder; + THashSet<TStringBuf> uniqs; + columnOrder.reserve(order.ChildrenSize()); + uniqs.reserve(order.ChildrenSize()); + + for (auto& child : order.ChildrenList()) { + TStringBuf col = child->Content(); + if (!uniqs.emplace(col).second) { + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Duplicate column '" << col << "' in column order list")); + return TStatus::Error; + } + columnOrder.push_back(ToString(col)); + } + return State_->Types->SetColumnOrder(*input, columnOrder, ctx); + } + + const auto type = rowType.GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{ + input->Child(TSoReadObject::idx_World)->GetTypeAnn(), + ctx.MakeType<TListExprType>(type) + })); + + return TStatus::Ok; } private: diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index bbf8883ec5..f60f7f4634 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -1,10 +1,11 @@ #include "yql_solomon_dq_integration.h" - +#include "yql_solomon_mkql_compiler.h" #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/utils/log/log.h> #include 
<ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h> @@ -18,6 +19,20 @@ using namespace NNodes; namespace { +bool ExtractSettingValue(const TExprNode& value, TStringBuf settingName, TExprContext& ctx, TStringBuf& settingValue) { + if (value.IsAtom()) { + settingValue = value.Content(); + return true; + } + + if (!value.IsCallable({ "String", "Utf8" })) { + ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder() << settingName << " must be literal value")); + return false; + } + settingValue = value.Head().Content(); + return true; +} + NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) { switch (clusterType) { case TSolomonClusterConfig::SCT_SOLOMON: @@ -59,7 +74,7 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar schemeItem.SetDataTypeId(dataType.TypeId); if (dataType.Features & NUdf::DateType || dataType.Features & NUdf::TzDateType) { - *scheme.MutableTimestamp() = schemeItem; + *scheme.MutableTimestamp() = std::move(schemeItem); } else if (dataType.Features & NUdf::NumericType) { scheme.MutableSensors()->Add(std::move(schemeItem)); } else if (dataType.Features & NUdf::StringType) { @@ -72,30 +87,198 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar class TSolomonDqIntegration: public TDqIntegrationBase { public: - TSolomonDqIntegration(const TSolomonState::TPtr& state) + explicit TSolomonDqIntegration(const TSolomonState::TPtr& state) : State_(state.Get()) { } - bool CanRead(const TExprNode&, TExprContext&, bool) override { - YQL_ENSURE(false, "Unimplemented"); + ui64 
Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool) override { + Y_UNUSED(maxPartitions); + Y_UNUSED(node); + Y_UNUSED(partitions); + partitions.push_back("zz_partition"); + return 0; + } + + bool CanRead(const TExprNode& read, TExprContext&, bool) override { + return TSoReadObject::Match(&read); } TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>&, TExprContext&) override { YQL_ENSURE(false, "Unimplemented"); } - - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr&, TExprContext&) override { - YQL_ENSURE(false, "Unimplemented"); + TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + if (const auto& maybeSoReadObject = TMaybeNode<TSoReadObject>(read)) { + const auto& soReadObject = maybeSoReadObject.Cast(); + YQL_ENSURE(soReadObject.Ref().GetTypeAnn(), "No type annotation for node " << soReadObject.Ref().Content()); + + const auto& clusterName = soReadObject.DataSource().Cluster().StringValue(); + + const auto token = "cluster:default_" + clusterName; + YQL_CLOG(INFO, ProviderS3) << "Wrap " << read->Content() << " with token: " << token; + + auto settings = soReadObject.Object().Settings(); + auto& settingsRef = settings.Ref(); + TString from; + TString to; + TString program; + bool downsamplingDisabled = false; + TString downsamplingAggregation = "AVG"; + TString downsamplingFill = "PREVIOUS"; + ui32 downsamplingGridSec = 15; + + for (auto i = 0U; i < settingsRef.ChildrenSize(); ++i) { + if (settingsRef.Child(i)->Head().IsAtom("from"sv)) { + TStringBuf value; + if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), "from"sv, ctx, value)) { + return {}; + } + + from = value; + continue; + } + if (settingsRef.Child(i)->Head().IsAtom("to"sv)) { + TStringBuf value; + if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), "to"sv, ctx, value)) { + return 
{}; + } + + to = value; + continue; + } + if (settingsRef.Child(i)->Head().IsAtom("program"sv)) { + TStringBuf value; + if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), "program"sv, ctx, value)) { + return {}; + } + + program = value; + continue; + } + if (settingsRef.Child(i)->Head().IsAtom("downsampling.disabled"sv)) { + TStringBuf value; + if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), "downsampling.disabled"sv, ctx, value)) { + return {}; + } + downsamplingDisabled = FromString<bool>(value); + continue; + } + if (settingsRef.Child(i)->Head().IsAtom("downsampling.gridaggregation"sv)) { + TStringBuf value; + if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), "downsampling.gridaggregation"sv, ctx, value)) { + return {}; + } + if (!IsIn({ "AVG"sv, "COUNT"sv, "DEFAULT_AGGREGATION"sv, "LAST"sv, "MAX"sv, "MIN"sv, "SUM"sv }, value)) { + ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), TStringBuilder() << "downsampling.grid_aggregation must be one of AVG, COUNT, DEFAULT_AGGREGATION, LAST, MAX, MIN, SUM, but has " << value)); + return {}; + } + downsamplingAggregation = value; + continue; + } + if (settingsRef.Child(i)->Head().IsAtom("downsampling.fill"sv)) { + TStringBuf value; + if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), "downsampling.fill"sv, ctx, value)) { + return {}; + } + if (!IsIn({ "NONE"sv, "NULL"sv, "PREVIOUS"sv }, value)) { + ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), TStringBuilder() << "downsampling.grid_fill must be one of NONE, NULL, PREVIOUS, but has " << value)); + return {}; + } + downsamplingFill = value; + continue; + } + if (settingsRef.Child(i)->Head().IsAtom("downsampling.gridinterval"sv)) { + TStringBuf value; + if (!ExtractSettingValue(settingsRef.Child(i)->Tail(), "downsampling.gridinterval"sv, ctx, value)) { + return {}; + } + ui32 intValue = 0; + if (!TryFromString(value, intValue)) { + ctx.AddError(TIssue(ctx.GetPosition(settingsRef.Child(i)->Head().Pos()), 
TStringBuilder() << "downsampling.grid_interval must be positive number, but has " << value)); + return {}; + } + downsamplingGridSec = intValue; + continue; + } + } + + return Build<TDqSourceWrap>(ctx, read->Pos()) + .Input<TSoSourceSettings>() + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .RowType(soReadObject.RowType()) + .SystemColumns(soReadObject.SystemColumns()) + .LabelNames(soReadObject.LabelNames()) + .From<TCoAtom>().Build(from) + .To<TCoAtom>().Build(to) + .Program<TCoAtom>().Build(program) + .DownsamplingDisabled<TCoBool>().Literal().Build(downsamplingDisabled ? "true" : "false").Build() + .DownsamplingAggregation<TCoAtom>().Build(downsamplingAggregation) + .DownsamplingFill<TCoAtom>().Build(downsamplingFill) + .DownsamplingGridSec<TCoUint32>().Literal().Build(ToString(downsamplingGridSec)).Build() + .Build() + .DataSource(soReadObject.DataSource().Cast<TCoDataSource>()) + .RowType(soReadObject.RowType()) + .Settings(settings) + .Done().Ptr(); + } + return read; } - TMaybe<bool> CanWrite(const TExprNode&, TExprContext&) override { - YQL_ENSURE(false, "Unimplemented"); + TMaybe<bool> CanWrite(const TExprNode& write, TExprContext&) override { + return TSoWrite::Match(&write); } - void FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t) override { - YQL_ENSURE(false, "Unimplemented"); + void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override { + const TDqSource dqSource(&node); + const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>(); + if (!maybeSettings) { + return; + } + + const auto settings = maybeSettings.Cast(); + const auto& cluster = dqSource.DataSource().Cast<TSoDataSource>().Cluster().StringValue(); + const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(cluster); + YQL_ENSURE(clusterDesc, "Unknown cluster " << cluster); + NSo::NProto::TDqSolomonSource source; + 
source.SetEndpoint(clusterDesc->GetCluster()); + source.SetProject("yq"); + + source.SetClusterType(MapClusterType(clusterDesc->GetClusterType())); + source.SetUseSsl(clusterDesc->GetUseSsl()); + source.SetFrom(TInstant::ParseIso8601(settings.From().StringValue()).Seconds()); + source.SetTo(TInstant::ParseIso8601(settings.To().StringValue()).Seconds()); + source.SetProgram(settings.Program().StringValue()); + + auto& downsampling = *source.MutableDownsampling(); + const bool isDisabled = FromString<bool>(settings.DownsamplingDisabled().Literal().Value()); + downsampling.SetDisabled(isDisabled); + downsampling.SetAggregation(settings.DownsamplingAggregation().StringValue()); + downsampling.SetFill(settings.DownsamplingFill().StringValue()); + const ui32 gridIntervalSec = FromString<ui32>(settings.DownsamplingGridSec().Literal().Value()); + downsampling.SetGridMs(gridIntervalSec * 1000); + + source.MutableToken()->SetName(settings.Token().Name().StringValue()); + + THashSet<TString> uniqueColumns; + for (const auto& c : settings.SystemColumns()) { + const auto& columnAsString = c.StringValue(); + uniqueColumns.insert(columnAsString); + source.AddSystemColumns(columnAsString); + } + + for (const auto& c : settings.LabelNames()) { + const auto& columnAsString = c.StringValue(); + if (!uniqueColumns.insert(columnAsString).second) { + throw yexception() << "Column " << columnAsString << " already registered"; + } + source.AddLabelNames(columnAsString); + } + + protoSettings.PackFrom(source); + sourceType = "SolomonSource"; } void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override { @@ -137,6 +320,10 @@ public: sinkType = "SolomonSink"; } + void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override { + RegisterDqSolomonMkqlCompilers(compiler); + } + private: TSolomonState* State_; // State owns dq integration, so back reference must be not smart. 
}; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp index 73a21638be..e2cba012ec 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp @@ -2,18 +2,85 @@ #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h> +#include <ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h> -/* -#include <ydb/library/yql/core/yql_expr_optimize.h> -#include <yql/library/statface_client/client.h> - -#include <util/generic/hash_set.h> -*/ +#include <util/string/split.h> +#include <util/string/strip.h> namespace NYql { +namespace { using namespace NNodes; +std::array<TExprNode::TPtr, 2U> GetSchema(const TExprNode::TListType& settings) { + for (auto it = settings.cbegin(); settings.cend() != it; ++it) { + if (const auto item = *it; item->Head().IsAtom("userschema")) { + return {item->ChildPtr(1), item->ChildrenSize() > 2 ? 
item->TailPtr() : TExprNode::TPtr()}; + } + } + + return {}; +} + +TVector<TCoAtom> GetUserLabels(TPositionHandle pos, TExprContext& ctx, const TExprNode::TListType& settings) { + for (auto it = settings.cbegin(); settings.cend() != it; ++it) { + if (const auto item = *it; item->Head().IsAtom("labels")) { + TVector<TCoAtom> result; + auto labels = StringSplitter(TString(item->Tail().Content())).Split(',').SkipEmpty().ToList<TString>(); + result.reserve(labels.size()); + for (TString& label : labels) { + auto v = Build<TCoAtom>(ctx, pos).Value(StripString(label)).Done(); + result.emplace_back(std::move(v)); + } + return result; + } + } + + return {}; +} + +const TStructExprType* BuildScheme(TPositionHandle pos, const TVector<TCoAtom>& userLabels, TExprContext& ctx, TVector<TCoAtom>& systemColumns) { + auto allSystemColumns = {SOLOMON_SCHEME_KIND, SOLOMON_SCHEME_LABELS, SOLOMON_SCHEME_VALUE, SOLOMON_SCHEME_TS, SOLOMON_SCHEME_TYPE}; + TVector<const TItemExprType*> columnTypes; + columnTypes.reserve(systemColumns.size() + userLabels.size()); + const TTypeAnnotationNode* stringType = ctx.MakeType<TDataExprType>(EDataSlot::String); + for (auto systemColumn : allSystemColumns) { + const TTypeAnnotationNode* type = nullptr; + if (systemColumn == SOLOMON_SCHEME_LABELS && !userLabels.empty()) { + continue; + } + + if (systemColumn == SOLOMON_SCHEME_TS) { + type = ctx.MakeType<TDataExprType>(EDataSlot::Datetime); + } else if (systemColumn == SOLOMON_SCHEME_VALUE) { + type = ctx.MakeType<TDataExprType>(EDataSlot::Double); + } else if (systemColumn == SOLOMON_SCHEME_LABELS) { + type = ctx.MakeType<NYql::TDictExprType>(stringType, stringType); + } else if (IsIn({ SOLOMON_SCHEME_KIND, SOLOMON_SCHEME_TYPE }, systemColumn)) { + type = stringType; + } else { + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Unknown system column " << systemColumn)); + return nullptr; + } + + systemColumns.push_back(Build<TCoAtom>(ctx, pos).Value(systemColumn).Done()); + 
columnTypes.push_back(ctx.MakeType<TItemExprType>(systemColumn, type)); + } + + for (const auto& label : userLabels) { + if (IsIn({ SOLOMON_SCHEME_TS, SOLOMON_SCHEME_KIND, SOLOMON_SCHEME_TYPE, SOLOMON_SCHEME_LABELS, SOLOMON_SCHEME_VALUE }, label.Value())) { + // tmp constraint + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "System column should not be used as label name: " << label.Value())); + return nullptr; + } + columnTypes.push_back(ctx.MakeType<TItemExprType>(label.Value(), stringType)); + } + + return ctx.MakeType<TStructExprType>(columnTypes); +} + +} + class TSolomonIODiscoveryTransformer : public TSyncTransformerBase { public: TSolomonIODiscoveryTransformer(TSolomonState::TPtr state) @@ -28,9 +95,7 @@ public: return TStatus::Ok; } - TVector<TIssue> issues; - - auto status = OptimizeExpr(input, output, [] (const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { + auto status = OptimizeExpr(input, output, [this] (const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { if (auto maybeWrite = TMaybeNode<TSoWrite>(node)) { if (!maybeWrite.DataSink()) { return node; @@ -51,25 +116,76 @@ public: .Add(write.Arg(3)) .Build() .Done().Ptr(); + } - } else if (TMaybeNode<TSoRead>(node).DataSource()) { - return node; + if (auto maybeRead = TMaybeNode<TSoRead>(node)) { + auto read = maybeRead.Cast(); + if (read.DataSource().Category().Value() != SolomonProviderName) { + return node; + } + + const auto& object = read.Arg(2).Ref(); + YQL_ENSURE(object.IsCallable("MrTableConcat")); + + auto cluster = read.DataSource().Cluster().StringValue(); + if (!this->State_->Configuration->_EnableReading.Get().GetOrElse(false)) { + ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Reading is disabled for monitoring cluster " << cluster)); + return {}; + } + + auto settings = read.Ref().Child(4); + auto settingsList = read.Ref().Child(4)->ChildrenList(); + auto userSchema = GetSchema(settingsList); + TVector<TCoAtom> 
userLabels = GetUserLabels(settings->Pos(), ctx, settingsList); + + auto soObject = Build<TSoObject>(ctx, read.Pos()) + .Settings(settings) + .Done(); + + TVector<TCoAtom> systemColumns; + auto* scheme = BuildScheme(settings->Pos(), userLabels, ctx, systemColumns); + if (!scheme) { + return {}; + } + + auto rowTypeNode = ExpandType(read.Pos(), *scheme, ctx); + auto systemColumnsNode = Build<TCoAtomList>(ctx, read.Pos()) + .Add(systemColumns) + .Done(); + + auto labelNamesNode = Build<TCoAtomList>(ctx, read.Pos()) + .Add(userLabels) + .Done(); + + return userSchema.back() + ? Build<TSoReadObject>(ctx, read.Pos()) + .World(read.World()) + .DataSource(read.DataSource()) + .Object(soObject) + .SystemColumns(systemColumnsNode) + .LabelNames(labelNamesNode) + .RowType(rowTypeNode) + .ColumnOrder(std::move(userSchema.back())) + .Done().Ptr() + : Build<TSoReadObject>(ctx, read.Pos()) + .World(read.World()) + .DataSource(read.DataSource()) + .Object(soObject) + .SystemColumns(systemColumnsNode) + .LabelNames(labelNamesNode) + .RowType(rowTypeNode) + .Done().Ptr(); } + return node; }, ctx, TOptimizeExprSettings {nullptr}); - if (issues) { - for (const auto& issue: issues) { - ctx.AddError(issue); - } - status = status.Combine(TStatus::Error); - } - return status; } void Rewind() final { } + private: TSolomonState::TPtr State_; }; diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp index c4ddad581d..712877a6f2 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp @@ -19,7 +19,7 @@ public: return TStatus::Ok; } - return TStatus::Async; + return TStatus::Ok; } NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.cpp 
b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.cpp new file mode 100644 index 0000000000..26203f5f6c --- /dev/null +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.cpp @@ -0,0 +1,50 @@ +#include "yql_solomon_mkql_compiler.h" + +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/mkql/parser.h> + +namespace NYql { + +using namespace NKikimr::NMiniKQL; +using namespace NNodes; + +namespace { +template <typename T> +TRuntimeNode Wrap(T wrapper, NCommon::TMkqlBuildContext& ctx) { + const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx); + + return ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) { + TRuntimeNode::TList fields; + bool isOptional = false; + auto unpacked = UnpackOptional(item, isOptional); + auto tupleType = AS_TYPE(TTupleType, unpacked); + fields.reserve(tupleType->GetElementsCount()); + for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { + fields.push_back(ctx.ProgramBuilder.Nth(item, i)); + } + + return fields; + }); +} +} + +void RegisterDqSolomonMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { + compiler.ChainCallable(TDqSourceWideWrap::CallableName(), + [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + if (const auto wrapper = TDqSourceWideWrap(&node); wrapper.DataSource().Category().Value() == SolomonProviderName) { + return Wrap(wrapper, ctx); + } + return TRuntimeNode(); + }); + + // compiler.ChainCallable(TDqSourceWideBlockWrap::CallableName(), + // [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + // if (const auto wrapper = TDqSourceWideBlockWrap(&node); wrapper.DataSource().Category().Value() == SolomonProviderName) { + // return Wrap(wrapper, ctx); + // } + + // return TRuntimeNode(); + // }); +} + +} diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.h 
b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.h new file mode 100644 index 0000000000..0c847cd8f8 --- /dev/null +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/library/yql/providers/common/mkql/yql_provider_mkql.h> + +namespace NYql { + +void RegisterDqSolomonMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler); + +} diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp index ece1e1c666..9574179e81 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp @@ -63,11 +63,23 @@ public: { #define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TSoPhysicalOptProposalTransformer::name) AddHandler(0, &TSoWriteToShard::Match, HNDL(SoWriteToShard)); + AddHandler(0, &TCoLeft::Match, HNDL(TrimReadWorld)); #undef HNDL SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes. 
} + TMaybeNode<TExprBase> TrimReadWorld(TExprBase node, TExprContext& ctx) const { + Y_UNUSED(ctx); + + const auto& maybeRead = node.Cast<TCoLeft>().Input().Maybe<TSoReadObject>(); + if (!maybeRead) { + return node; + } + + return TExprBase(maybeRead.Cast().World().Ptr()); + } + TMaybeNode<TExprBase> SoWriteToShard(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const { if (State_->IsRtmrMode()) { return node; diff --git a/ydb/library/yql/providers/solomon/scheme/ya.make b/ydb/library/yql/providers/solomon/scheme/ya.make new file mode 100644 index 0000000000..9b39d73bb9 --- /dev/null +++ b/ydb/library/yql/providers/solomon/scheme/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + yql_solomon_scheme.h +) + +END() diff --git a/ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h b/ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h new file mode 100644 index 0000000000..e043b2c752 --- /dev/null +++ b/ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h @@ -0,0 +1,11 @@ +#pragma once
+
+#include <util/generic/strbuf.h>
+
+namespace NYql { // Names of the system columns of a Solomon read; `inline` keeps a single definition across all including TUs.
+    inline constexpr TStringBuf SOLOMON_SCHEME_KIND = "kind"sv;     // typed as String by the provider's BuildScheme
+    inline constexpr TStringBuf SOLOMON_SCHEME_LABELS = "labels"sv; // typed as Dict<String, String>; left out of the row when explicit label columns are requested
+    inline constexpr TStringBuf SOLOMON_SCHEME_VALUE = "value"sv;   // typed as Double
+    inline constexpr TStringBuf SOLOMON_SCHEME_TYPE = "type"sv;     // typed as String
+    inline constexpr TStringBuf SOLOMON_SCHEME_TS = "ts"sv;         // typed as Datetime
+} // namespace NYql
diff --git a/ydb/library/yql/providers/solomon/ya.make b/ydb/library/yql/providers/solomon/ya.make index 9e1893e851..df43a1f649 100644 --- a/ydb/library/yql/providers/solomon/ya.make +++ b/ydb/library/yql/providers/solomon/ya.make @@ -3,4 +3,5 @@ RECURSE( expr_nodes gateway provider + scheme ) diff --git a/ydb/library/yql/sql/v1/sql_translation.cpp b/ydb/library/yql/sql/v1/sql_translation.cpp index 67119a1994..ce2670030f 100644 --- a/ydb/library/yql/sql/v1/sql_translation.cpp +++ b/ydb/library/yql/sql/v1/sql_translation.cpp @@ -1054,11 +1054,6 @@ bool TSqlTranslation::TableRefImpl(const TRule_table_ref& node, TTableRef& resul } } - if (service == SolomonProviderName) { - Ctx.Error() << "Selecting data from monitoring source is not supported"; - return false; - } - TTableRef tr(Context().MakeName("table"), service, cluster, nullptr); TPosition pos(Context().Pos()); TTableHints hints = GetContextHints(Ctx); diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 9ac2e5044d..d9d5f081a6 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -40,6 +40,7 @@ #include <ydb/library/yql/providers/function/provider/dq_function_provider.h> #include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h> #include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h> +#include <ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h> #include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h> #include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h> #include <ydb/library/yql/providers/pg/provider/yql_pg_provider.h> @@ -276,6 +277,7 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory( } RegisterDqPqReadActorFactory(*factory, driver, nullptr); RegisterYdbReadActorFactory(*factory, driver, nullptr); + RegisterDQSolomonReadActorFactory(*factory, nullptr); RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway); 
RegisterGenericProviderFactories(*factory, credentialsFactory, genericClient); RegisterDqPqWriteActorFactory(*factory, driver, nullptr); diff --git a/ydb/library/yql/tools/dqrun/ya.make b/ydb/library/yql/tools/dqrun/ya.make index 38d7fb919f..8a19b6aee6 100644 --- a/ydb/library/yql/tools/dqrun/ya.make +++ b/ydb/library/yql/tools/dqrun/ya.make @@ -57,6 +57,7 @@ ENDIF() ydb/library/yql/providers/pq/provider ydb/library/yql/providers/s3/actors ydb/library/yql/providers/s3/provider + ydb/library/yql/providers/solomon/async_io ydb/library/yql/providers/solomon/gateway ydb/library/yql/providers/solomon/provider ydb/library/yql/providers/ydb/actors |