about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author uzhastik <uzhas@ydb.tech> 2024-06-18 12:47:32 +0300
committer GitHub <noreply@github.com> 2024-06-18 12:47:32 +0300
commit dba296f900921846d3e18083e75caab61cf88cc0 (patch)
tree 41653c63389f83facfe5ae2f31e3e452ff3cfaf6
parent 0bf02f135dcff9f368ac1ccffd090124753fd564 (diff)
download ydb-dba296f900921846d3e18083e75caab61cf88cc0.tar.gz
initial version for reading from solomon (#5571)
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 6
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 4
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp | 8
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h | 2
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp | 6
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor.h | 2
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h | 4
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h | 1
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 5
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h | 1
-rw-r--r-- ydb/library/yql/dq/actors/compute/ya.make | 1
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/events.h | 6
-rw-r--r-- ydb/library/yql/providers/dq/actors/compute_actor.cpp | 2
-rw-r--r-- ydb/library/yql/providers/dq/actors/worker_actor.cpp | 11
-rw-r--r-- ydb/library/yql/providers/dq/actors/worker_actor.h | 3
-rw-r--r-- ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp | 3
-rw-r--r-- ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.cpp | 454
-rw-r--r-- ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h | 29
-rw-r--r-- ydb/library/yql/providers/solomon/async_io/ya.make | 7
-rw-r--r-- ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json | 46
-rw-r--r-- ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto | 27
-rw-r--r-- ydb/library/yql/providers/solomon/provider/ya.make | 4
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp | 1
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_config.h | 1
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp | 45
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp | 161
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp | 211
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp | 152
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp | 2
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.cpp | 50
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.h | 9
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp | 12
-rw-r--r-- ydb/library/yql/providers/solomon/scheme/ya.make | 7
-rw-r--r-- ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h | 11
-rw-r--r-- ydb/library/yql/providers/solomon/ya.make | 1
-rw-r--r-- ydb/library/yql/sql/v1/sql_translation.cpp | 5
-rw-r--r-- ydb/library/yql/tools/dqrun/dqrun.cpp | 2
-rw-r--r-- ydb/library/yql/tools/dqrun/ya.make | 1
38 files changed, 1221 insertions, 82 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
index cd610279b8..1c00eca6c7 100644
--- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
@@ -1,7 +1,7 @@
#include "kqp_pure_compute_actor.h"
+#include <ydb/core/base/appdata.h>
#include <ydb/core/base/feature_flags.h>
-
namespace NKikimr {
namespace NKqp {
@@ -15,7 +15,7 @@ TKqpComputeActor::TKqpComputeActor(const TActorId& executerId, ui64 txId, NDqPro
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
NWilson::TTraceId traceId, TIntrusivePtr<NActors::TProtoArenaHolder> arena,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings)
- : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
+ : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena), GUCSettings)
, ComputeCtx(settings.StatsMode)
, FederatedQuerySetup(federatedQuerySetup)
{
@@ -42,7 +42,7 @@ void TKqpComputeActor::DoBootstrap() {
TDqTaskRunnerContext execCtx;
- execCtx.FuncRegistry = AppData()->FunctionRegistry;
+ execCtx.FuncRegistry = TBase::FunctionRegistry;
execCtx.RandomProvider = TAppData::RandomProvider.Get();
execCtx.TimeProvider = TAppData::TimeProvider.Get();
execCtx.ComputeCtx = &ComputeCtx;
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index c2876e8ee2..b144be2b09 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -27,7 +27,7 @@ TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId
IDqAsyncIoFactory::TPtr asyncIoFactory,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
- : TBase(executerId, txId, task, std::move(asyncIoFactory), settings,
+ : TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
, ComputeCtx(settings.StatsMode)
{
@@ -180,7 +180,7 @@ void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) {
void TKqpScanComputeActor::DoBootstrap() {
CA_LOG_D("EVLOGKQP START");
NDq::TDqTaskRunnerContext execCtx;
- execCtx.FuncRegistry = AppData()->FunctionRegistry;
+ execCtx.FuncRegistry = TBase::FunctionRegistry;
execCtx.ComputeCtx = &ComputeCtx;
execCtx.ComputationFactory = NMiniKQL::GetKqpActorComputeFactory(&ComputeCtx, std::nullopt);
execCtx.RandomProvider = TAppData::RandomProvider.Get();
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 0208b17f74..945c50cdc1 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -75,13 +75,13 @@ public:
static constexpr bool HasAsyncTaskRunner = true;
TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task,
- IDqAsyncIoFactory::TPtr asyncIoFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
const ::NMonitoring::TDynamicCounterPtr& taskCounters,
const TActorId& quoterServiceActorId,
bool ownCounters)
- : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters)
+ : TBase(executerId, txId, task, std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, ReadyToCheckpointFlag(false)
, SentStatsRequest(false)
@@ -1164,7 +1164,7 @@ private:
IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask* task,
- IDqAsyncIoFactory::TPtr asyncIoFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
::NMonitoring::TDynamicCounterPtr taskCounters,
@@ -1172,7 +1172,7 @@ IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId,
bool ownCounters)
{
return new TDqAsyncComputeActor(executerId, txId, task, std::move(asyncIoFactory),
- settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId, ownCounters);
+ functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters, quoterServiceActorId, ownCounters);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
index 4492c2a656..726b329ec5 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -18,7 +18,7 @@ namespace NYql {
namespace NDq {
NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task,
- IDqAsyncIoFactory::TPtr asyncIoFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
::NMonitoring::TDynamicCounterPtr taskCounters = nullptr,
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 6f73c08bd7..8851f301f7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -35,10 +35,11 @@ public:
TDqComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory,
::NMonitoring::TDynamicCounterPtr taskCounters)
- : TBase(executerId, txId, task, std::move(asyncIoFactory), settings, memoryLimits, true, false, taskCounters)
+ : TBase(executerId, txId, task, std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, true, false, taskCounters)
, TaskRunnerFactory(taskRunnerFactory)
{
InitializeTask();
@@ -74,12 +75,13 @@ private:
IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask* task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory,
::NMonitoring::TDynamicCounterPtr taskCounters)
{
return new TDqComputeActor(executerId, txId, task, std::move(asyncIoFactory),
- settings, memoryLimits, taskRunnerFactory, taskCounters);
+ functionRegistry, settings, memoryLimits, taskRunnerFactory, taskCounters);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index e338445ab7..062c090f2f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -382,7 +382,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
NDqProto::TDqTaskStats* protoTask, TCollectStatsLevel level);
NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task,
- IDqAsyncIoFactory::TPtr asyncIoFactory,
+ IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
const TTaskRunnerFactory& taskRunnerFactory,
::NMonitoring::TDynamicCounterPtr taskCounters = nullptr);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
index 0a3c18445e..5e64edf2ab 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
@@ -4,6 +4,8 @@
#include "dq_compute_actor_metrics.h"
#include "dq_compute_actor_watermarks.h"
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+
//must be included the last
#include "dq_compute_actor_log.h"
@@ -22,6 +24,7 @@ struct TComputeActorAsyncInputHelper {
const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED;
const NKikimr::NMiniKQL::TType* ValueType = nullptr;
TMaybe<TInstant> PendingWatermark = Nothing();
+ TMaybe<NKikimr::NMiniKQL::TProgramBuilder> ProgramBuilder;
public:
TComputeActorAsyncInputHelper(
const TString& logPrefix,
@@ -122,4 +125,3 @@ public:
};
} //namespace NYql::NDq
-
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 85a60599bd..f0d59f7142 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -266,6 +266,7 @@ public:
const NActors::TActorId& ComputeActorId;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+ NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
::NMonitoring::TDynamicCounterPtr TaskCounters;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
IMemoryQuotaManager::TPtr MemoryQuotaManager;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 3af5f52f87..df6878de92 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -161,6 +161,7 @@ public:
protected:
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask* task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
bool ownMemoryQuota = true, bool passExceptions = false,
const ::NMonitoring::TDynamicCounterPtr& taskCounters = nullptr,
@@ -174,6 +175,7 @@ protected:
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0)
, AsyncIoFactory(std::move(asyncIoFactory))
+ , FunctionRegistry(functionRegistry)
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, WatermarksTracker(this->SelfId(), TxId, Task.GetId())
@@ -1256,6 +1258,7 @@ protected:
const auto& inputDesc = Task.GetInputs(inputIndex);
Y_ABORT_UNLESS(inputDesc.HasSource());
source.Type = inputDesc.GetSource().GetType();
+ source.ProgramBuilder.ConstructInPlace(typeEnv, *FunctionRegistry);
const auto& settings = Task.GetSourceSettings();
Y_ABORT_UNLESS(settings.empty() || inputIndex < settings.size());
CA_LOG_D("Create source for input " << inputIndex << " " << inputDesc);
@@ -1273,6 +1276,7 @@ protected:
.ComputeActorId = this->SelfId(),
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
+ .ProgramBuilder = *source.ProgramBuilder,
.TaskCounters = TaskCounters,
.Alloc = Alloc,
.MemoryQuotaManager = MemoryLimits.MemoryQuotaManager,
@@ -1856,6 +1860,7 @@ protected:
TComputeMemoryLimits MemoryLimits;
const bool CanAllocateExtraMemory = false;
const IDqAsyncIoFactory::TPtr AsyncIoFactory;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
const NDqProto::ECheckpointingMode CheckpointingMode;
TDqComputeActorChannels* Channels = nullptr;
TDqComputeActorCheckpoints* Checkpoints = nullptr;
diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
index 820ff26887..8a776d83fb 100644
--- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
+++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
@@ -369,4 +369,3 @@ protected:
};
} //namespace NYql::NDq
-
diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make
index b0d50b84ea..3cec159c24 100644
--- a/ydb/library/yql/dq/actors/compute/ya.make
+++ b/ydb/library/yql/dq/actors/compute/ya.make
@@ -26,6 +26,7 @@ PEERDIR(
ydb/library/yql/dq/runtime
ydb/library/yql/dq/tasks
ydb/library/yql/dq/actors/spilling
+ ydb/library/yql/minikql
ydb/library/yql/minikql/comp_nodes
ydb/library/yql/public/issue
ydb/core/quoter/public
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index bf5d38965a..c95e6ad476 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -28,12 +28,12 @@ struct TTaskRunnerEvents {
EvOutputChannelDataRequest,
EvOutputChannelData,
-
+
EvInputChannelData,
EvInputChannelDataAck,
-
+
// EvContinueRun -> TaskRunner->Run() -> TEvTaskRunFinished
- EvContinueRun,
+ EvContinueRun,
EvRunFinished,
EvSourceDataAck,
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index f870334303..ac7f47a232 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -67,6 +67,7 @@ IActor* CreateComputeActor(
operationId,
task,
options.AsyncIoFactory,
+ options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
taskRunnerFactory,
@@ -77,6 +78,7 @@ IActor* CreateComputeActor(
operationId,
task,
options.AsyncIoFactory,
+ options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
taskRunnerActorFactory,
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 59e28aa949..88371368e8 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -11,6 +11,7 @@
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/library/actors/core/event_pb.h>
#include <ydb/library/actors/core/hfunc.h>
@@ -56,6 +57,7 @@ struct TSourceInfo {
bool PushStarted = false;
bool Finished = false;
NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr;
+ std::optional<NKikimr::NMiniKQL::TProgramBuilder> ProgramBuilder;
};
struct TSinkInfo {
@@ -93,10 +95,12 @@ public:
explicit TDqWorker(
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
const IDqAsyncIoFactory::TPtr& asyncIoFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TWorkerRuntimeData* runtimeData,
const TString& traceId)
: TRichActor<TDqWorker>(&TDqWorker::Handler)
, AsyncIoFactory(asyncIoFactory)
+ , FunctionRegistry(functionRegistry)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, RuntimeData(runtimeData)
, TraceId(traceId)
@@ -294,6 +298,7 @@ private:
if (input.HasSource()) {
auto& source = SourcesMap[inputId];
source.TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
+ source.ProgramBuilder.emplace(*source.TypeEnv, *FunctionRegistry);
std::tie(source.Source, source.Actor) =
AsyncIoFactory->CreateDqSource(
IDqAsyncIoFactory::TSourceArguments {
@@ -307,6 +312,7 @@ private:
.ComputeActorId = SelfId(),
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
+ .ProgramBuilder = *source.ProgramBuilder,
.MemoryQuotaManager = MemoryQuotaManager
});
RegisterLocalChild(source.Actor);
@@ -769,6 +775,7 @@ private:
/*_________________________________________________________*/
IDqAsyncIoFactory::TPtr AsyncIoFactory;
+ const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
NTaskRunnerActor::ITaskRunnerActor* Actor = nullptr;
TActorId TaskRunnerActor;
@@ -808,13 +815,15 @@ NActors::IActor* CreateWorkerActor(
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const IDqAsyncIoFactory::TPtr& asyncIoFactory)
+ const IDqAsyncIoFactory::TPtr& asyncIoFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
{
Y_ABORT_UNLESS(taskRunnerActorFactory);
return new TLogWrapReceive(
new TDqWorker(
taskRunnerActorFactory,
asyncIoFactory,
+ functionRegistry,
runtimeData,
traceId), traceId);
}
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.h b/ydb/library/yql/providers/dq/actors/worker_actor.h
index 438578fd97..4e06e3a829 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.h
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.h
@@ -25,6 +25,7 @@ namespace NYql::NDqs {
TWorkerRuntimeData* runtimeData,
const TString& traceId,
const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
- const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory);
+ const NDq::IDqAsyncIoFactory::TPtr& asyncIoFactory,
+ const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index fca8b7a426..1161237803 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -392,7 +392,8 @@ private:
Options.RuntimeData,
traceId,
Options.TaskRunnerActorFactory,
- Options.AsyncIoFactory));
+ Options.AsyncIoFactory,
+ Options.FunctionRegistry));
}
allocationInfo.WorkerActors.ActorIds.emplace_back(RegisterChild(
actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.cpp
new file mode 100644
index 0000000000..c027deb05b
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.cpp
@@ -0,0 +1,454 @@
+#include "dq_solomon_read_actor.h"
+
+#include <ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
+#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>
+
+#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h>
+#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_string_util.h>
+
+#include <ydb/library/yql/public/udf/udf_data_type.h>
+
+#include <ydb/library/yql/utils/actor_log/log.h>
+#include <ydb/library/yql/utils/actors/http_sender_actor.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/url_builder.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/event_local.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/library/actors/core/log.h>
+#include <ydb/library/actors/http/http_proxy.h>
+#include <library/cpp/json/json_reader.h>
+
+
+#include <util/generic/algorithm.h>
+#include <util/generic/hash.h>
+#include <util/system/compiler.h>
+
+#define SINK_LOG_T(s) \
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define SINK_LOG_D(s) \
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define SINK_LOG_I(s) \
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define SINK_LOG_W(s) \
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define SINK_LOG_N(s) \
+ LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define SINK_LOG_E(s) \
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define SINK_LOG_C(s) \
+ LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+#define SINK_LOG(prio, s) \
+ LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
+
+namespace NYql::NDq {
+
+using namespace NActors;
+using namespace NLog;
+using namespace NKikimr::NMiniKQL;
+
+namespace {
+
+enum ESystemColumn{
+ SC_KIND = 0,
+ SC_LABELS,
+ SC_VALUE,
+ SC_TYPE,
+ SC_TS
+};
+
+struct TDqSolomonReadParams {
+ NSo::NProto::TDqSolomonSource Source;
+};
+
+TString GetReadSolomonUrl(const TString& endpoint, bool useSsl, const TString& project, const ::NYql::NSo::NProto::ESolomonClusterType& type) {
+ TUrlBuilder builder((useSsl ? "https://" : "http://") + endpoint);
+
+ switch (type) {
+ case NSo::NProto::ESolomonClusterType::CT_SOLOMON: {
+ builder.AddPathComponent("api");
+ builder.AddPathComponent("v2");
+ builder.AddPathComponent("projects");
+ builder.AddPathComponent(project);
+ builder.AddPathComponent("sensors");
+ builder.AddPathComponent("data");
+ break;
+ }
+ case NSo::NProto::ESolomonClusterType::CT_MONITORING: {
+ [[fallthrough]];
+ }
+ default:
+ Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(type));
+ }
+
+ return builder.Build();
+}
+
+auto RetryPolicy = NYql::NDq::THttpSenderRetryPolicy::GetExponentialBackoffPolicy(
+ [](const NHttp::TEvHttpProxy::TEvHttpIncomingResponse* resp){
+ if (!resp || !resp->Response) {
+ // Connection wasn't established. Should retry.
+ return ERetryErrorClass::ShortRetry;
+ }
+
+ if (resp->Response->Status == "401") {
+ return ERetryErrorClass::NoRetry;
+ }
+
+ return ERetryErrorClass::ShortRetry;
+ });
+
+} // namespace
+
+
+class TDqSolomonReadActor : public NActors::TActorBootstrapped<TDqSolomonReadActor>, public IDqComputeActorAsyncInput {
+public:
+ static constexpr char ActorName[] = "DQ_SOLOMON_READ_ACTOR";
+
+ TDqSolomonReadActor(
+ ui64 inputIndex,
+ TCollectStatsLevel statsLevel,
+ const TTxId& txId,
+ const NActors::TActorId& computeActorId,
+ const THolderFactory& holderFactory,
+ NKikimr::NMiniKQL::TProgramBuilder& programBuilder,
+ TDqSolomonReadParams&& readParams,
+ const ::NMonitoring::TDynamicCounterPtr& counters,
+ std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
+ )
+ : InputIndex(inputIndex)
+ , TxId(txId)
+ , ComputeActorId(computeActorId)
+ , HolderFactory(holderFactory)
+ , ProgramBuilder(programBuilder)
+ , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", Solomon source. ")
+ , ReadParams(std::move(readParams))
+ , Url(GetUrl())
+ , CredentialsProvider(credentialsProvider)
+ {
+ Y_UNUSED(counters);
+ SINK_LOG_D("Init");
+ IngressStats.Level = statsLevel;
+
+ auto stringType = ProgramBuilder.NewDataType(NYql::NUdf::TDataType<char*>::Id);
+ DictType = ProgramBuilder.NewDictType(stringType, stringType, false);
+
+ FillSystemColumnPositionindex();
+ }
+
+ void FillSystemColumnPositionindex() {
+ std::vector<TString> names(ReadParams.Source.GetSystemColumns().begin(), ReadParams.Source.GetSystemColumns().end());
+ names.insert(names.end(), ReadParams.Source.GetLabelNames().begin(), ReadParams.Source.GetLabelNames().end());
+ std::sort(names.begin(), names.end());
+ size_t index = 0;
+ for (auto& n : names) {
+ Index[n] = index++;
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TDqSolomonReadActor::StateFunc);
+ RequestMetrics();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvHttpBase::TEvSendResult, Handle);
+ )
+
+ i64 GetAsyncInputData(TUnboxedValueBatch& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
+ Y_UNUSED(freeSpace);
+ YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported");
+
+ for (auto j : Batch) {
+ auto& a = j["vector"].GetArray();
+ for (auto& aa : a) {
+ auto& series = aa["timeseries"];
+ auto& kind = series["kind"];
+ auto& type = series["type"];
+ auto& labels = series["labels"];
+ auto dictValueBuilder = HolderFactory.NewDict(DictType, 0);
+ for (auto& [k, v]: labels.GetMap()) {
+ dictValueBuilder->Add(NKikimr::NMiniKQL::MakeString(k), NKikimr::NMiniKQL::MakeString(v.GetString()));
+ }
+ auto dictValue = dictValueBuilder->Build();
+
+ auto& timestamps = series["timestamps"].GetArray();
+ auto& values = series["values"].GetArray();
+
+ for (size_t i = 0; i < timestamps.size(); ++i){
+ NUdf::TUnboxedValue* items = nullptr;
+ auto value = HolderFactory.CreateDirectArrayHolder(ReadParams.Source.GetSystemColumns().size() + ReadParams.Source.GetLabelNames().size(), items);
+ if (auto it = Index.find(SOLOMON_SCHEME_KIND); it != Index.end()) {
+ items[it->second] = NKikimr::NMiniKQL::MakeString(kind.GetString());
+ }
+
+ if (auto it = Index.find(SOLOMON_SCHEME_LABELS); it != Index.end()) {
+ items[it->second] = dictValue;
+ }
+
+ if (auto it = Index.find(SOLOMON_SCHEME_VALUE); it != Index.end()) {
+ items[it->second] = NUdf::TUnboxedValuePod(values[i].GetDouble());
+ }
+
+ if (auto it = Index.find(SOLOMON_SCHEME_TYPE); it != Index.end()) {
+ items[it->second] = NKikimr::NMiniKQL::MakeString(type.GetString());
+ }
+
+ if (auto it = Index.find(SOLOMON_SCHEME_TS); it != Index.end()) {
+ // convert ms to sec
+ items[it->second] = NUdf::TUnboxedValuePod((ui64)timestamps[i].GetUInteger() / 1000);
+ }
+
+ for (const auto& c : ReadParams.Source.GetLabelNames()) {
+ auto& v = items[Index[c]];
+ auto it = labels.GetMap().find(c);
+ if (it != labels.GetMap().end()) {
+ v = NKikimr::NMiniKQL::MakeString(it->second.GetString());
+ } else {
+ // empty string
+ v = NKikimr::NMiniKQL::MakeString("");
+ }
+ }
+
+ buffer.push_back(value);
+ }
+ }
+ }
+
+ finished = !Batch.empty();
+ Batch.clear();
+ return 0;
+ }
+
+ void SaveState(const NDqProto::TCheckpoint&, TSourceState&) final {}
+ void LoadState(const TSourceState&) override { }
+ void CommitState(const NDqProto::TCheckpoint&) override { }
+
+ ui64 GetInputIndex() const override {
+ return InputIndex;
+ }
+
+ const TDqAsyncStats& GetIngressStats() const override {
+ return IngressStats;
+ }
+
+private:
+ // IActor & IDqComputeActorAsyncInput
+ void PassAway() override { // Is called from Compute Actor
+ if (HttpProxyId) {
+ Send(HttpProxyId, new TEvents::TEvPoison());
+ }
+
+ TActor<TDqSolomonReadActor>::PassAway();
+ }
+
+private:
+ TSourceState BuildState() { return {}; }
+
+ void NotifyComputeActorWithData() {
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ }
+
+ TString GetUrl() const {
+ return GetReadSolomonUrl(ReadParams.Source.GetEndpoint(),
+ ReadParams.Source.GetUseSsl(),
+ ReadParams.Source.GetProject(),
+ ReadParams.Source.GetClusterType());
+ }
+
+ NHttp::THttpOutgoingRequestPtr BuildSolomonRequest(TStringBuf data) {
+ NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestPost(Url);
+ FillAuth(httpRequest);
+ httpRequest->Set<&NHttp::THttpRequest::ContentType>("application/json");
+ httpRequest->Set<&NHttp::THttpRequest::Body>(data);
+ return httpRequest;
+ }
+
+ void FillAuth(NHttp::THttpOutgoingRequestPtr& httpRequest) {
+ const TString authorizationHeader = "Authorization";
+ const TString authToken = CredentialsProvider->GetAuthInfo();
+
+ switch (ReadParams.Source.GetClusterType()) {
+ case NSo::NProto::ESolomonClusterType::CT_SOLOMON:
+ httpRequest->Set(authorizationHeader, "OAuth " + authToken);
+ break;
+ case NSo::NProto::ESolomonClusterType::CT_MONITORING:
+ httpRequest->Set(authorizationHeader, "Bearer " + authToken);
+ break;
+ default:
+ Y_ENSURE(false, "Invalid cluster type " << ToString<ui32>(ReadParams.Source.GetClusterType()));
+ }
+ }
+
+    // Sends one read request to the monitoring API.
+    //
+    // The HTTP proxy actor is registered on first use. The request body is a JSON
+    // object with "from", "to", "program" and a nested "downsampling" object
+    // ("disabled", "aggregation", "fill", "gridMillis"), all taken from the source
+    // settings. The request goes through a freshly registered HTTP sender actor
+    // configured with RetryPolicy; the result comes back as TEvHttpBase::TEvSendResult.
+    void RequestMetrics() {
+        if (Y_UNLIKELY(!HttpProxyId)) {
+            HttpProxyId = Register(NHttp::CreateHttpProxy(NMonitoring::TMetricRegistry::SharedInstance()));
+        }
+
+        const auto& source = ReadParams.Source;
+        const auto& downsampling = source.GetDownsampling();
+
+        NJsonWriter::TBuf body;
+        body.BeginObject()
+            .UnsafeWriteKey("from").WriteString(TInstant::Seconds(source.GetFrom()).ToString())
+            .UnsafeWriteKey("to").WriteString(TInstant::Seconds(source.GetTo()).ToString())
+            .UnsafeWriteKey("program").WriteString(source.GetProgram())
+            .UnsafeWriteKey("downsampling")
+                .BeginObject()
+                    .UnsafeWriteKey("disabled").WriteBool(downsampling.GetDisabled())
+                    .UnsafeWriteKey("aggregation").WriteString(downsampling.GetAggregation())
+                    .UnsafeWriteKey("fill").WriteString(downsampling.GetFill())
+                    .UnsafeWriteKey("gridMillis").WriteLongLong(downsampling.GetGridMs())
+                .EndObject()
+            .EndObject();
+
+        const NHttp::THttpOutgoingRequestPtr request = BuildSolomonRequest(body.Str());
+        const TActorId senderId = Register(CreateHttpSenderActor(SelfId(), HttpProxyId, RetryPolicy));
+
+        // Only a single request is issued, so the cookie is always zero.
+        const ui8 cookie = 0;
+        Send(senderId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(request), /*flags=*/0, cookie);
+    }
+
+    // Handles the result of the HTTP request issued by RequestMetrics().
+    //
+    // The result is an error when the transport reported an error string, when
+    // no response object is attached, or when the HTTP status is not "200".
+    // Errors are only logged here; a well-formed 200 response is passed on to
+    // HandleSuccessSolomonResponse().
+    void Handle(TEvHttpBase::TEvSendResult::TPtr& ev) {
+        const auto* res = ev->Get();
+        auto* incoming = res->HttpIncomingResponse->Get();
+        const TString& error = incoming->GetError();
+        const auto& response = incoming->Response;
+
+        // A missing response must be rejected here: the success handler
+        // dereferences Response unconditionally.
+        if (!error.empty() || !response || response->Status != "200") {
+            TStringBuilder errorBuilder;
+            errorBuilder << "Error while sending request to monitoring api: " << error;
+            if (response) {
+                errorBuilder << " " << response->GetObfuscatedData();
+            } else if (error.empty()) {
+                errorBuilder << "no response received";
+            }
+
+            TIssues issues { TIssue(errorBuilder) };
+            SINK_LOG_W("Got " << (res->IsTerminal ? "terminal " : "") << "error response[" << ev->Cookie << "] from solomon: " << issues.ToOneLineString());
+            // NOTE(review): the compute actor is not told about the failure here — confirm whether it should be.
+            return;
+        }
+
+        HandleSuccessSolomonResponse(*incoming, ev->Cookie);
+    }
+
+    // Parses the body of a successful response as JSON, appends the parsed tree
+    // to Batch and notifies the compute actor that new data is available.
+    //
+    // `response.Response` must be non-null. `cookie` is currently unused.
+    void HandleSuccessSolomonResponse(const NHttp::TEvHttpProxy::TEvHttpIncomingResponse& response, ui64 cookie) {
+        Y_UNUSED(cookie);
+
+        NJson::TJsonValue json;
+        if (!NJson::ReadJsonTree(response.Response->Body, &json, false)) {
+            // todo: improve
+            // NOTE(review): this aborts the process on a malformed body coming
+            // from an external service; consider reporting an error instead.
+            Y_ABORT_UNLESS(false, "Failed to parse json response");
+            return;
+        }
+
+        // Move the parsed tree instead of deep-copying it into the batch.
+        Batch.push_back(std::move(json));
+        NotifyComputeActorWithData();
+    }
+
+private:
+ // Index reported back to the compute actor in TEvNewAsyncInputDataArrived.
+ const ui64 InputIndex;
+ // Returned by GetIngressStats().
+ TDqAsyncStats IngressStats;
+ const TTxId TxId;
+ // Recipient of the "new data arrived" notifications.
+ const NActors::TActorId ComputeActorId;
+ const THolderFactory& HolderFactory;
+ NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
+ const TString LogPrefix;
+ // Source settings (endpoint, project, time range, program, downsampling).
+ const TDqSolomonReadParams ReadParams;
+ // Target of the POST request built in BuildSolomonRequest().
+ const TString Url;
+ // Registered on the first RequestMetrics() call; poisoned in PassAway().
+ TActorId HttpProxyId;
+
+ TString SourceId;
+ // Supplies the token placed in the Authorization header by FillAuth().
+ std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
+ TType* DictType = nullptr;
+ std::vector<size_t> SystemColumnPositionIndex;
+ THashMap<TString, size_t> Index;
+ // Parsed JSON responses, one entry per successful reply.
+ std::vector<NJson::TJsonValue> Batch;
+};
+
+// Factory for the Solomon read actor.
+//
+// The token named in `settings` is looked up in `secureParams` (an empty string
+// is used when it is absent) and turned into a credentials provider. The
+// settings are moved into the actor's read parameters. The same object is
+// returned both as the async-input interface and as the actor.
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor(
+    NYql::NSo::NProto::TDqSolomonSource&& settings,
+    ui64 inputIndex,
+    TCollectStatsLevel statsLevel,
+    const TTxId& txId,
+    const NActors::TActorId& computeActorId,
+    const THolderFactory& holderFactory,
+    NKikimr::NMiniKQL::TProgramBuilder& programBuilder,
+    const THashMap<TString, TString>& secureParams,
+    const ::NMonitoring::TDynamicCounterPtr& counters,
+    ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory)
+{
+    // Resolve the token before `settings` is moved away.
+    const TString token = secureParams.Value(settings.GetToken().GetName(), TString());
+
+    TDqSolomonReadParams readParams {
+        .Source = std::move(settings),
+    };
+
+    auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
+    auto credentialsProvider = providerFactory->CreateProvider();
+
+    auto* const readActor = new TDqSolomonReadActor(
+        inputIndex,
+        statsLevel,
+        txId,
+        computeActorId,
+        holderFactory,
+        programBuilder,
+        std::move(readParams),
+        counters,
+        credentialsProvider);
+    return {readActor, readActor};
+}
+
+// Registers the "SolomonSource" source type in the async IO factory.
+// Each created source gets its own fresh dynamic counters group.
+void RegisterDQSolomonReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
+    auto createSource = [credentialsFactory](
+        NYql::NSo::NProto::TDqSolomonSource&& settings,
+        IDqAsyncIoFactory::TSourceArguments&& args)
+    {
+        auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
+
+        return CreateDqSolomonReadActor(
+            std::move(settings),
+            args.InputIndex,
+            args.StatsLevel,
+            args.TxId,
+            args.ComputeActorId,
+            args.HolderFactory,
+            args.ProgramBuilder,
+            args.SecureParams,
+            counters,
+            credentialsFactory);
+    };
+
+    factory.RegisterSource<NSo::NProto::TDqSolomonSource>("SolomonSource", std::move(createSource));
+}
+
+}
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h
new file mode 100644
index 0000000000..d74e2e7679
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <ydb/library/yql/utils/actors/http_sender.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/http/http_proxy.h>
+
+#include <ydb/library/yql/providers/solomon/proto/dq_solomon_shard.pb.h>
+
+namespace NYql::NDq {
+
+// Creates the Solomon read actor. The same object is returned both as the
+// async-input interface and as the actor.
+//
+// The parameter list mirrors the definition in dq_solomon_read_actor.cpp:
+// the source settings, the input index, the stats level, the transaction id,
+// the owning compute actor, the MiniKQL holder factory and program builder,
+// the secure params used to resolve the token, counters and the credentials factory.
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqSolomonReadActor(
+    NYql::NSo::NProto::TDqSolomonSource&& settings,
+    ui64 inputIndex,
+    TCollectStatsLevel statsLevel,
+    const TTxId& txId,
+    const NActors::TActorId& computeActorId,
+    const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+    NKikimr::NMiniKQL::TProgramBuilder& programBuilder,
+    const THashMap<TString, TString>& secureParams,
+    const ::NMonitoring::TDynamicCounterPtr& counters,
+    ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+
+void RegisterDQSolomonReadActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/solomon/async_io/ya.make b/ydb/library/yql/providers/solomon/async_io/ya.make
index 4c803283c6..d5c465a1aa 100644
--- a/ydb/library/yql/providers/solomon/async_io/ya.make
+++ b/ydb/library/yql/providers/solomon/async_io/ya.make
@@ -1,6 +1,7 @@
LIBRARY()
SRCS(
+ dq_solomon_read_actor.cpp
dq_solomon_write_actor.cpp
metrics_encoder.cpp
)
@@ -8,13 +9,13 @@ SRCS(
PEERDIR(
library/cpp/json/easy_parse
library/cpp/monlib/encode/json
- ydb/library/yql/minikql/computation
+ ydb/library/yql/dq/actors/compute
ydb/library/yql/providers/common/token_accessor/client
+ ydb/library/yql/providers/solomon/proto
+ ydb/library/yql/providers/solomon/scheme
ydb/library/yql/public/types
ydb/library/yql/public/udf
ydb/library/yql/utils/log
- ydb/library/yql/dq/actors/compute
- ydb/library/yql/providers/solomon/proto
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json
index d064daa1b4..6af3982a91 100644
--- a/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json
+++ b/ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.json
@@ -44,13 +44,43 @@
]
},
{
- "Name": "TSoReadShardMeta",
+ "Name": "TSoSourceSettings",
"Base": "TCallable",
- "Match": {"Type": "Callable", "Name": "SoReadShardMeta!"},
+ "Match": {"Type": "Callable", "Name": "SoSourceSettings"},
+ "Children": [
+ {"Index": 0, "Name": "Token", "Type": "TCoSecureParam"},
+ {"Index": 1, "Name": "RowType", "Type": "TExprBase"},
+ {"Index": 2, "Name": "SystemColumns", "Type": "TCoAtomList"},
+ {"Index": 3, "Name": "LabelNames", "Type": "TCoAtomList"},
+ {"Index": 4, "Name": "From", "Type": "TCoAtom"},
+ {"Index": 5, "Name": "To", "Type": "TCoAtom"},
+ {"Index": 6, "Name": "Program", "Type": "TCoAtom"},
+ {"Index": 7, "Name": "DownsamplingDisabled", "Type": "TCoBool"},
+ {"Index": 8, "Name": "DownsamplingAggregation", "Type": "TCoAtom"},
+ {"Index": 9, "Name": "DownsamplingFill", "Type": "TCoAtom"},
+ {"Index": 10, "Name": "DownsamplingGridSec", "Type": "TCoUint32"}
+ ]
+ },
+ {
+ "Name": "TSoObject",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "SoObject"},
+ "Children": [
+ {"Index": 0, "Name": "Settings", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TSoReadObject",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "SoReadObject!"},
"Children": [
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSource", "Type": "TSoDataSource"},
- {"Index": 2, "Name": "Shard", "Type": "TCoAtom"}
+ {"Index": 2, "Name": "Object", "Type": "TSoObject"},
+ {"Index": 3, "Name": "SystemColumns", "Type": "TCoAtomList"},
+ {"Index": 4, "Name": "LabelNames", "Type": "TCoAtomList"},
+ {"Index": 5, "Name": "RowType", "Type": "TExprBase"},
+ {"Index": 6, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
]
},
{
@@ -75,16 +105,6 @@
{"Index": 3, "Name": "Service", "Type": "TCoAtom"},
{"Index": 4, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
]
- },
- {
- "Name": "TDqSoShardSink",
- "Base": "TCallable",
- "Match": {"Type": "Callable", "Name": "DqSoShardSink"},
- "Children": [
- {"Index": 0, "Name": "Shard", "Type": "TSoShard"},
- {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"},
- {"Index": 2, "Name": "Token", "Type": "TCoSecureParam", "Optional": true}
- ]
}
]
}
diff --git a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto
index 3bb9d19d51..018c530372 100644
--- a/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto
+++ b/ydb/library/yql/providers/solomon/proto/dq_solomon_shard.proto
@@ -39,3 +39,30 @@ message TDqSolomonShard {
string ServiceAccount = 40;
TToken Token = 41;
}
+
+// Downsampling options sent in the "downsampling" object of a read request.
+message TDownsampling {
+ // Written as "disabled".
+ bool Disabled = 1;
+ // Written as "aggregation".
+ string Aggregation = 2;
+ // Written as "fill".
+ string Fill = 3;
+ // Grid step in milliseconds; written as "gridMillis".
+ int64 GridMs = 4;
+}
+
+// Settings of a Solomon/Monitoring read source passed to the DQ read actor.
+message TDqSolomonSource {
+ // Selects the authorization scheme used by the reader (OAuth or Bearer).
+ ESolomonClusterType ClusterType = 1;
+ string Endpoint = 2;
+ bool UseSsl = 3;
+ string ServiceAccount = 4;
+ // Name of the secure parameter that holds the access token.
+ TToken Token = 5;
+
+ string Project = 6;
+ oneof p {
+ string Selectors = 7;
+ // Sent as the "program" field of the read request.
+ string Program = 8;
+ }
+ // seconds since Epoch
+ int64 From = 9;
+ int64 To = 10;
+ TDownsampling Downsampling = 11;
+ repeated string SystemColumns = 12;
+ repeated string LabelNames = 13;
+}
diff --git a/ydb/library/yql/providers/solomon/provider/ya.make b/ydb/library/yql/providers/solomon/provider/ya.make
index 2ca90b8ac0..77ebe2161b 100644
--- a/ydb/library/yql/providers/solomon/provider/ya.make
+++ b/ydb/library/yql/providers/solomon/provider/ya.make
@@ -11,6 +11,7 @@ SRCS(
yql_solomon_dq_integration.cpp
yql_solomon_io_discovery.cpp
yql_solomon_load_meta.cpp
+ yql_solomon_mkql_compiler.cpp
yql_solomon_physical_optimize.cpp
yql_solomon_provider.cpp
)
@@ -19,6 +20,7 @@ PEERDIR(
ydb/library/actors/protos
ydb/library/yql/dq/expr_nodes
ydb/library/yql/dq/integration
+ ydb/library/yql/dq/opt
ydb/library/yql/providers/common/config
ydb/library/yql/providers/common/proto
ydb/library/yql/providers/common/provider
@@ -27,7 +29,7 @@ PEERDIR(
ydb/library/yql/providers/result/expr_nodes
ydb/library/yql/providers/solomon/expr_nodes
ydb/library/yql/providers/solomon/proto
- ydb/library/yql/dq/opt
+ ydb/library/yql/providers/solomon/scheme
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp
index fe00223f7c..f9e979bef4 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.cpp
@@ -6,6 +6,7 @@ using namespace NCommon;
TSolomonConfiguration::TSolomonConfiguration()
{
+ REGISTER_SETTING(*this, _EnableReading);
}
TSolomonSettings::TConstPtr TSolomonConfiguration::Snapshot() const {
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h
index 6f1d695d0b..e8f14aeca5 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_config.h
@@ -9,6 +9,7 @@ namespace NYql {
struct TSolomonSettings {
using TConstPtr = std::shared_ptr<const TSolomonSettings>;
+ NCommon::TConfSetting<bool, false> _EnableReading;
};
struct TSolomonConfiguration
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp
index 31ceb1701c..98aa9e47cf 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource.cpp
@@ -1,4 +1,5 @@
#include "yql_solomon_provider_impl.h"
+#include "yql_solomon_dq_integration.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
@@ -24,6 +25,7 @@ public:
, LoadMetaDataTransformer_(CreateSolomonLoadTableMetadataTransformer(State_))
, TypeAnnotationTransformer_(CreateSolomonDataSourceTypeAnnotationTransformer(State_))
, ExecutionTransformer_(CreateSolomonDataSourceExecTransformer(State_))
+ , DqIntegration_(CreateSolomonDqIntegration(State_))
{
}
@@ -35,13 +37,13 @@ public:
return *ConfigurationTransformer_;
}
-// IGraphTransformer& GetIODiscoveryTransformer() override {
-// return *IODiscoveryTransformer_;
-// }
+ IGraphTransformer& GetIODiscoveryTransformer() override {
+ return *IODiscoveryTransformer_;
+ }
-// IGraphTransformer& GetLoadTableMetadataTransformer() override {
-// return *LoadMetaDataTransformer_;
-// }
+ IGraphTransformer& GetLoadTableMetadataTransformer() override {
+ return *LoadMetaDataTransformer_;
+ }
IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override {
Y_UNUSED(instantOnly);
@@ -81,9 +83,14 @@ public:
}
bool CanPullResult(const TExprNode& node, TSyncMap& syncList, bool& canRef) override {
- Y_UNUSED(node);
Y_UNUSED(syncList);
canRef = false;
+ if (node.IsCallable(TCoRight::CallableName())) {
+ const auto input = node.Child(0);
+ if (input->IsCallable(TSoReadObject::CallableName())) {
+ return true;
+ }
+ }
return false;
}
@@ -93,6 +100,29 @@ public:
return node;
}
+    // Appends every child of `node` to `children`.
+    // Returns true only when `node` is a TSoReadObject; `compact` is ignored.
+    bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override {
+        Y_UNUSED(compact);
+
+        for (const auto& dependency : node.Children()) {
+            children.push_back(dependency.Get());
+        }
+
+        return TMaybeNode<TSoReadObject>(&node) ? true : false;
+    }
+
+ // Always returns 0 and leaves the pin-info vector untouched; both `node` and
+ // `withLimits` are ignored.
+ ui32 GetInputs(const TExprNode& node, TVector<TPinInfo>&, bool withLimits) override {
+ Y_UNUSED(node);
+ Y_UNUSED(withLimits);
+ return 0;
+ }
+
+ // Returns a non-owning pointer to the DQ integration held in DqIntegration_.
+ IDqIntegration* GetDqIntegration() override {
+ return DqIntegration_.Get();
+ }
+
private:
TSolomonState::TPtr State_;
@@ -101,6 +131,7 @@ private:
THolder<IGraphTransformer> LoadMetaDataTransformer_;
THolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
THolder<TExecTransformerBase> ExecutionTransformer_;
+ const THolder<IDqIntegration> DqIntegration_;
};
TIntrusivePtr<IDataProvider> CreateSolomonDataSource(TSolomonState::TPtr state) {
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp
index f6a0ce8003..c33e1574ed 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_datasource_type_ann.cpp
@@ -1,17 +1,176 @@
#include "yql_solomon_provider_impl.h"
+#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
namespace NYql {
using namespace NNodes;
+namespace {
+
+// Checks that the content of `settingValue` parses as an ISO 8601 datetime.
+// On failure an issue mentioning `settingName` is added to `ctx` and false is returned.
+bool ValidateDatetimeFormat(TStringBuf settingName, const TExprNode& settingValue, TExprContext& ctx) {
+    TInstant parsed;
+    if (TInstant::TryParseIso8601(settingValue.Content(), parsed)) {
+        return true;
+    }
+
+    ctx.AddError(TIssue(ctx.GetPosition(settingValue.Pos()), TStringBuilder() << settingName << " must be correct datetime, e.g. 2010-03-27T21:27:00Z, but has " << settingValue.Content()));
+    return false;
+}
+
+}
+
class TSolomonDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
public:
- TSolomonDataSourceTypeAnnotationTransformer(TSolomonState::TPtr state)
+ explicit TSolomonDataSourceTypeAnnotationTransformer(TSolomonState::TPtr state)
: TVisitorTransformerBase(true)
, State_(state)
{
+ using TSelf = TSolomonDataSourceTypeAnnotationTransformer;
+ AddHandler({TSoReadObject::CallableName()}, Hndl(&TSelf::HandleRead));
+ AddHandler({TSoObject::CallableName()}, Hndl(&TSelf::HandleSoObject));
+ AddHandler({TSoSourceSettings::CallableName()}, Hndl(&TSelf::HandleSoSourceSettings));
+ }
+
+    // Type-annotates a SoSourceSettings node.
+    //
+    // Validates all eleven children in order (token, row type, system columns,
+    // label names, from, to, program, downsampling options) and, when everything
+    // is well-formed, annotates the node as a stream of the declared row type.
+    TStatus HandleSoSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
+        if (!EnsureArgsCount(*input, 11U, ctx)) {
+            return TStatus::Error;
+        }
+
+        const auto* tokenNode = input->Child(TSoSourceSettings::idx_Token);
+        if (!TCoSecureParam::Match(tokenNode)) {
+            ctx.AddError(TIssue(ctx.GetPosition(tokenNode->Pos()), TStringBuilder() << "Expected " << TCoSecureParam::CallableName()));
+            return TStatus::Error;
+        }
+
+        const auto& rowTypeNode = *input->Child(TSoSourceSettings::idx_RowType);
+        if (!EnsureType(rowTypeNode, ctx)) {
+            return TStatus::Error;
+        }
+
+        if (!EnsureTupleOfAtoms(*input->Child(TSoSourceSettings::idx_SystemColumns), ctx)) {
+            return TStatus::Error;
+        }
+
+        if (!EnsureTupleOfAtoms(*input->Child(TSoSourceSettings::idx_LabelNames), ctx)) {
+            return TStatus::Error;
+        }
+
+        // Both ends of the time range must be atoms holding ISO 8601 datetimes.
+        const auto& fromNode = *input->Child(TSoSourceSettings::idx_From);
+        if (!(EnsureAtom(fromNode, ctx) && ValidateDatetimeFormat("from"sv, fromNode, ctx))) {
+            return TStatus::Error;
+        }
+
+        const auto& toNode = *input->Child(TSoSourceSettings::idx_To);
+        if (!(EnsureAtom(toNode, ctx) && ValidateDatetimeFormat("to"sv, toNode, ctx))) {
+            return TStatus::Error;
+        }
+
+        const auto& programNode = *input->Child(TSoSourceSettings::idx_Program);
+        if (!EnsureAtom(programNode, ctx)) {
+            return TStatus::Error;
+        }
+        if (programNode.Content().empty()) {
+            ctx.AddError(TIssue(ctx.GetPosition(programNode.Pos()), "program must be specified"));
+            return TStatus::Error;
+        }
+
+        const auto& disabledNode = *input->Child(TSoSourceSettings::idx_DownsamplingDisabled);
+        if (!disabledNode.IsCallable("Bool")) {
+            ctx.AddError(TIssue(ctx.GetPosition(disabledNode.Pos()), "downsampling.disabled must be bool"));
+            return TStatus::Error;
+        }
+
+        if (!EnsureAtom(*input->Child(TSoSourceSettings::idx_DownsamplingAggregation), ctx)) {
+            return TStatus::Error;
+        }
+
+        if (!EnsureAtom(*input->Child(TSoSourceSettings::idx_DownsamplingFill), ctx)) {
+            return TStatus::Error;
+        }
+
+        const auto& gridNode = *input->Child(TSoSourceSettings::idx_DownsamplingGridSec);
+        if (!gridNode.IsCallable("Uint32")) {
+            ctx.AddError(TIssue(ctx.GetPosition(gridNode.Pos()), "downsampling.grid_interval must be uint32 in seconds"));
+            return TStatus::Error;
+        }
+
+        const auto* itemType = rowTypeNode.GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+        input->SetTypeAnn(ctx.MakeType<TStreamExprType>(itemType));
+        return TStatus::Ok;
+    }
+
+    // Type-annotates a SoObject node: it must have exactly one child and is
+    // given the Unit type. The settings child itself is not validated yet.
+    TStatus HandleSoObject(const TExprNode::TPtr& input, TExprContext& ctx) {
+        const bool hasSingleArg = EnsureArgsCount(*input, 1U, ctx);
+        if (!hasSingleArg) {
+            return TStatus::Error;
+        }
+
+        // todo: check settings
+        input->SetTypeAnn(ctx.MakeType<TUnitExprType>());
+        return TStatus::Ok;
+    }
+
+    // Type-annotates a SoReadObject! node.
+    //
+    // Validates the world, data source, system columns, label names and row type
+    // children, then annotates the node as Tuple<World, List<RowType>>. When the
+    // optional ColumnOrder child is present it must be a tuple of unique atoms;
+    // the order is then registered via SetColumnOrder and its status is returned.
+    TStatus HandleRead(const TExprNode::TPtr& input, TExprContext& ctx) {
+        if (!EnsureMinMaxArgsCount(*input, 6U, 7U, ctx)) {
+            return TStatus::Error;
+        }
+
+        if (!EnsureWorldType(*input->Child(TSoReadObject::idx_World), ctx)) {
+            return TStatus::Error;
+        }
+
+        if (!EnsureSpecificDataSource(*input->Child(TSoReadObject::idx_DataSource), SolomonProviderName, ctx)) {
+            return TStatus::Error;
+        }
+
+        auto& systemColumns = *input->Child(TSoReadObject::idx_SystemColumns);
+        if (!EnsureTupleOfAtoms(systemColumns, ctx)) {
+            return TStatus::Error;
+        }
+
+        auto& labelNames = *input->Child(TSoReadObject::idx_LabelNames);
+        if (!EnsureTupleOfAtoms(labelNames, ctx)) {
+            return TStatus::Error;
+        }
+
+        const auto& rowType = *input->Child(TSoReadObject::idx_RowType);
+        if (!EnsureType(rowType, ctx)) {
+            return TStatus::Error;
+        }
+
+        // Annotate before handling the column order: the column-order branch
+        // returns early and would otherwise leave the node without a type.
+        const auto type = rowType.GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+        input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
+            input->Child(TSoReadObject::idx_World)->GetTypeAnn(),
+            ctx.MakeType<TListExprType>(type)
+        }));
+
+        if (input->ChildrenSize() > TSoReadObject::idx_ColumnOrder) {
+            auto& order = *input->Child(TSoReadObject::idx_ColumnOrder);
+            if (!EnsureTupleOfAtoms(order, ctx)) {
+                return TStatus::Error;
+            }
+            TVector<TString> columnOrder;
+            THashSet<TStringBuf> uniqs;
+            columnOrder.reserve(order.ChildrenSize());
+            uniqs.reserve(order.ChildrenSize());
+
+            for (auto& child : order.ChildrenList()) {
+                TStringBuf col = child->Content();
+                if (!uniqs.emplace(col).second) {
+                    ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Duplicate column '" << col << "' in column order list"));
+                    return TStatus::Error;
+                }
+                columnOrder.push_back(ToString(col));
+            }
+            return State_->Types->SetColumnOrder(*input, columnOrder, ctx);
+        }
+
+        return TStatus::Ok;
+    }
private:
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
index bbf8883ec5..f60f7f4634 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
@@ -1,10 +1,11 @@
#include "yql_solomon_dq_integration.h"
-
+#include "yql_solomon_mkql_compiler.h"
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
@@ -18,6 +19,20 @@ using namespace NNodes;
namespace {
+// Extracts the textual value of a setting into `settingValue`.
+// An atom yields its own content; a String/Utf8 callable yields the content of
+// its first child. Any other node adds an issue to `ctx` and returns false.
+bool ExtractSettingValue(const TExprNode& value, TStringBuf settingName, TExprContext& ctx, TStringBuf& settingValue) {
+    if (value.IsAtom()) {
+        settingValue = value.Content();
+        return true;
+    }
+
+    if (value.IsCallable({ "String", "Utf8" })) {
+        settingValue = value.Head().Content();
+        return true;
+    }
+
+    ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), TStringBuilder() << settingName << " must be literal value"));
+    return false;
+}
+
NSo::NProto::ESolomonClusterType MapClusterType(TSolomonClusterConfig::ESolomonClusterType clusterType) {
switch (clusterType) {
case TSolomonClusterConfig::SCT_SOLOMON:
@@ -59,7 +74,7 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar
schemeItem.SetDataTypeId(dataType.TypeId);
if (dataType.Features & NUdf::DateType || dataType.Features & NUdf::TzDateType) {
- *scheme.MutableTimestamp() = schemeItem;
+ *scheme.MutableTimestamp() = std::move(schemeItem);
} else if (dataType.Features & NUdf::NumericType) {
scheme.MutableSensors()->Add(std::move(schemeItem));
} else if (dataType.Features & NUdf::StringType) {
@@ -72,30 +87,198 @@ void FillScheme(const TTypeAnnotationNode& itemType, NSo::NProto::TDqSolomonShar
class TSolomonDqIntegration: public TDqIntegrationBase {
public:
- TSolomonDqIntegration(const TSolomonState::TPtr& state)
+ explicit TSolomonDqIntegration(const TSolomonState::TPtr& state)
: State_(state.Get())
{
}
- bool CanRead(const TExprNode&, TExprContext&, bool) override {
- YQL_ENSURE(false, "Unimplemented");
+    // Produces the partition list for a read.
+    // Always appends a single placeholder partition name and returns 0;
+    // `maxPartitions` and `node` are ignored.
+    ui64 Partition(const TDqSettings&, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString*, TExprContext&, bool) override {
+        Y_UNUSED(maxPartitions);
+        Y_UNUSED(node);
+        partitions.push_back("zz_partition");
+        return 0;
+    }
+
+ bool CanRead(const TExprNode& read, TExprContext&, bool) override {
+ return TSoReadObject::Match(&read);
}
TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TVector<const TExprNode*>&, TExprContext&) override {
YQL_ENSURE(false, "Unimplemented");
}
-
- TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr&, TExprContext&) override {
- YQL_ENSURE(false, "Unimplemented");
+    // Wraps a SoReadObject! node into a DqSourceWrap over SoSourceSettings.
+    //
+    // Reads "from", "to", "program" and the "downsampling.*" options from the
+    // object's settings (defaults: downsampling enabled, AVG aggregation,
+    // PREVIOUS fill, 15 second grid). Malformed values are reported through
+    // `ctx` and a null pointer is returned. Nodes that are not SoReadObject!
+    // are returned unchanged.
+    TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
+        if (const auto& maybeSoReadObject = TMaybeNode<TSoReadObject>(read)) {
+            const auto& soReadObject = maybeSoReadObject.Cast();
+            YQL_ENSURE(soReadObject.Ref().GetTypeAnn(), "No type annotation for node " << soReadObject.Ref().Content());
+
+            const auto& clusterName = soReadObject.DataSource().Cluster().StringValue();
+
+            const auto token = "cluster:default_" + clusterName;
+            YQL_CLOG(INFO, ProviderSolomon) << "Wrap " << read->Content() << " with token: " << token;
+
+            auto settings = soReadObject.Object().Settings();
+            auto& settingsRef = settings.Ref();
+            TString from;
+            TString to;
+            TString program;
+            bool downsamplingDisabled = false;
+            TString downsamplingAggregation = "AVG";
+            TString downsamplingFill = "PREVIOUS";
+            ui32 downsamplingGridSec = 15;
+
+            for (auto i = 0U; i < settingsRef.ChildrenSize(); ++i) {
+                const auto& setting = *settingsRef.Child(i);
+                const auto& settingName = setting.Head();
+
+                if (settingName.IsAtom("from"sv)) {
+                    TStringBuf value;
+                    if (!ExtractSettingValue(setting.Tail(), "from"sv, ctx, value)) {
+                        return {};
+                    }
+
+                    from = value;
+                    continue;
+                }
+                if (settingName.IsAtom("to"sv)) {
+                    TStringBuf value;
+                    if (!ExtractSettingValue(setting.Tail(), "to"sv, ctx, value)) {
+                        return {};
+                    }
+
+                    to = value;
+                    continue;
+                }
+                if (settingName.IsAtom("program"sv)) {
+                    TStringBuf value;
+                    if (!ExtractSettingValue(setting.Tail(), "program"sv, ctx, value)) {
+                        return {};
+                    }
+
+                    program = value;
+                    continue;
+                }
+                if (settingName.IsAtom("downsampling.disabled"sv)) {
+                    TStringBuf value;
+                    if (!ExtractSettingValue(setting.Tail(), "downsampling.disabled"sv, ctx, value)) {
+                        return {};
+                    }
+                    // Report a bad value as an issue instead of throwing from FromString.
+                    bool boolValue = false;
+                    if (!TryFromString(value, boolValue)) {
+                        ctx.AddError(TIssue(ctx.GetPosition(settingName.Pos()), TStringBuilder() << "downsampling.disabled must be bool, but has " << value));
+                        return {};
+                    }
+                    downsamplingDisabled = boolValue;
+                    continue;
+                }
+                if (settingName.IsAtom("downsampling.gridaggregation"sv)) {
+                    TStringBuf value;
+                    if (!ExtractSettingValue(setting.Tail(), "downsampling.gridaggregation"sv, ctx, value)) {
+                        return {};
+                    }
+                    if (!IsIn({ "AVG"sv, "COUNT"sv, "DEFAULT_AGGREGATION"sv, "LAST"sv, "MAX"sv, "MIN"sv, "SUM"sv }, value)) {
+                        ctx.AddError(TIssue(ctx.GetPosition(settingName.Pos()), TStringBuilder() << "downsampling.grid_aggregation must be one of AVG, COUNT, DEFAULT_AGGREGATION, LAST, MAX, MIN, SUM, but has " << value));
+                        return {};
+                    }
+                    downsamplingAggregation = value;
+                    continue;
+                }
+                if (settingName.IsAtom("downsampling.fill"sv)) {
+                    TStringBuf value;
+                    if (!ExtractSettingValue(setting.Tail(), "downsampling.fill"sv, ctx, value)) {
+                        return {};
+                    }
+                    if (!IsIn({ "NONE"sv, "NULL"sv, "PREVIOUS"sv }, value)) {
+                        ctx.AddError(TIssue(ctx.GetPosition(settingName.Pos()), TStringBuilder() << "downsampling.grid_fill must be one of NONE, NULL, PREVIOUS, but has " << value));
+                        return {};
+                    }
+                    downsamplingFill = value;
+                    continue;
+                }
+                if (settingName.IsAtom("downsampling.gridinterval"sv)) {
+                    TStringBuf value;
+                    if (!ExtractSettingValue(setting.Tail(), "downsampling.gridinterval"sv, ctx, value)) {
+                        return {};
+                    }
+                    ui32 intValue = 0;
+                    if (!TryFromString(value, intValue)) {
+                        ctx.AddError(TIssue(ctx.GetPosition(settingName.Pos()), TStringBuilder() << "downsampling.grid_interval must be positive number, but has " << value));
+                        return {};
+                    }
+                    downsamplingGridSec = intValue;
+                    continue;
+                }
+            }
+
+            return Build<TDqSourceWrap>(ctx, read->Pos())
+                .Input<TSoSourceSettings>()
+                    .Token<TCoSecureParam>()
+                        .Name().Build(token)
+                        .Build()
+                    .RowType(soReadObject.RowType())
+                    .SystemColumns(soReadObject.SystemColumns())
+                    .LabelNames(soReadObject.LabelNames())
+                    .From<TCoAtom>().Build(from)
+                    .To<TCoAtom>().Build(to)
+                    .Program<TCoAtom>().Build(program)
+                    .DownsamplingDisabled<TCoBool>().Literal().Build(downsamplingDisabled ? "true" : "false").Build()
+                    .DownsamplingAggregation<TCoAtom>().Build(downsamplingAggregation)
+                    .DownsamplingFill<TCoAtom>().Build(downsamplingFill)
+                    .DownsamplingGridSec<TCoUint32>().Literal().Build(ToString(downsamplingGridSec)).Build()
+                    .Build()
+                .DataSource(soReadObject.DataSource().Cast<TCoDataSource>())
+                .RowType(soReadObject.RowType())
+                .Settings(settings)
+                .Done().Ptr();
+        }
+        return read;
+    }
- TMaybe<bool> CanWrite(const TExprNode&, TExprContext&) override {
- YQL_ENSURE(false, "Unimplemented");
+ TMaybe<bool> CanWrite(const TExprNode& write, TExprContext&) override {
+ return TSoWrite::Match(&write);
}
- void FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t) override {
- YQL_ENSURE(false, "Unimplemented");
+    // Serializes the SoSourceSettings of a DqSource node into a TDqSolomonSource
+    // proto and sets the source type to "SolomonSource".
+    //
+    // Does nothing when the source settings are not SoSourceSettings. The cluster
+    // must be present in the configuration. A name that appears in both
+    // SystemColumns and LabelNames, or twice in LabelNames, raises yexception.
+    void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
+        const TDqSource dqSource(&node);
+        const auto maybeSettings = dqSource.Settings().Maybe<TSoSourceSettings>();
+        if (!maybeSettings) {
+            return;
+        }
+
+        const auto settings = maybeSettings.Cast();
+        const auto& cluster = dqSource.DataSource().Cast<TSoDataSource>().Cluster().StringValue();
+        const auto* clusterDesc = State_->Configuration->ClusterConfigs.FindPtr(cluster);
+        YQL_ENSURE(clusterDesc, "Unknown cluster " << cluster);
+        NSo::NProto::TDqSolomonSource source;
+        source.SetEndpoint(clusterDesc->GetCluster());
+        // NOTE(review): the project is hard-coded — confirm whether it should come from settings.
+        source.SetProject("yq");
+
+        source.SetClusterType(MapClusterType(clusterDesc->GetClusterType()));
+        source.SetUseSsl(clusterDesc->GetUseSsl());
+        source.SetFrom(TInstant::ParseIso8601(settings.From().StringValue()).Seconds());
+        source.SetTo(TInstant::ParseIso8601(settings.To().StringValue()).Seconds());
+        source.SetProgram(settings.Program().StringValue());
+
+        auto& downsampling = *source.MutableDownsampling();
+        const bool isDisabled = FromString<bool>(settings.DownsamplingDisabled().Literal().Value());
+        downsampling.SetDisabled(isDisabled);
+        downsampling.SetAggregation(settings.DownsamplingAggregation().StringValue());
+        downsampling.SetFill(settings.DownsamplingFill().StringValue());
+        const ui32 gridIntervalSec = FromString<ui32>(settings.DownsamplingGridSec().Literal().Value());
+        // Widen before multiplying: a 32-bit product wraps for large intervals.
+        downsampling.SetGridMs(static_cast<i64>(gridIntervalSec) * 1000);
+
+        source.MutableToken()->SetName(settings.Token().Name().StringValue());
+
+        THashSet<TString> uniqueColumns;
+        for (const auto& c : settings.SystemColumns()) {
+            const auto& columnAsString = c.StringValue();
+            uniqueColumns.insert(columnAsString);
+            source.AddSystemColumns(columnAsString);
+        }
+
+        for (const auto& c : settings.LabelNames()) {
+            const auto& columnAsString = c.StringValue();
+            if (!uniqueColumns.insert(columnAsString).second) {
+                throw yexception() << "Column " << columnAsString << " already registered";
+            }
+            source.AddLabelNames(columnAsString);
+        }
+
+        protoSettings.PackFrom(source);
+        sourceType = "SolomonSource";
+    }
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override {
@@ -137,6 +320,10 @@ public:
sinkType = "SolomonSink";
}
+ // Registers this provider's MKQL callable compilers in `compiler`
+ // by delegating to RegisterDqSolomonMkqlCompilers.
+ void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override {
+ RegisterDqSolomonMkqlCompilers(compiler);
+ }
+
private:
TSolomonState* State_; // State owns dq integration, so back reference must be not smart.
};
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp
index 73a21638be..e2cba012ec 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_io_discovery.cpp
@@ -2,18 +2,85 @@
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/solomon/expr_nodes/yql_solomon_expr_nodes.h>
+#include <ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h>
-/*
-#include <ydb/library/yql/core/yql_expr_optimize.h>
-#include <yql/library/statface_client/client.h>
-
-#include <util/generic/hash_set.h>
-*/
+#include <util/string/split.h>
+#include <util/string/strip.h>
namespace NYql {
+namespace {
using namespace NNodes;
+// Looks up the first "userschema" setting and returns its payload as a pair:
+//   [0] the setting's child at index 1;
+//   [1] the setting's last child when it has more than two children, null otherwise.
+// Both entries are null when no "userschema" setting is present.
+std::array<TExprNode::TPtr, 2U> GetSchema(const TExprNode::TListType& settings) {
+    for (const auto& setting : settings) {
+        if (!setting->Head().IsAtom("userschema")) {
+            continue;
+        }
+
+        TExprNode::TPtr trailing;
+        if (setting->ChildrenSize() > 2) {
+            trailing = setting->TailPtr();
+        }
+        return {setting->ChildPtr(1), std::move(trailing)};
+    }
+
+    return {};
+}
+
+// Extracts user label names from the first "labels" setting.
+//
+// The setting's value is split on ',', every piece is stripped of surrounding whitespace
+// and turned into an atom positioned at `pos`. Pieces that are empty, including those that
+// consist only of whitespace, are dropped so that no atom with an empty name is produced.
+// Returns an empty vector when there is no "labels" setting.
+TVector<TCoAtom> GetUserLabels(TPositionHandle pos, TExprContext& ctx, const TExprNode::TListType& settings) {
+    for (const auto& item : settings) {
+        if (!item->Head().IsAtom("labels")) {
+            continue;
+        }
+
+        const auto rawLabels = StringSplitter(TString(item->Tail().Content())).Split(',').SkipEmpty().ToList<TString>();
+        TVector<TCoAtom> result;
+        result.reserve(rawLabels.size());
+        for (const TString& rawLabel : rawLabels) {
+            const TString label = StripString(rawLabel);
+            if (label.empty()) {
+                // SkipEmpty() is applied before stripping, so a piece like " " survives the split;
+                // skip it here instead of emitting a label with an empty name.
+                continue;
+            }
+            result.emplace_back(Build<TCoAtom>(ctx, pos).Value(label).Done());
+        }
+        return result;
+    }
+
+    return {};
+}
+
+// Builds the row struct type for a Solomon read.
+//
+// System columns are emitted first, in the order kind, labels, value, ts, type; the "labels"
+// column is left out when `userLabels` is not empty. Every emitted system column is also
+// appended to `systemColumns` as an atom positioned at `pos`. After that each user label
+// becomes a String column.
+//
+// Returns nullptr after adding an error to `ctx` when a user label has the name of a system
+// column or is listed more than once. `systemColumns` may already have been extended when
+// that happens.
+const TStructExprType* BuildScheme(TPositionHandle pos, const TVector<TCoAtom>& userLabels, TExprContext& ctx, TVector<TCoAtom>& systemColumns) {
+    auto allSystemColumns = {SOLOMON_SCHEME_KIND, SOLOMON_SCHEME_LABELS, SOLOMON_SCHEME_VALUE, SOLOMON_SCHEME_TS, SOLOMON_SCHEME_TYPE};
+    TVector<const TItemExprType*> columnTypes;
+    // `systemColumns` is an output parameter that is filled below, so the buffers are sized
+    // by the number of columns that can actually be added.
+    columnTypes.reserve(allSystemColumns.size() + userLabels.size());
+    systemColumns.reserve(systemColumns.size() + allSystemColumns.size());
+    const TTypeAnnotationNode* stringType = ctx.MakeType<TDataExprType>(EDataSlot::String);
+    for (auto systemColumn : allSystemColumns) {
+        const TTypeAnnotationNode* type = nullptr;
+        if (systemColumn == SOLOMON_SCHEME_LABELS && !userLabels.empty()) {
+            // Explicit label columns replace the aggregated "labels" column.
+            continue;
+        }
+
+        if (systemColumn == SOLOMON_SCHEME_TS) {
+            type = ctx.MakeType<TDataExprType>(EDataSlot::Datetime);
+        } else if (systemColumn == SOLOMON_SCHEME_VALUE) {
+            type = ctx.MakeType<TDataExprType>(EDataSlot::Double);
+        } else if (systemColumn == SOLOMON_SCHEME_LABELS) {
+            type = ctx.MakeType<NYql::TDictExprType>(stringType, stringType);
+        } else if (IsIn({ SOLOMON_SCHEME_KIND, SOLOMON_SCHEME_TYPE }, systemColumn)) {
+            type = stringType;
+        } else {
+            // Defensive: only reachable if the list above gains a column without a type mapping.
+            ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Unknown system column " << systemColumn));
+            return nullptr;
+        }
+
+        systemColumns.push_back(Build<TCoAtom>(ctx, pos).Value(systemColumn).Done());
+        columnTypes.push_back(ctx.MakeType<TItemExprType>(systemColumn, type));
+    }
+
+    for (size_t i = 0; i < userLabels.size(); ++i) {
+        const auto labelName = userLabels[i].Value();
+        if (IsIn({ SOLOMON_SCHEME_TS, SOLOMON_SCHEME_KIND, SOLOMON_SCHEME_TYPE, SOLOMON_SCHEME_LABELS, SOLOMON_SCHEME_VALUE }, labelName)) {
+            // tmp constraint
+            ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "System column should not be used as label name: " << labelName));
+            return nullptr;
+        }
+
+        // A repeated label would yield a struct with two members of the same name.
+        // Label lists are short, so a linear scan over the preceding labels is sufficient.
+        for (size_t j = 0; j < i; ++j) {
+            if (userLabels[j].Value() == labelName) {
+                ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Label name is used more than once: " << labelName));
+                return nullptr;
+            }
+        }
+
+        columnTypes.push_back(ctx.MakeType<TItemExprType>(labelName, stringType));
+    }
+
+    return ctx.MakeType<TStructExprType>(columnTypes);
+}
+
+}
+
class TSolomonIODiscoveryTransformer : public TSyncTransformerBase {
public:
TSolomonIODiscoveryTransformer(TSolomonState::TPtr state)
@@ -28,9 +95,7 @@ public:
return TStatus::Ok;
}
- TVector<TIssue> issues;
-
- auto status = OptimizeExpr(input, output, [] (const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ auto status = OptimizeExpr(input, output, [this] (const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
if (auto maybeWrite = TMaybeNode<TSoWrite>(node)) {
if (!maybeWrite.DataSink()) {
return node;
@@ -51,25 +116,76 @@ public:
.Add(write.Arg(3))
.Build()
.Done().Ptr();
+ }
- } else if (TMaybeNode<TSoRead>(node).DataSource()) {
- return node;
+ if (auto maybeRead = TMaybeNode<TSoRead>(node)) {
+ auto read = maybeRead.Cast();
+ if (read.DataSource().Category().Value() != SolomonProviderName) {
+ return node;
+ }
+
+ const auto& object = read.Arg(2).Ref();
+ YQL_ENSURE(object.IsCallable("MrTableConcat"));
+
+ auto cluster = read.DataSource().Cluster().StringValue();
+ if (!this->State_->Configuration->_EnableReading.Get().GetOrElse(false)) {
+ ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Reading is disabled for monitoring cluster " << cluster));
+ return {};
+ }
+
+ auto settings = read.Ref().Child(4);
+ auto settingsList = read.Ref().Child(4)->ChildrenList();
+ auto userSchema = GetSchema(settingsList);
+ TVector<TCoAtom> userLabels = GetUserLabels(settings->Pos(), ctx, settingsList);
+
+ auto soObject = Build<TSoObject>(ctx, read.Pos())
+ .Settings(settings)
+ .Done();
+
+ TVector<TCoAtom> systemColumns;
+ auto* scheme = BuildScheme(settings->Pos(), userLabels, ctx, systemColumns);
+ if (!scheme) {
+ return {};
+ }
+
+ auto rowTypeNode = ExpandType(read.Pos(), *scheme, ctx);
+ auto systemColumnsNode = Build<TCoAtomList>(ctx, read.Pos())
+ .Add(systemColumns)
+ .Done();
+
+ auto labelNamesNode = Build<TCoAtomList>(ctx, read.Pos())
+ .Add(userLabels)
+ .Done();
+
+ return userSchema.back()
+ ? Build<TSoReadObject>(ctx, read.Pos())
+ .World(read.World())
+ .DataSource(read.DataSource())
+ .Object(soObject)
+ .SystemColumns(systemColumnsNode)
+ .LabelNames(labelNamesNode)
+ .RowType(rowTypeNode)
+ .ColumnOrder(std::move(userSchema.back()))
+ .Done().Ptr()
+ : Build<TSoReadObject>(ctx, read.Pos())
+ .World(read.World())
+ .DataSource(read.DataSource())
+ .Object(soObject)
+ .SystemColumns(systemColumnsNode)
+ .LabelNames(labelNamesNode)
+ .RowType(rowTypeNode)
+ .Done().Ptr();
}
+
return node;
}, ctx, TOptimizeExprSettings {nullptr});
- if (issues) {
- for (const auto& issue: issues) {
- ctx.AddError(issue);
- }
- status = status.Combine(TStatus::Error);
- }
-
return status;
}
void Rewind() final {
}
+
private:
TSolomonState::TPtr State_;
};
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp
index c4ddad581d..712877a6f2 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_load_meta.cpp
@@ -19,7 +19,7 @@ public:
return TStatus::Ok;
}
- return TStatus::Async;
+ return TStatus::Ok;
}
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.cpp
new file mode 100644
index 0000000000..26203f5f6c
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.cpp
@@ -0,0 +1,50 @@
+#include "yql_solomon_mkql_compiler.h"
+
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/common/mkql/parser.h>
+
+namespace NYql {
+
+using namespace NKikimr::NMiniKQL;
+using namespace NNodes;
+
+namespace {
+// Compiles the wrapper's input, turns it into a flow and expands every item of that flow
+// into its tuple elements via ExpandMap. The element count is taken from the item's tuple
+// type after UnpackOptional; the elements themselves are read with Nth from the item.
+template <typename T>
+TRuntimeNode Wrap(T wrapper, NCommon::TMkqlBuildContext& ctx) {
+    auto& builder = ctx.ProgramBuilder;
+    const auto source = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
+    const auto flow = builder.ToFlow(source);
+
+    return builder.ExpandMap(flow, [&builder](TRuntimeNode row) {
+        bool optionalRow = false;
+        auto rowType = AS_TYPE(TTupleType, UnpackOptional(row, optionalRow));
+        const ui32 width = rowType->GetElementsCount();
+
+        TRuntimeNode::TList columns;
+        columns.reserve(width);
+        for (ui32 index = 0; index != width; ++index) {
+            columns.push_back(builder.Nth(row, index));
+        }
+
+        return columns;
+    });
+}
+}
+
+// Chains a compiler for the TDqSourceWideWrap callable onto `compiler`.
+// The handler compiles the node through Wrap() only when its data source category is the
+// Solomon provider; otherwise it returns a default-constructed TRuntimeNode.
+void RegisterDqSolomonMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
+    compiler.ChainCallable(TDqSourceWideWrap::CallableName(),
+        [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
+            const TDqSourceWideWrap wrapper(&node);
+            if (wrapper.DataSource().Category().Value() != SolomonProviderName) {
+                return TRuntimeNode();
+            }
+            return Wrap(wrapper, ctx);
+        });
+
+    // The block variant (TDqSourceWideBlockWrap) is currently not registered:
+    // compiler.ChainCallable(TDqSourceWideBlockWrap::CallableName(),
+    //     [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
+    //         if (const auto wrapper = TDqSourceWideBlockWrap(&node); wrapper.DataSource().Category().Value() == SolomonProviderName) {
+    //             return Wrap(wrapper, ctx);
+    //         }
+
+    //         return TRuntimeNode();
+    //     });
+}
+
+}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.h b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.h
new file mode 100644
index 0000000000..0c847cd8f8
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_mkql_compiler.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/library/yql/providers/common/mkql/yql_provider_mkql.h>
+
+namespace NYql {
+
+// Registers the Solomon provider's MKQL callable compilers in `compiler`.
+void RegisterDqSolomonMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler);
+
+}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp
index ece1e1c666..9574179e81 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_physical_optimize.cpp
@@ -63,11 +63,23 @@ public:
{
#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TSoPhysicalOptProposalTransformer::name)
AddHandler(0, &TSoWriteToShard::Match, HNDL(SoWriteToShard));
+ AddHandler(0, &TCoLeft::Match, HNDL(TrimReadWorld));
#undef HNDL
SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes.
}
+    // Handler for TCoLeft nodes: when the input of Left is a TSoReadObject, the node is
+    // replaced with that read's World; any other node is returned unchanged.
+    TMaybeNode<TExprBase> TrimReadWorld(TExprBase node, TExprContext& ctx) const {
+        Y_UNUSED(ctx);
+
+        if (const auto read = node.Cast<TCoLeft>().Input().Maybe<TSoReadObject>(); read) {
+            return TExprBase(read.Cast().World().Ptr());
+        }
+
+        return node;
+    }
+
TMaybeNode<TExprBase> SoWriteToShard(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const {
if (State_->IsRtmrMode()) {
return node;
diff --git a/ydb/library/yql/providers/solomon/scheme/ya.make b/ydb/library/yql/providers/solomon/scheme/ya.make
new file mode 100644
index 0000000000..9b39d73bb9
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/scheme/ya.make
@@ -0,0 +1,7 @@
+LIBRARY()
+
+SRCS(
+ yql_solomon_scheme.h
+)
+
+END()
diff --git a/ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h b/ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h
new file mode 100644
index 0000000000..e043b2c752
--- /dev/null
+++ b/ydb/library/yql/providers/solomon/scheme/yql_solomon_scheme.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <util/generic/strbuf.h>
+
+namespace NYql {
+    // Column names of the Solomon scheme.
+    // Declared `inline` so that every translation unit including this header shares one
+    // definition instead of getting its own internal-linkage copy.
+    inline constexpr TStringBuf SOLOMON_SCHEME_KIND = "kind"sv;
+    inline constexpr TStringBuf SOLOMON_SCHEME_LABELS = "labels"sv;
+    inline constexpr TStringBuf SOLOMON_SCHEME_VALUE = "value"sv;
+    inline constexpr TStringBuf SOLOMON_SCHEME_TYPE = "type"sv;
+    inline constexpr TStringBuf SOLOMON_SCHEME_TS = "ts"sv;
+}
diff --git a/ydb/library/yql/providers/solomon/ya.make b/ydb/library/yql/providers/solomon/ya.make
index 9e1893e851..df43a1f649 100644
--- a/ydb/library/yql/providers/solomon/ya.make
+++ b/ydb/library/yql/providers/solomon/ya.make
@@ -3,4 +3,5 @@ RECURSE(
expr_nodes
gateway
provider
+ scheme
)
diff --git a/ydb/library/yql/sql/v1/sql_translation.cpp b/ydb/library/yql/sql/v1/sql_translation.cpp
index 67119a1994..ce2670030f 100644
--- a/ydb/library/yql/sql/v1/sql_translation.cpp
+++ b/ydb/library/yql/sql/v1/sql_translation.cpp
@@ -1054,11 +1054,6 @@ bool TSqlTranslation::TableRefImpl(const TRule_table_ref& node, TTableRef& resul
}
}
- if (service == SolomonProviderName) {
- Ctx.Error() << "Selecting data from monitoring source is not supported";
- return false;
- }
-
TTableRef tr(Context().MakeName("table"), service, cluster, nullptr);
TPosition pos(Context().Pos());
TTableHints hints = GetContextHints(Ctx);
diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp
index 9ac2e5044d..d9d5f081a6 100644
--- a/ydb/library/yql/tools/dqrun/dqrun.cpp
+++ b/ydb/library/yql/tools/dqrun/dqrun.cpp
@@ -40,6 +40,7 @@
#include <ydb/library/yql/providers/function/provider/dq_function_provider.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
+#include <ydb/library/yql/providers/solomon/async_io/dq_solomon_read_actor.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
#include <ydb/library/yql/providers/pg/provider/yql_pg_provider.h>
@@ -276,6 +277,7 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(
}
RegisterDqPqReadActorFactory(*factory, driver, nullptr);
RegisterYdbReadActorFactory(*factory, driver, nullptr);
+ RegisterDQSolomonReadActorFactory(*factory, nullptr);
RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway);
RegisterGenericProviderFactories(*factory, credentialsFactory, genericClient);
RegisterDqPqWriteActorFactory(*factory, driver, nullptr);
diff --git a/ydb/library/yql/tools/dqrun/ya.make b/ydb/library/yql/tools/dqrun/ya.make
index 38d7fb919f..8a19b6aee6 100644
--- a/ydb/library/yql/tools/dqrun/ya.make
+++ b/ydb/library/yql/tools/dqrun/ya.make
@@ -57,6 +57,7 @@ ENDIF()
ydb/library/yql/providers/pq/provider
ydb/library/yql/providers/s3/actors
ydb/library/yql/providers/s3/provider
+ ydb/library/yql/providers/solomon/async_io
ydb/library/yql/providers/solomon/gateway
ydb/library/yql/providers/solomon/provider
ydb/library/yql/providers/ydb/actors