aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-10-12 10:52:47 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-10-12 11:10:57 +0300
commit406ca2a64106a130a73b4088828c26fcebcd6d99 (patch)
tree0a1734413b9905f5f61a73e47f972fcbc7668ee8
parent72ea0d54af4315a211feff21d26018b9562650ef (diff)
downloadydb-406ca2a64106a130a73b4088828c26fcebcd6d99.tar.gz
Intermediate changes
-rw-r--r--yt/yql/plugin/native/plugin.cpp82
1 files changed, 69 insertions, 13 deletions
diff --git a/yt/yql/plugin/native/plugin.cpp b/yt/yql/plugin/native/plugin.cpp
index 2051f335e7..f393dc79e6 100644
--- a/yt/yql/plugin/native/plugin.cpp
+++ b/yt/yql/plugin/native/plugin.cpp
@@ -8,8 +8,6 @@
#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
-#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
-
#include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
@@ -21,6 +19,8 @@
#include <ydb/library/yql/core/file_storage/file_storage.h>
#include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h"
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
+#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
+#include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/backtrace/backtrace.h>
@@ -49,6 +49,22 @@ using namespace NYson;
////////////////////////////////////////////////////////////////////////////////
+std::optional<TString> MaybeToOptional(const TMaybe<TString>& maybeStr)
+{
+ if (!maybeStr) {
+ return std::nullopt;
+ }
+ return *maybeStr;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TQueryPlan
+{
+ std::optional<TString> Plan;
+ YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock);
+};
+
struct TActiveQuery
{
TProgressMerger ProgressMerger;
@@ -57,6 +73,42 @@ struct TActiveQuery
////////////////////////////////////////////////////////////////////////////////
+class TQueryPipelineConfigurator
+ : public NYql::IPipelineConfigurator
+{
+public:
+ TQueryPipelineConfigurator(NYql::TProgramPtr program, TQueryPlan& plan)
+ : Program_(program)
+ , Plan_(plan)
+ { }
+
+ void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const override
+ { }
+
+ void AfterTypeAnnotation(NYql::TTransformationPipeline* /*pipeline*/) const override
+ { }
+
+ void AfterOptimize(NYql::TTransformationPipeline* pipeline) const override
+ {
+ auto transformer = [this](NYql::TExprNode::TPtr input, NYql::TExprNode::TPtr& output, NYql::TExprContext& /*ctx*/) {
+ output = input;
+
+ auto guard = WriterGuard(Plan_.PlanSpinLock);
+ Plan_.Plan = MaybeToOptional(Program_->GetQueryPlan());
+
+ return NYql::IGraphTransformer::TStatus::Ok;
+ };
+
+ pipeline->Add(NYql::CreateFunctorTransformer(transformer), "PlanOutput");
+ }
+
+private:
+ NYql::TProgramPtr Program_;
+ TQueryPlan& Plan_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TYqlPlugin
: public IYqlPlugin
{
@@ -203,17 +255,21 @@ public:
auto userDataTable = FilesToUserTable(files);
program->AddUserDataTable(userDataTable);
- auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
- if (!maybeStr) {
- return std::nullopt;
- }
- return *maybeStr;
- };
+ TQueryPlan queryPlan;
+ auto pipelineConfigurator = TQueryPipelineConfigurator(program, queryPlan);
program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) {
+ std::optional<TString> plan;
+ {
+ auto guard = ReaderGuard(queryPlan.PlanSpinLock);
+ plan.swap(queryPlan.Plan);
+ }
+
auto guard = WriterGuard(ProgressSpinLock);
ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress);
- ActiveQueriesProgress_[queryId].Plan = maybeToOptional(program->GetQueryPlan());
+ if (plan) {
+ ActiveQueriesProgress_[queryId].Plan.swap(plan);
+ }
});
NSQLTranslation::TTranslationSettings sqlSettings;
@@ -238,7 +294,7 @@ public:
}
NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
- status = program->Run(GetUsername(), nullptr, nullptr, nullptr);
+ status = program->RunWithConfig(GetUsername(), pipelineConfigurator);
if (status == NYql::TProgram::TStatus::Error) {
return TQueryResult{
@@ -266,10 +322,10 @@ public:
return {
.YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
- .Plan = maybeToOptional(program->GetQueryPlan()),
- .Statistics = maybeToOptional(program->GetStatistics()),
+ .Plan = MaybeToOptional(program->GetQueryPlan()),
+ .Statistics = MaybeToOptional(program->GetStatistics()),
.Progress = progress,
- .TaskInfo = maybeToOptional(program->GetTasksInfo()),
+ .TaskInfo = MaybeToOptional(program->GetTasksInfo()),
};
}