diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-10-12 10:52:47 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-10-12 11:10:57 +0300 |
commit | 406ca2a64106a130a73b4088828c26fcebcd6d99 (patch) | |
tree | 0a1734413b9905f5f61a73e47f972fcbc7668ee8 | |
parent | 72ea0d54af4315a211feff21d26018b9562650ef (diff) | |
download | ydb-406ca2a64106a130a73b4088828c26fcebcd6d99.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yql/plugin/native/plugin.cpp | 82 |
1 files changed, 69 insertions, 13 deletions
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp index 2051f335e7..f393dc79e6 100644 --- a/yt/yql/plugin/native/plugin.cpp +++ b/yt/yql/plugin/native/plugin.cpp @@ -8,8 +8,6 @@ #include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h> #include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> -#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h> - #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h> #include "ydb/library/yql/providers/common/proto/gateways_config.pb.h" #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> @@ -21,6 +19,8 @@ #include <ydb/library/yql/core/file_storage/file_storage.h> #include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h" #include <ydb/library/yql/core/services/mounts/yql_mounts.h> +#include <ydb/library/yql/core/services/yql_transform_pipeline.h> +#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/utils/backtrace/backtrace.h> @@ -49,6 +49,22 @@ using namespace NYson; //////////////////////////////////////////////////////////////////////////////// +std::optional<TString> MaybeToOptional(const TMaybe<TString>& maybeStr) +{ + if (!maybeStr) { + return std::nullopt; + } + return *maybeStr; +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TQueryPlan +{ + std::optional<TString> Plan; + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock); +}; + struct TActiveQuery { TProgressMerger ProgressMerger; @@ -57,6 +73,42 @@ struct TActiveQuery //////////////////////////////////////////////////////////////////////////////// +class TQueryPipelineConfigurator + : public NYql::IPipelineConfigurator +{ +public: + TQueryPipelineConfigurator(NYql::TProgramPtr program, TQueryPlan& plan) + : Program_(program) + , Plan_(plan) + { } + + void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const override + { } + + void AfterTypeAnnotation(NYql::TTransformationPipeline* /*pipeline*/) const override + { } + + void AfterOptimize(NYql::TTransformationPipeline* pipeline) const override + { + auto transformer = [this](NYql::TExprNode::TPtr input, NYql::TExprNode::TPtr& output, NYql::TExprContext& /*ctx*/) { + output = input; + + auto guard = WriterGuard(Plan_.PlanSpinLock); + Plan_.Plan = MaybeToOptional(Program_->GetQueryPlan()); + + return NYql::IGraphTransformer::TStatus::Ok; + }; + + pipeline->Add(NYql::CreateFunctorTransformer(transformer), "PlanOutput"); + } + +private: + NYql::TProgramPtr Program_; + TQueryPlan& Plan_; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TYqlPlugin : public IYqlPlugin { @@ -203,17 +255,21 @@ public: auto userDataTable = FilesToUserTable(files); program->AddUserDataTable(userDataTable); - auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> { - if (!maybeStr) { - return std::nullopt; - } - return *maybeStr; - }; + TQueryPlan queryPlan; + auto pipelineConfigurator = TQueryPipelineConfigurator(program, queryPlan); program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) { + std::optional<TString> plan; + { + auto guard = ReaderGuard(queryPlan.PlanSpinLock); + plan.swap(queryPlan.Plan); + } + auto guard = WriterGuard(ProgressSpinLock); ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress); - ActiveQueriesProgress_[queryId].Plan = maybeToOptional(program->GetQueryPlan()); + if (plan) { + ActiveQueriesProgress_[queryId].Plan.swap(plan); + } }); NSQLTranslation::TTranslationSettings sqlSettings; @@ -238,7 +294,7 @@ public: } NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error; - status = program->Run(GetUsername(), nullptr, nullptr, nullptr); + status = program->RunWithConfig(GetUsername(), pipelineConfigurator); if (status == NYql::TProgram::TStatus::Error) { return TQueryResult{ @@ -266,10 +322,10 @@ public: return { .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()), - .Plan = maybeToOptional(program->GetQueryPlan()), - .Statistics = maybeToOptional(program->GetStatistics()), + .Plan = MaybeToOptional(program->GetQueryPlan()), + .Statistics = MaybeToOptional(program->GetStatistics()), .Progress = progress, - .TaskInfo = maybeToOptional(program->GetTasksInfo()), + .TaskInfo = MaybeToOptional(program->GetTasksInfo()), }; } |