aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorromakondakov <romakondakov@yandex-team.ru>2022-02-10 16:52:16 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:16 +0300
commitbc9502dd9565b1d3084b1871eba035befe00d7e5 (patch)
treeab7fbbf3253d4c0e2793218f09378908beb025fb
parentaa1447c8c9b7deea12b1b441d5db739fb7fc91fa (diff)
downloadydb-bc9502dd9565b1d3084b1871eba035befe00d7e5.tar.gz
Restoring authorship annotation for <romakondakov@yandex-team.ru>. Commit 2 of 2.
-rw-r--r--ydb/library/yql/core/facade/yql_facade.cpp4
-rw-r--r--ydb/library/yql/core/services/yql_plan.cpp16
-rw-r--r--ydb/library/yql/core/services/yql_transform_pipeline.cpp2
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp12
-rw-r--r--ydb/library/yql/core/yql_join.cpp12
-rw-r--r--ydb/library/yql/core/yql_opt_utils.cpp12
-rw-r--r--ydb/library/yql/core/yql_opt_utils.h2
-rw-r--r--ydb/library/yql/dq/common/dq_common.h8
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp34
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h4
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp22
-rw-r--r--ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp54
-rw-r--r--ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp8
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp34
-rw-r--r--ydb/library/yql/providers/config/yql_config_provider.cpp4
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp12
-rw-r--r--ydb/library/yql/providers/dq/actors/resource_allocator.cpp34
-rw-r--r--ydb/library/yql/providers/dq/api/grpc/api.proto2
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto20
-rw-r--r--ydb/library/yql/providers/dq/api/protos/service.proto20
-rw-r--r--ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json20
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp6
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp74
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp112
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp60
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp8
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp4
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp146
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_gateway.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp6
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_provider.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp48
-rw-r--r--ydb/library/yql/providers/dq/service/grpc_service.cpp42
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/events.cpp6
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/events.h34
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp356
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h192
39 files changed, 719 insertions, 719 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index 2943c757909..b74abc1c34c 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -1318,8 +1318,8 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us
resultProviderDataSources.push_back(TString(PqProviderName));
}
- if (providerNames.contains(DqProviderName)) {
- resultProviderDataSources.push_back(TString(DqProviderName));
+ if (providerNames.contains(DqProviderName)) {
+ resultProviderDataSources.push_back(TString(DqProviderName));
}
if (!resultProviderDataSources.empty())
diff --git a/ydb/library/yql/core/services/yql_plan.cpp b/ydb/library/yql/core/services/yql_plan.cpp
index 20d449fdfe3..4d7cfffaf5a 100644
--- a/ydb/library/yql/core/services/yql_plan.cpp
+++ b/ydb/library/yql/core/services/yql_plan.cpp
@@ -363,15 +363,15 @@ public:
YQL_ENSURE(datasink);
info.Provider = (*datasink).Get();
info.IsVisible = (*datasink)->GetPlanFormatter().GetDependencies(*node, dependencies, true);
- }
- else if (node->IsCallable("DqStage") ||
- node->IsCallable("DqPhyStage") ||
+ }
+ else if (node->IsCallable("DqStage") ||
+ node->IsCallable("DqPhyStage") ||
node->IsCallable("DqQuery!") ||
- node->ChildrenSize() >= 1 && node->Child(0)->IsCallable("TDqOutput")) {
- auto provider = Types_.DataSinkMap.FindPtr(DqProviderName);
- YQL_ENSURE(provider);
- info.Provider = (*provider).Get();
- info.IsVisible = (*provider)->GetPlanFormatter().GetDependencies(*node, dependencies, true);
+ node->ChildrenSize() >= 1 && node->Child(0)->IsCallable("TDqOutput")) {
+ auto provider = Types_.DataSinkMap.FindPtr(DqProviderName);
+ YQL_ENSURE(provider);
+ info.Provider = (*provider).Get();
+ info.IsVisible = (*provider)->GetPlanFormatter().GetDependencies(*node, dependencies, true);
} else {
info.IsVisible = false;
for (auto& child : node->Children()) {
diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp
index 3dd1b9707b1..ae425f34c8e 100644
--- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp
+++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp
@@ -157,7 +157,7 @@ TTransformationPipeline& TTransformationPipeline::AddFinalCommonOptimization(EYq
TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) {
AddCommonOptimization(issueCode);
Transformers_.push_back(TTransformStage(
- CreateRecaptureDataProposalsInspector(*TypeAnnotationContext_, TString{DqProviderName}),
+ CreateRecaptureDataProposalsInspector(*TypeAnnotationContext_, TString{DqProviderName}),
"RecaptureDataProposals",
issueCode));
Transformers_.push_back(TTransformStage(
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 615947c1727..5846e6cb108 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -3113,11 +3113,11 @@ namespace NTypeAnnImpl {
return IGraphTransformer::TStatus::Ok;
}
- IGraphTransformer::TStatus HeadWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- output = ctx.Expr.RenameNode(*input, "ToOptional");
- return IGraphTransformer::TStatus::Repeat;
- }
-
+ IGraphTransformer::TStatus HeadWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ output = ctx.Expr.RenameNode(*input, "ToOptional");
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
IGraphTransformer::TStatus ToFlowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
@@ -12802,7 +12802,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["ToList"] = &ToListWrapper;
Functions["ToOptional"] = &ToOptionalWrapper;
Functions["Iterable"] = &IterableWrapper;
- Functions["Head"] = &HeadWrapper;
+ Functions["Head"] = &HeadWrapper;
Functions["Last"] = &ToOptionalWrapper;
Functions["AsTagged"] = &AsTaggedWrapper;
Functions["Untag"] = &UntagWrapper;
diff --git a/ydb/library/yql/core/yql_join.cpp b/ydb/library/yql/core/yql_join.cpp
index 8de9ef72c72..9a9ded7a327 100644
--- a/ydb/library/yql/core/yql_join.cpp
+++ b/ydb/library/yql/core/yql_join.cpp
@@ -1113,14 +1113,14 @@ TMap<TStringBuf, TVector<TStringBuf>> UpdateUsedFieldsInRenameMap(
bool needRemove = !usedFields.contains(item->GetName());
if (auto renamed = reversedRenameMap.FindPtr(item->GetName())) {
if (needRemove) {
- if (newRenameMap[*renamed].empty()) {
- newRenameMap[*renamed].push_back("");
- }
+ if (newRenameMap[*renamed].empty()) {
+ newRenameMap[*renamed].push_back("");
+ }
}
else {
- if (!newRenameMap[*renamed].empty() && newRenameMap[*renamed][0].empty()) {
- newRenameMap[*renamed].clear(); // Do not remove column because it will be renamed.
- }
+ if (!newRenameMap[*renamed].empty() && newRenameMap[*renamed][0].empty()) {
+ newRenameMap[*renamed].clear(); // Do not remove column because it will be renamed.
+ }
newRenameMap[*renamed].push_back(item->GetName());
}
}
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index 7a2c730cd89..69aa84a500f 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1003,11 +1003,11 @@ void ExtractSimpleSortTraits(const TExprNode& sortDirections, const TExprNode& k
}
const TExprNode& SkipCallables(const TExprNode& node, const std::initializer_list<std::string_view>& skipCallables) {
- const TExprNode* p = &node;
- while (p->IsCallable(skipCallables)) {
- p = &p->Head();
- }
- return *p;
+ const TExprNode* p = &node;
+ while (p->IsCallable(skipCallables)) {
+ p = &p->Head();
+ }
+ return *p;
}
namespace {
@@ -1029,7 +1029,7 @@ TExprNode::TPtr ApplyWithCastStructForFirstArg(const TExprNode::TPtr& node, cons
auto result = ctx.NewLambda(pos, ctx.NewArguments(pos, std::move(args)), std::move(body));
return ctx.DeepCopyLambda(*result);
-}
+}
}
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index 3de6350b7f2..00635fea81e 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -82,7 +82,7 @@ TExprNode::TPtr MakeBool(TPositionHandle position, TExprContext& ctx);
TExprNode::TPtr MakeIdentityLambda(TPositionHandle position, TExprContext& ctx);
constexpr std::initializer_list<std::string_view> SkippableCallables = {"Unordered", "AssumeSorted", "AssumeUnique", "AssumeColumnOrder", "AssumeAllMembersNullableAtOnce"};
-
+
const TExprNode& SkipCallables(const TExprNode& node, const std::initializer_list<std::string_view>& skipCallables);
void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TExprNode::TPtr& sortKey, TExprNode::TPtr& sortOrder, TExprContext& ctx);
diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h
index d91aa3e83f3..71cc7be6cb8 100644
--- a/ydb/library/yql/dq/common/dq_common.h
+++ b/ydb/library/yql/dq/common/dq_common.h
@@ -32,10 +32,10 @@ struct TBaseDqResManEvents {
ES_GET_MASTER_RESPONSE,
ES_CONFIGURE_FAILURE_INJECTOR,
- ES_CONFIGURE_FAILURE_INJECTOR_RESPONSE,
-
- ES_QUERY_STATUS,
- ES_QUERY_STATUS_RESPONSE,
+ ES_CONFIGURE_FAILURE_INJECTOR_RESPONSE,
+
+ ES_QUERY_STATUS,
+ ES_QUERY_STATUS_RESPONSE,
ES_ROUTES,
ES_ROUTES_RESPONSE,
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 2aa8ac92a5c..cbe871059d7 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -382,15 +382,15 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
.Done();
}
-template <typename BaseLMap>
-TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+template <typename BaseLMap>
+TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true)
{
- if (!node.Maybe<BaseLMap>().Input().template Maybe<TDqCnUnionAll>()) {
+ if (!node.Maybe<BaseLMap>().Input().template Maybe<TDqCnUnionAll>()) {
return node;
}
- auto lmap = node.Cast<BaseLMap>();
+ auto lmap = node.Cast<BaseLMap>();
auto dqUnion = lmap.Input().template Cast<TDqCnUnionAll>();
if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
return node;
@@ -402,11 +402,11 @@ TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimization
auto lambda = Build<TCoLambda>(ctx, lmap.Lambda().Pos())
.Args({"stream"})
- .template Body<TCoToStream>()
- .template Input<TExprApplier>()
- .Apply(lmap.Lambda())
- .With(lmap.Lambda().Args().Arg(0), "stream")
- .Build()
+ .template Body<TCoToStream>()
+ .template Input<TExprApplier>()
+ .Apply(lmap.Lambda())
+ .With(lmap.Lambda().Args().Arg(0), "stream")
+ .Build()
.Build()
.Done();
@@ -418,18 +418,18 @@ TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimization
return result.Cast();
}
-TExprBase DqPushOrderedLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+TExprBase DqPushOrderedLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
-{
+{
return DqPushBaseLMapToStage<TCoOrderedLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
-}
-
-TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+}
+
+TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
-{
+{
return DqPushBaseLMapToStage<TCoLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
-}
-
+}
+
TExprBase DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 3f8b24b0c61..70a9d70082b 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -26,9 +26,9 @@ NNodes::TExprBase DqPushExtractMembersToStage(NNodes::TExprBase node, TExprConte
NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
-
+
NNodes::TExprBase DqBuildExtFunctionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index 9c8c54bbb32..fcbd3bb9ecf 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -160,7 +160,7 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) {
}
THashMap<TStringBuf, THashMap<TStringBuf, const TTypeAnnotationNode*>>
-ParseJoinInputType(const TStructExprType& rowType, TStringBuf tableLabel, TExprContext& ctx, bool optional) {
+ParseJoinInputType(const TStructExprType& rowType, TStringBuf tableLabel, TExprContext& ctx, bool optional) {
THashMap<TStringBuf, THashMap<TStringBuf, const TTypeAnnotationNode*>> result;
for (auto member : rowType.GetItems()) {
TStringBuf label, column;
@@ -174,14 +174,14 @@ ParseJoinInputType(const TStructExprType& rowType, TStringBuf tableLabel, TExprC
result.clear();
return result;
}
- auto memberType = member->GetItemType();
- if (optional && !memberType->IsOptionalOrNull()) {
- memberType = ctx.MakeType<TOptionalExprType>(memberType);
- }
+ auto memberType = member->GetItemType();
+ if (optional && !memberType->IsOptionalOrNull()) {
+ memberType = ctx.MakeType<TOptionalExprType>(memberType);
+ }
if (!tableLabel.empty()) {
- result[tableLabel][member->GetName()] = memberType;
+ result[tableLabel][member->GetName()] = memberType;
} else {
- result[label][column] = memberType;
+ result[label][column] = memberType;
}
}
return result;
@@ -193,8 +193,8 @@ const TStructExprType* GetDqJoinResultType(TPositionHandle pos, const TStructExp
const TStringBuf& joinType, const TDqJoinKeyTupleList& joinKeys, TExprContext& ctx)
{
// check left
- bool isLeftOptional = IsLeftJoinSideOptional(joinType);
- auto leftType = ParseJoinInputType(leftRowType, leftLabel, ctx, isLeftOptional);
+ bool isLeftOptional = IsLeftJoinSideOptional(joinType);
+ auto leftType = ParseJoinInputType(leftRowType, leftLabel, ctx, isLeftOptional);
if (leftType.empty() && joinType != "Cross") {
TStringStream str; str << "Cannot parse left join input type: ";
leftRowType.Out(str);
@@ -203,8 +203,8 @@ const TStructExprType* GetDqJoinResultType(TPositionHandle pos, const TStructExp
}
// check right
- bool isRightOptional = IsRightJoinSideOptional(joinType);
- auto rightType = ParseJoinInputType(rightRowType, rightLabel, ctx, isRightOptional);
+ bool isRightOptional = IsRightJoinSideOptional(joinType);
+ auto rightType = ParseJoinInputType(rightRowType, rightLabel, ctx, isRightOptional);
if (rightType.empty() && joinType != "Cross") {
TStringStream str; str << "Cannot parse right join input type: ";
rightRowType.Out(str);
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp
index 2b45751ad97..e23600f80fb 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp
@@ -76,33 +76,33 @@ public:
return &State_->Configuration->Tokens;
}
- bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override {
- Y_UNUSED(compact);
-
- for (auto& child : node.Children()) {
- children.push_back(child.Get());
- }
-
- if (TMaybeNode<TClReadTable>(&node)) {
- return true;
- }
- return false;
- }
-
- void GetInputs(const TExprNode& node, TVector<TPinInfo>& inputs) override {
- if (auto maybeRead = TMaybeNode<TClReadTable>(&node)) {
- if (auto maybeTable = maybeRead.Table()) {
- TStringBuilder tableNameBuilder;
- if (auto dataSource = maybeRead.DataSource().Maybe<TClDataSource>()) {
- auto cluster = dataSource.Cast().Cluster();
- tableNameBuilder << cluster.Value() << ".";
- }
- tableNameBuilder << '`' << maybeTable.Cast().Value() << '`';
- inputs.push_back(TPinInfo(maybeRead.DataSource().Raw(), nullptr, maybeTable.Cast().Raw(), tableNameBuilder, false));
- }
- }
- }
-
+ bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override {
+ Y_UNUSED(compact);
+
+ for (auto& child : node.Children()) {
+ children.push_back(child.Get());
+ }
+
+ if (TMaybeNode<TClReadTable>(&node)) {
+ return true;
+ }
+ return false;
+ }
+
+ void GetInputs(const TExprNode& node, TVector<TPinInfo>& inputs) override {
+ if (auto maybeRead = TMaybeNode<TClReadTable>(&node)) {
+ if (auto maybeTable = maybeRead.Table()) {
+ TStringBuilder tableNameBuilder;
+ if (auto dataSource = maybeRead.DataSource().Maybe<TClDataSource>()) {
+ auto cluster = dataSource.Cast().Cluster();
+ tableNameBuilder << cluster.Value() << ".";
+ }
+ tableNameBuilder << '`' << maybeTable.Cast().Value() << '`';
+ inputs.push_back(TPinInfo(maybeRead.DataSource().Raw(), nullptr, maybeTable.Cast().Raw(), tableNameBuilder, false));
+ }
+ }
+ }
+
IDqIntegration* GetDqIntegration() override {
return DqIntegration_.Get();
}
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
index 8d0bddff2a5..4da4d13d034 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
@@ -29,11 +29,11 @@ TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode
}
TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode& write, TExprContext& ctx) {
- Y_UNUSED(write);
+ Y_UNUSED(write);
Y_UNUSED(ctx);
- return Nothing();
-}
-
+ return Nothing();
+}
+
void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) {
Y_UNUSED(compiler);
}
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 2a43027dfc2..aa782ee93ff 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -714,17 +714,17 @@ TString ExprToPrettyString(TExprContext& ctx, const TExprNode& expr) {
return exprText;
}
-bool IsFlowOrStream(const TExprNode* node) {
- auto kind = node->GetTypeAnn()->GetKind();
- return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow;
-}
+bool IsFlowOrStream(const TExprNode* node) {
+ auto kind = node->GetTypeAnn()->GetKind();
+ return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow;
+}
void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprNode* source) {
if (node == source) {
return;
}
- if (!node->IsCallable()) {
+ if (!node->IsCallable()) {
return;
}
@@ -736,7 +736,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN
if (TCoApply::Match(node)) {
switch (node->GetTypeAnn()->GetKind()) {
case ETypeAnnotationKind::Stream:
- case ETypeAnnotationKind::Flow:
+ case ETypeAnnotationKind::Flow:
case ETypeAnnotationKind::List:
break;
default:
@@ -744,12 +744,12 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN
}
for (size_t i = 1; i < node->ChildrenSize(); ++i) {
- if (IsFlowOrStream(node->Child(i))) {
+ if (IsFlowOrStream(node->Child(i))) {
applyStreamChildren.push_back(node->Child(i));
} else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
if (node->Child(i)->IsCallable("ForwardList")) {
applyStreamChildren.push_back(node->Child(i)->Child(0));
- } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) {
+ } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) {
applyStreamChildren.push_back(node->Child(i)->Child(0));
}
}
@@ -758,7 +758,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN
WriteStream(writer, applyStreamChildren.front(), source);
}
}
- else if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
+ else if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
WriteStream(writer, node->Child(0), source);
}
@@ -766,7 +766,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN
writer.OnBeginMap();
writer.OnKeyedItem("Name");
writer.OnStringScalar(node->Content());
- if (TCoFlatMapBase::Match(node) && IsFlowOrStream(node->ChildPtr(1).Get())) {
+ if (TCoFlatMapBase::Match(node) && IsFlowOrStream(node->ChildPtr(1).Get())) {
writer.OnKeyedItem("Children");
writer.OnBeginList();
writer.OnListItem();
@@ -799,7 +799,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN
writer.OnEndList();
}
- if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
+ if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
writer.OnKeyedItem("Children");
writer.OnBeginList();
for (size_t i = 0; i < node->ChildrenSize(); ++i) {
@@ -839,23 +839,23 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp
return factor;
}
- if (!node->IsCallable()) {
+ if (!node->IsCallable()) {
return factor;
}
if (TCoApply::Match(node)) {
switch (node->GetTypeAnn()->GetKind()) {
case ETypeAnnotationKind::Stream:
- case ETypeAnnotationKind::Flow:
+ case ETypeAnnotationKind::Flow:
case ETypeAnnotationKind::List: {
double applyFactor = 0.0;
for (size_t i = 1; i < node->ChildrenSize(); ++i) {
- if (IsFlowOrStream(node->Child(i))) {
+ if (IsFlowOrStream(node->Child(i))) {
applyFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx);
} else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
if (node->Child(i)->IsCallable("ForwardList")) {
applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
- } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) {
+ } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) {
applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
}
}
@@ -869,7 +869,7 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp
return factor;
}
- if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
+ if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
factor = GetDataReplicationFactor(factor, node->Child(0), stream, ctx);
}
@@ -921,7 +921,7 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp
}
factor = Max(1.0, switchFactor);
}
- else if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
+ else if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) {
double extendFactor = 0.0;
for (size_t i = 0; i < node->ChildrenSize(); ++i) {
extendFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx);
diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp
index 9c8752af56d..6952e96cf07 100644
--- a/ydb/library/yql/providers/config/yql_config_provider.cpp
+++ b/ydb/library/yql/providers/config/yql_config_provider.cpp
@@ -673,10 +673,10 @@ namespace {
if (Find(Types.AvailablePureResultDataSources, DqProviderName) == Types.AvailablePureResultDataSources.end() || arg == "disable") {
; // reserved
} else if (arg == "auto") {
- Types.PureResultDataSource = DqProviderName;
+ Types.PureResultDataSource = DqProviderName;
Types.ForceDq = false;
} else if (arg == "force") {
- Types.PureResultDataSource = DqProviderName;
+ Types.PureResultDataSource = DqProviderName;
Types.ForceDq = true;
} else {
ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0]));
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index 45df684e94b..91f1caee748 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -143,14 +143,14 @@ private:
int workerCount = ev->Get()->Record.GetRequest().GetTask().size();
YQL_LOG(INFO) << (TStringBuilder() << "Trying to allocate " << workerCount << " workers");
- THashMap<TString, Yql::DqsProto::TFile> files;
+ THashMap<TString, Yql::DqsProto::TFile> files;
TVector<NDqProto::TDqTask> tasks;
for (auto& task : *ev->Get()->Record.MutableRequest()->MutableTask()) {
Yql::DqsProto::TTaskMeta taskMeta;
task.GetMeta().UnpackTo(&taskMeta);
for (const auto& f : taskMeta.GetFiles()) {
- files.emplace(f.GetObjectId(), f);
+ files.emplace(f.GetObjectId(), f);
}
if (ev->Get()->Record.GetRequest().GetSecureParams().size() > 0) {
@@ -189,7 +189,7 @@ private:
if (enableComputeActor) {
ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId());
}
- for (const auto& [_, f] : files) {
+ for (const auto& [_, f] : files) {
*allocateRequest->Record.AddFiles() = f;
}
@@ -199,7 +199,7 @@ private:
Yql::DqsProto::TWorkerFilter pragmaFilter = GetPragmaFilter();
- for (const auto& task : tasks) {
+ for (const auto& task : tasks) {
Yql::DqsProto::TTaskMeta taskMeta;
task.GetMeta().UnpackTo(&taskMeta);
@@ -212,8 +212,8 @@ private:
}
MergeFilter(filter, pragmaFilter);
- }
-
+ }
+
StartCounter("AllocateWorkers");
TActivationContext::Send(new IEventHandle(
diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp
index 33aee0caeeb..6acfb1277e0 100644
--- a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp
+++ b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp
@@ -64,12 +64,12 @@ public:
, RetryCounter(counters->GetSubgroup("component", "ServiceProxyActor")->GetCounter("RetryCreateActor", /*derivative=*/ true))
, Tasks(tasks)
, ComputeActorType(computeActorType)
- {
- AllocatedWorkers.resize(workerCount);
+ {
+ AllocatedWorkers.resize(workerCount);
if (!Tasks.empty()) {
Y_VERIFY(workerCount == Tasks.size());
}
- }
+ }
private:
STRICT_STFUNC(Handle, {
@@ -177,17 +177,17 @@ private:
YQL_ENSURE(RequestedNodes.contains(cookie));
auto& requestedNode = RequestedNodes[cookie];
if (!requestedNode.RequestedFlag) {
- TDqResourceId resourceId{};
- resourceId.Data = cookie;
- AllocatedWorkers[resourceId.u3 - 1] = ev->Get()->Record.GetWorkers();
- AllocatedCount++;
+ TDqResourceId resourceId{};
+ resourceId.Data = cookie;
+ AllocatedWorkers[resourceId.u3 - 1] = ev->Get()->Record.GetWorkers();
+ AllocatedCount++;
requestedNode.RequestedFlag = true;
auto delta = TInstant::Now() - requestedNode.StartTime;
// catched and grpc_service
QueryStat.AddCounter(QueryStat.GetCounterName("Actor", {{"ClusterName", requestedNode.ClusterName}}, "ActorCreateTime"), delta);
}
- if (AllocatedCount == RequestedCount) {
+ if (AllocatedCount == RequestedCount) {
TVector<NActors::TActorId> workerIds;
for (auto& group : AllocatedWorkers) {
for (const auto& actorIdProto : group.GetWorkerActor()) {
@@ -199,15 +199,15 @@ private:
QueryStat.FlushCounters(response->Record);
auto* workerGroup = response->Record.MutableWorkers();
TVector<Yql::DqsProto::TWorkerInfo> workers;
- workers.resize(AllocatedCount);
- for (const auto& [resourceId, requestInfo] : RequestedNodes) {
- TDqResourceId dqResourceId{};
- dqResourceId.Data = resourceId;
- workers[dqResourceId.u3 - 1] = requestInfo.WorkerInfo;
+ workers.resize(AllocatedCount);
+ for (const auto& [resourceId, requestInfo] : RequestedNodes) {
+ TDqResourceId dqResourceId{};
+ dqResourceId.Data = resourceId;
+ workers[dqResourceId.u3 - 1] = requestInfo.WorkerInfo;
+ }
+ for (const auto& worker : workers) {
+ *workerGroup->AddWorker() = worker;
}
- for (const auto& worker : workers) {
- *workerGroup->AddWorker() = worker;
- }
Send(SenderId, response.Release());
Answered = true;
}
@@ -306,7 +306,7 @@ private:
bool LocalMode = false;
TVector<NDqProto::TWorkerGroup> AllocatedWorkers;
- ui32 AllocatedCount = 0;
+ ui32 AllocatedCount = 0;
bool FailState = false;
bool Answered = false;
diff --git a/ydb/library/yql/providers/dq/api/grpc/api.proto b/ydb/library/yql/providers/dq/api/grpc/api.proto
index d2baab0c502..ef620dca4f4 100644
--- a/ydb/library/yql/providers/dq/api/grpc/api.proto
+++ b/ydb/library/yql/providers/dq/api/grpc/api.proto
@@ -14,7 +14,7 @@ service DqService {
rpc PingSession (PingSessionRequest) returns (PingSessionResponse);
rpc ClusterStatus (ClusterStatusRequest) returns (ClusterStatusResponse);
rpc OperationStop (OperationStopRequest) returns (OperationStopResponse);
- rpc QueryStatus (QueryStatusRequest) returns (QueryStatusResponse);
+ rpc QueryStatus (QueryStatusRequest) returns (QueryStatusResponse);
rpc JobStop (JobStopRequest) returns (JobStopResponse);
rpc GetMaster (GetMasterRequest) returns (GetMasterResponse);
rpc ConfigureFailureInjector (ConfigureFailureInjectorRequest) returns (ConfigureFailureInjectorResponse);
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index bc336df50a3..09aecfcf923 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -25,7 +25,7 @@ message TAllocateWorkersRequest {
// repeated Yql.DqsProto.DatabaseDescription Databases = 7; // unused
string User = 8;
-
+
repeated Yql.DqsProto.TWorkerFilter WorkerFilterPerTask = 9;
uint32 WorkersCount = 10;
@@ -109,15 +109,15 @@ message TEvOperationStop {
message TEvOperationStopResponse {
}
-message TEvQueryStatus {
- Yql.DqsProto.QueryStatusRequest Request = 1;
- bool IsForwarded = 2;
-}
-
-message TEvQueryStatusResponse {
- Yql.DqsProto.QueryStatusResponse Response = 1;
-}
-
+message TEvQueryStatus {
+ Yql.DqsProto.QueryStatusRequest Request = 1;
+ bool IsForwarded = 2;
+}
+
+message TEvQueryStatusResponse {
+ Yql.DqsProto.QueryStatusResponse Response = 1;
+}
+
message TEvIsReady {
Yql.DqsProto.IsReadyRequest Request = 1;
bool IsForwarded = 2;
diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto
index 35430d86e55..b91351e5d12 100644
--- a/ydb/library/yql/providers/dq/api/protos/service.proto
+++ b/ydb/library/yql/providers/dq/api/protos/service.proto
@@ -68,8 +68,8 @@ message TWorkerFilter {
repeated string Address = 5;
repeated uint64 NodeId = 6;
repeated uint64 NodeIdHint = 7;
-}
-
+}
+
message ExecuteGraphRequest {
Ydb.Operations.OperationParams Params = 1;
repeated NYql.NDqProto.TDqTask Task = 3;
@@ -118,14 +118,14 @@ message PingSessionRequest {
message PingSessionResponse {
}
-message QueryStatusRequest {
- string Session = 1;
-}
-
-message QueryStatusResponse {
- string Status = 1;
-}
-
+message QueryStatusRequest {
+ string Session = 1;
+}
+
+message QueryStatusResponse {
+ string Status = 1;
+}
+
message TAttribute {
string Key = 1;
string Value = 2;
diff --git a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json
index 04e920802de..566f8d05032 100644
--- a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json
+++ b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json
@@ -25,16 +25,16 @@
"Name": "TDqReadWideWrap",
"Base": "TDqReadWrapBase",
"Match": {"Type": "Callable", "Name": "DqReadWideWrap"}
- },
- {
- "Name": "TDqWrite",
- "Base": "TCallable",
- "Match": {"Type": "Callable", "Name": "DqWrite"},
- "Children": [
- {"Index": 0, "Name": "Input", "Type": "TExprBase"},
- {"Index": 1, "Name": "Provider", "Type": "TCoAtom"},
- {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
- ]
+ },
+ {
+ "Name": "TDqWrite",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "DqWrite"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"},
+ {"Index": 1, "Name": "Provider", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+ ]
},
{
"Name": "TDqSourceWrapBase",
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 61fac27751d..858e3da45a0 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -208,10 +208,10 @@ protected:
}
template <bool IsGlobal>
- TMaybeNode<TExprBase> PushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ TMaybeNode<TExprBase> PushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqPushLMapToStage(node, ctx, optCtx, *getParents(), IsGlobal);
- }
-
+ }
+
template <bool IsGlobal>
TMaybeNode<TExprBase> BuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqBuildExtFunctionStage(node, ctx, optCtx, *getParents(), IsGlobal);
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 8a52e9f8688..8759f442219 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -576,7 +576,7 @@ private:
// TODO: change result provider to remove this if
ui32 inMemoryIndex;
for (inMemoryIndex = 0; inMemoryIndex < resFill->ChildrenSize(); ++inMemoryIndex) {
- if (resFill->ChildPtr(inMemoryIndex)->IsAtom(DqProviderName)) {
+ if (resFill->ChildPtr(inMemoryIndex)->IsAtom(DqProviderName)) {
break;
}
}
@@ -658,22 +658,22 @@ private:
THashMap<ui32, ui32> allPublicIds;
bool hasStageError = false;
- VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) {
+ VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) {
const TExprBase expr(node);
if (expr.Maybe<TResFill>()) {
- if (auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
+ if (auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
allPublicIds.emplace(*publicId, 0U);
- }
- }
- return true;
- });
-
+ }
+ }
+ return true;
+ });
+
if (hasStageError) {
return SyncError();
}
- IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds);
-
+ IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds);
+
auto executionPlanner = THolder<IDqsExecutionPlanner>(new TDqsSingleExecutionPlanner(lambda, NActors::TActorId(), NActors::TActorId(1, 0, 1, 0), State->FunctionRegistry, result.Input().Ref().GetTypeAnn()));
auto& tasks = executionPlanner->GetTasks();
Yql::DqsProto::TTaskMeta taskMeta;
@@ -856,7 +856,7 @@ private:
if (const TExprBase expr(node); expr.Maybe<TDqConnection>()) {
if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
allPublicIds.emplace(*publicId, 0U);
- }
+ }
} else if (const auto& maybeStage = expr.Maybe<TDqStage>()) {
const auto& stage = maybeStage.Cast();
if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) {
@@ -881,13 +881,13 @@ private:
}
auto optimizedInput = pull.Input().Ptr();
- THashMap<TString, TString> secureParams;
- NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams);
-
- optimizedInput = ctx.ShallowCopy(*optimizedInput);
+ THashMap<TString, TString> secureParams;
+ NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams);
+
+ optimizedInput = ctx.ShallowCopy(*optimizedInput);
optimizedInput->SetTypeAnn(pull.Input().Ref().GetTypeAnn());
optimizedInput->CopyConstraints(pull.Input().Ref());
-
+
TDqsPipelineConfigurator peepholeConfig;
TPeepholeSettings peepholeSettings;
peepholeSettings.CommonConfig = &peepholeConfig;
@@ -991,9 +991,9 @@ private:
Yql::DqsProto::TTaskMeta taskMeta;
t.MutableMeta()->UnpackTo(&taskMeta);
- for (const auto& file : uploadList) {
+ for (const auto& file : uploadList) {
*taskMeta.AddFiles() = file;
- }
+ }
t.MutableMeta()->PackFrom(taskMeta);
if (const auto it = allPublicIds.find(taskMeta.GetStageId()); allPublicIds.cend() != it)
++it->second;
@@ -1001,7 +1001,7 @@ private:
}
MarkProgressStarted(allPublicIds, State->ProgressWriter);
-
+
if (fallbackFlag) {
return FallbackWithMessage(pull.Ref(), "Too big attachment", ctx);
}
@@ -1039,17 +1039,17 @@ private:
settings->_RowsLimitPerWrite = 0;
}
- IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds);
+ IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds);
auto future = State->DqGateway->ExecutePlan(State->SessionId, *executionPlanner.Get(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
-
+
future.Subscribe([allPublicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
- YQL_ENSURE(!completedFuture.HasException());
+ YQL_ENSURE(!completedFuture.HasException());
MarkProgressFinished(allPublicIds, completedFuture.GetValueSync().Success(), progressWriter);
- });
+ });
executionPlanner.Destroy();
-
+
int level = 0;
// TODO: remove copy-paste
@@ -1173,18 +1173,18 @@ private:
IDqGateway::TDqProgressWriter MakeDqProgressWriter(const THashMap<ui32, ui32>& allPublicIds) const {
IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, allPublicIds](const TString& stage) {
- for (const auto& publicId : allPublicIds) {
+ for (const auto& publicId : allPublicIds) {
auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress, stage);
if (publicId.second) {
p.Counters.ConstructInPlace();
p.Counters->Running = p.Counters->Total = publicId.second;
}
progressWriter(p);
- }
- };
- return dqProgressWriter;
- }
-
+ }
+ };
+ return dqProgressWriter;
+ }
+
static void MarkProgressStarted(const THashMap<ui32, ui32>& allPublicIds, const TOperationProgressWriter& progressWriter) {
for(const auto& publicId : allPublicIds) {
auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress);
@@ -1193,21 +1193,21 @@ private:
p.Counters->Running = p.Counters->Total = publicId.second;
}
progressWriter(p);
- }
- }
-
+ }
+ }
+
static void MarkProgressFinished(const THashMap<ui32, ui32>& allPublicIds, bool success, const TOperationProgressWriter& progressWriter) {
for(const auto& publicId : allPublicIds) {
- auto state = success ? TOperationProgress::EState::Finished : TOperationProgress::EState::Failed;
+ auto state = success ? TOperationProgress::EState::Finished : TOperationProgress::EState::Failed;
auto p = TOperationProgress(TString(DqProviderName), publicId.first, state);
if (publicId.second) {
p.Counters.ConstructInPlace();
(success ? p.Counters->Completed : p.Counters->Failed) = p.Counters->Total = publicId.second;
}
progressWriter(p);
- }
- }
-
+ }
+ }
+
void FlushStatisticsToState() {
TOperationStatistics statistics;
FlushCounters(statistics);
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index b034fcb8545..a8b5e598a08 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -23,14 +23,14 @@ namespace NYql {
using namespace NNodes;
-class TDqDataProviderSink: public TDataProviderBase {
+class TDqDataProviderSink: public TDataProviderBase {
public:
- TDqDataProviderSink(const TDqStatePtr& state)
+ TDqDataProviderSink(const TDqStatePtr& state)
: State(state)
, LogOptTransformer([state] () { return CreateDqsLogOptTransformer(/*TODO: State->TypeCtx);*/nullptr, state->Settings); })
, PhyOptTransformer([] () { return CreateDqsPhyOptTransformer(/*TODO: State->TypeCtx*/nullptr); })
, PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); })
- , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); })
+ , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); })
, RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); })
{ }
@@ -157,7 +157,7 @@ public:
return false;
}
- if (node.Child(0)->Content() == DqProviderName) {
+ if (node.Child(0)->Content() == DqProviderName) {
if (node.ChildrenSize() == 2) {
if (!EnsureAtom(*node.Child(1), ctx)) {
return false;
@@ -203,26 +203,26 @@ public:
}
TStringBuf GetName() const override {
- return DqProviderName;
+ return DqProviderName;
}
- bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override {
- Y_UNUSED(compact);
-
- if (TDqConnection::Match(&node)) {
- children.push_back(node.ChildPtr(TDqConnection::idx_Output)->ChildPtr(TDqOutput::idx_Stage));
+ bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override {
+ Y_UNUSED(compact);
+
+ if (TDqConnection::Match(&node)) {
+ children.push_back(node.ChildPtr(TDqConnection::idx_Output)->ChildPtr(TDqOutput::idx_Stage));
return false;
- }
-
- if (TDqStageBase::Match(&node)) {
- auto inputs = node.ChildPtr(TDqStageBase::idx_Inputs);
- for (size_t i = 0; i < inputs->ChildrenSize(); ++i) {
- children.push_back(inputs->ChildPtr(i));
- }
- ScanPlanDependencies(node.ChildPtr(TDqStageBase::idx_Program), children);
- return true;
- }
-
+ }
+
+ if (TDqStageBase::Match(&node)) {
+ auto inputs = node.ChildPtr(TDqStageBase::idx_Inputs);
+ for (size_t i = 0; i < inputs->ChildrenSize(); ++i) {
+ children.push_back(inputs->ChildPtr(i));
+ }
+ ScanPlanDependencies(node.ChildPtr(TDqStageBase::idx_Program), children);
+ return true;
+ }
+
if (TDqQuery::Match(&node)) {
auto stagesList = node.ChildPtr(TDqQuery::idx_SinkStages);
for (size_t i = 0; i < stagesList->ChildrenSize(); ++i) {
@@ -231,40 +231,40 @@ public:
return true;
}
- return false;
- }
-
- void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children) {
- VisitExpr(input, [&children](const TExprNode::TPtr& node) {
- if (TMaybeNode<TDqReadWrapBase>(node)) {
- children.push_back(node->ChildPtr(TDqReadWrapBase::idx_Input));
- return false;
- }
- return true;
- });
- }
-
- TString GetOperationDisplayName(const TExprNode& node) override {
- if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) {
- TStringBuilder builder;
- builder << TPlanFormatterBase::GetOperationDisplayName(node);
- if (auto publicId = State->TypeCtx->TranslateOperationId(maybeStage.Raw()->UniqueId())) {
- builder << " #" << publicId;
- }
- return builder;
- }
- return TPlanFormatterBase::GetOperationDisplayName(node);
- }
-
+ return false;
+ }
+
+ void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children) {
+ VisitExpr(input, [&children](const TExprNode::TPtr& node) {
+ if (TMaybeNode<TDqReadWrapBase>(node)) {
+ children.push_back(node->ChildPtr(TDqReadWrapBase::idx_Input));
+ return false;
+ }
+ return true;
+ });
+ }
+
+ TString GetOperationDisplayName(const TExprNode& node) override {
+ if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) {
+ TStringBuilder builder;
+ builder << TPlanFormatterBase::GetOperationDisplayName(node);
+ if (auto publicId = State->TypeCtx->TranslateOperationId(maybeStage.Raw()->UniqueId())) {
+ builder << " #" << publicId;
+ }
+ return builder;
+ }
+ return TPlanFormatterBase::GetOperationDisplayName(node);
+ }
+
void WritePlanDetails(const TExprNode& node, NYson::TYsonWriter& writer) override {
- if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) {
- writer.OnKeyedItem("Streams");
- writer.OnBeginMap();
- NCommon::WriteStreams(writer, "Program", maybeStage.Cast().Program());
- writer.OnEndMap();
- }
- }
-
+ if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) {
+ writer.OnKeyedItem("Streams");
+ writer.OnBeginMap();
+ NCommon::WriteStreams(writer, "Program", maybeStage.Cast().Program());
+ writer.OnEndMap();
+ }
+ }
+
TDqStatePtr State;
TLazyInitHolder<IGraphTransformer> LogOptTransformer;
@@ -274,8 +274,8 @@ public:
TLazyInitHolder<IGraphTransformer> RecaptureTransformer;
};
-TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state) {
- return new TDqDataProviderSink(state);
+TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state) {
+ return new TDqDataProviderSink(state);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h
index 65838d49bf6..d9849824a78 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h
@@ -6,5 +6,5 @@ namespace NYql {
struct TDqState;
using TDqStatePtr = TIntrusivePtr<TDqState>;
- TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state);
+ TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state);
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
index 61fdcbb0a47..c8d9786a7a0 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
@@ -13,8 +13,8 @@ namespace {
class TDqsDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
public:
- TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx)
- : TVisitorTransformerBase(true), TypeCtx(typeCtx)
+ TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx)
+ : TVisitorTransformerBase(true), TypeCtx(typeCtx)
{
AddHandler({TDqStage::CallableName()}, Hndl(&NDq::AnnotateDqStage));
AddHandler({TDqPhyStage::CallableName()}, Hndl(&NDq::AnnotateDqPhyStage));
@@ -32,47 +32,47 @@ public:
AddHandler({TDqPhyCrossJoin::CallableName()}, Hndl(&NDq::AnnotateDqCrossJoin));
AddHandler({TDqPhyJoinDict::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin));
AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink));
- AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite));
+ AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite));
AddHandler({TDqQuery::CallableName()}, Hndl(&NDq::AnnotateDqQuery));
}
-
-private:
+
+private:
TStatus AnnotateDqWrite(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- if (!EnsureMinArgsCount(*input, 2, ctx)) {
- return TStatus::Error;
- }
-
- if (!EnsureMaxArgsCount(*input, 3, ctx)) {
- return TStatus::Error;
- }
-
+ if (!EnsureMinArgsCount(*input, 2, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureMaxArgsCount(*input, 3, ctx)) {
+ return TStatus::Error;
+ }
+
if (!EnsureNewSeqType<false, false, true>(input->Head(), ctx)){
- return TStatus::Error;
- }
-
- if (!EnsureAtom(*input->Child(1), ctx)) {
- return TStatus::Error;
- }
-
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(1), ctx)) {
+ return TStatus::Error;
+ }
+
auto providerName = TString(input->Child(1)->Content());
-
+
if (!TypeCtx->DataSinkMap.FindPtr(providerName)) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "No datasink defined for provider name " << providerName));
- return TStatus::Error;
- }
-
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "No datasink defined for provider name " << providerName));
+ return TStatus::Error;
+ }
+
providerName.front() = std::toupper(providerName.front());
output = ctx.NewCallable(input->Pos(), providerName += TDqWrite::CallableName(), {input->HeadPtr(), input->TailPtr()});
return TStatus::Repeat;
- }
-
- TTypeAnnotationContext* TypeCtx;
+ }
+
+ TTypeAnnotationContext* TypeCtx;
};
} // unnamed
-THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) {
- return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx));
+THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) {
+ return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx));
}
} // NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
index 5780b22d2b6..df86def2cd4 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h
@@ -7,6 +7,6 @@
namespace NYql {
-THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx);
+THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx);
} // NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
index f1949cf3f73..aea55435de9 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
@@ -34,19 +34,19 @@ using namespace NKikimr::NMiniKQL;
using namespace NNodes;
using namespace NDq;
-class TDqDataProviderSource: public TDataProviderBase {
+class TDqDataProviderSource: public TDataProviderBase {
public:
TDqDataProviderSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory)
: State(state)
, ConfigurationTransformer([this]() {
- return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName});
+ return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName});
})
, ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); })
, TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); })
{ }
TStringBuf GetName() const override {
- return DqProviderName;
+ return DqProviderName;
}
IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override {
@@ -141,7 +141,7 @@ public:
return false;
}
- if (node.Child(0)->Content() == DqProviderName) {
+ if (node.Child(0)->Content() == DqProviderName) {
if (node.ChildrenSize() == 2) {
if (!EnsureAtom(*node.Child(1), ctx)) {
return false;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp
index be5e9bad10c..6e03017ccb1 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp
@@ -146,12 +146,12 @@ private:
return TStatus::Error;
}
- if (!EnsureSpecificDataSource(*input->Child(TCoConfigure::idx_DataSource), DqProviderName, ctx)) {
+ if (!EnsureSpecificDataSource(*input->Child(TCoConfigure::idx_DataSource), DqProviderName, ctx)) {
return TStatus::Error;
}
input->SetTypeAnn(input->Child(TCoConfigure::idx_World)->GetTypeAnn());
-
+
return TStatus::Ok;
}
};
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
index 371820fb95f..2341b28ad44 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
@@ -13,10 +13,10 @@
#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/threading/task_scheduler/task_scheduler.h>
-#include <util/system/thread.h>
+#include <util/system/thread.h>
+
+#include <utility>
-#include <utility>
-
namespace NYql {
class TDqGateway: public IDqGateway
@@ -36,10 +36,10 @@ public:
, TaskScheduler(threads)
, RtTaskScheduler(1)
, OpenSessionTimeout(timeout)
- {
- TaskScheduler.Start();
+ {
+ TaskScheduler.Start();
RtTaskScheduler.Start();
- }
+ }
TString GetVanillaJobPath() override {
return VanillaJobPath;
@@ -54,12 +54,12 @@ public:
{
YQL_LOG_CTX_SCOPE(sessionId);
YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback";
-
- {
- TGuard<TMutex> lock(ProgressMutex);
- RunningQueries.erase(sessionId);
- }
-
+
+ {
+ TGuard<TMutex> lock(ProgressMutex);
+ RunningQueries.erase(sessionId);
+ }
+
TResult result;
bool error = false;
@@ -176,15 +176,15 @@ public:
Y_VERIFY(TaskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant()));
}
- template <typename TResponse, typename TRequest, typename TStub>
- NThreading::TFuture<TResult> WithRetry(
- const TString& sessionId,
- const TRequest& queryPB,
- TStub stub,
- int retry,
+ template <typename TResponse, typename TRequest, typename TStub>
+ NThreading::TFuture<TResult> WithRetry(
+ const TString& sessionId,
+ const TRequest& queryPB,
+ TStub stub,
+ int retry,
const TDqSettings::TPtr& settings,
const THashMap<TString, TString>& modulesMapping
- ) {
+ ) {
auto backoff = TDuration::MilliSeconds(settings->RetryBackoffMs.Get().GetOrElse(1000));
auto promise = NThreading::NewPromise<TResult>();
auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse("default");
@@ -193,7 +193,7 @@ public:
return OnResponse(std::move(promise), std::move(sessionId), std::move(status), std::move(resp), modulesMapping, alwaysFallback);
};
- Service->DoRequest<TRequest, TResponse>(queryPB, callback, stub);
+ Service->DoRequest<TRequest, TResponse>(queryPB, callback, stub);
{
TGuard<TMutex> lock(ProgressMutex);
@@ -204,9 +204,9 @@ public:
}
} else {
return NThreading::MakeFuture(TResult());
- }
- }
-
+ }
+ }
+
return promise.GetFuture().Apply([=](const NThreading::TFuture<TResult>& result) {
if (result.HasException()) {
return result;
@@ -288,7 +288,7 @@ public:
queryPB,
&Yql::DqsProto::DqService::Stub::AsyncExecuteGraph,
retry,
- settings,
+ settings,
modulesMapping);
}
@@ -339,14 +339,14 @@ public:
request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession);
}
- void RequestQueryStatus(const TString& sessionId) {
- Yql::DqsProto::QueryStatusRequest request;
- request.SetSession(sessionId);
+ void RequestQueryStatus(const TString& sessionId) {
+ Yql::DqsProto::QueryStatusRequest request;
+ request.SetSession(sessionId);
IDqGateway::TPtr self = this;
auto callback = [this, self, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) {
- if (status.Ok()) {
+ if (status.Ok()) {
TGuard<TMutex> lock(ProgressMutex);
- TString stage;
+ TString stage;
TDqProgressWriter* dqProgressWriter = nullptr;
auto it = RunningQueries.find(sessionId);
if (it != RunningQueries.end()) {
@@ -356,38 +356,38 @@ public:
stage = resp.GetStatus();
it->second.second = stage;
}
-
+
ScheduleQueryStatusRequest(sessionId);
- }
-
- if (!stage.empty() && dqProgressWriter) {
- (*dqProgressWriter)(stage);
- }
- } else {
- TGuard<TMutex> lock(ProgressMutex);
- RunningQueries.erase(sessionId);
- }
- };
-
- Service->DoRequest<Yql::DqsProto::QueryStatusRequest, Yql::DqsProto::QueryStatusResponse>(
- request, callback, &Yql::DqsProto::DqService::Stub::AsyncQueryStatus, {}, nullptr);
- }
-
- void ScheduleQueryStatusRequest(const TString& sessionId) {
+ }
+
+ if (!stage.empty() && dqProgressWriter) {
+ (*dqProgressWriter)(stage);
+ }
+ } else {
+ TGuard<TMutex> lock(ProgressMutex);
+ RunningQueries.erase(sessionId);
+ }
+ };
+
+ Service->DoRequest<Yql::DqsProto::QueryStatusRequest, Yql::DqsProto::QueryStatusResponse>(
+ request, callback, &Yql::DqsProto::DqService::Stub::AsyncQueryStatus, {}, nullptr);
+ }
+
+ void ScheduleQueryStatusRequest(const TString& sessionId) {
Delay(TDuration::MilliSeconds(1000)).Subscribe([this, sessionId](NThreading::TFuture<void> fut) {
- if (fut.HasException()) {
- TGuard<TMutex> lock(ProgressMutex);
- RunningQueries.erase(sessionId);
- } else {
- TGuard<TMutex> lock(ProgressMutex);
- auto it = RunningQueries.find(sessionId);
- if (it != RunningQueries.end()) {
- RequestQueryStatus(sessionId);
- }
- }
- });
- }
-
+ if (fut.HasException()) {
+ TGuard<TMutex> lock(ProgressMutex);
+ RunningQueries.erase(sessionId);
+ } else {
+ TGuard<TMutex> lock(ProgressMutex);
+ auto it = RunningQueries.find(sessionId);
+ if (it != RunningQueries.end()) {
+ RequestQueryStatus(sessionId);
+ }
+ }
+ });
+ }
+
void SchedulePingSessionRequest(const TString& sessionId) {
auto callback = [this, sessionId](
NGrpc::TGrpcStatus&& status,
@@ -409,19 +409,19 @@ public:
});
}
- struct TDelay: public TTaskScheduler::ITask {
- TDelay(NThreading::TPromise<void> p)
- : Promise(std::move(p))
- { }
-
- TInstant Process() override {
- Promise.SetValue();
- return TInstant::Max();
- }
-
- NThreading::TPromise<void> Promise;
- };
-
+ struct TDelay: public TTaskScheduler::ITask {
+ TDelay(NThreading::TPromise<void> p)
+ : Promise(std::move(p))
+ { }
+
+ TInstant Process() override {
+ Promise.SetValue();
+ return TInstant::Max();
+ }
+
+ NThreading::TPromise<void> Promise;
+ };
+
private:
NGrpc::TGRpcClientConfig GrpcConf;
NGrpc::TGRpcClientLow GrpcClient;
@@ -429,7 +429,7 @@ private:
TMutex ProgressMutex;
TMutex Mutex;
- THashMap<TString, std::pair<TDqProgressWriter, TString>> RunningQueries;
+ THashMap<TString, std::pair<TDqProgressWriter, TString>> RunningQueries;
TString VanillaJobPath;
TString VanillaJobMd5;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
index 038709cec66..d562776033f 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
@@ -26,7 +26,7 @@ class IDqGateway : public TThrRefBase {
public:
using TPtr = TIntrusivePtr<IDqGateway>;
using TFileResource = Yql::DqsProto::TFile;
- using TDqProgressWriter = std::function<void(const TString&)>;
+ using TDqProgressWriter = std::function<void(const TString&)>;
struct TFileResourceHash {
std::size_t operator()(const TFileResource& f) const {
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
index 7cc61e6e0b9..69788c5c760 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp
@@ -12,7 +12,7 @@
namespace NYql {
-TDataProviderInitializer GetDqDataProviderInitializer(
+TDataProviderInitializer GetDqDataProviderInitializer(
TExecTransformerFactory execTransformerFactory,
const IDqGateway::TPtr& dqGateway,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
@@ -50,10 +50,10 @@ TDataProviderInitializer GetDqDataProviderInitializer(
);
TDataProviderInfo info;
- info.Names.insert(TString{DqProviderName});
+ info.Names.insert(TString{DqProviderName});
info.Source = CreateDqDataSource(state, execTransformerFactory);
- info.Sink = CreateDqDataSink(state);
+ info.Sink = CreateDqDataSink(state);
info.OpenSession = [dqGateway, metrics, gatewaysConfig, state](
const TString& sessionId,
const TString& username,
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.h b/ydb/library/yql/providers/dq/provider/yql_dq_provider.h
index 8c6602398db..6e4aff4ee94 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.h
@@ -15,7 +15,7 @@ using TDqStatePtr = TIntrusivePtr<TDqState>;
using TExecTransformerFactory = std::function<IGraphTransformer*(const TDqStatePtr& state)>;
-TDataProviderInitializer GetDqDataProviderInitializer(
+TDataProviderInitializer GetDqDataProviderInitializer(
TExecTransformerFactory execTransformerFactory,
const IDqGateway::TPtr& dqGateway,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
index 385c764515d..90e8af759c7 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
@@ -22,7 +22,7 @@ using namespace NNodes;
namespace {
const THashSet<TStringBuf> VALID_SOURCES = {DqProviderName, ConfigProviderName, YtProviderName, ClickHouseProviderName, YdbProviderName};
-const THashSet<TStringBuf> VALID_SINKS = {ResultProviderName, YtProviderName};
+const THashSet<TStringBuf> VALID_SINKS = {ResultProviderName, YtProviderName};
const THashSet<TStringBuf> UNSUPPORTED_CALLABLE = { TCoForwardList::CallableName() };
}
@@ -67,7 +67,7 @@ public:
Statistics_["DqPureResultDataSourceMismatch"]++;
}
- if (State_->TypeCtx->PureResultDataSource != DqProviderName || !State_->Settings->AnalyzeQuery.Get().GetOrElse(false)) {
+ if (State_->TypeCtx->PureResultDataSource != DqProviderName || !State_->Settings->AnalyzeQuery.Get().GetOrElse(false)) {
return TStatus::Ok;
}
@@ -101,9 +101,9 @@ public:
if (auto maybeRead = TMaybeNode<TCoRight>(node).Input()) {
if (maybeRead.Raw()->ChildrenSize() > 1 && TCoDataSource::Match(maybeRead.Raw()->Child(1))) {
auto dataSourceName = maybeRead.Raw()->Child(1)->Child(0)->Content();
- auto dataSource = State_->TypeCtx->DataSourceMap.FindPtr(dataSourceName);
- YQL_ENSURE(dataSource);
- if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
+ auto dataSource = State_->TypeCtx->DataSourceMap.FindPtr(dataSourceName);
+ YQL_ENSURE(dataSource);
+ if (auto dqIntegration = (*dataSource)->GetDqIntegration()) {
auto newRead = dqIntegration->WrapRead(*State_->Settings, maybeRead.Cast().Ptr(), ctx);
if (newRead.Get() != maybeRead.Raw()) {
return newRead;
@@ -111,7 +111,7 @@ public:
}
}
}
-
+
return node;
}, ctx, TOptimizeExprSettings{State_->TypeCtx});
@@ -195,29 +195,29 @@ private:
if (good) {
Scan(node.Head(), ctx,good, dataSize, visited, hasJoin);
}
- } else if (node.GetTypeAnn()->GetKind() == ETypeAnnotationKind::World
- && !TCoCommit::Match(&node)
- && node.ChildrenSize() > 1
- && TCoDataSink::Match(node.Child(1))) {
- auto dataSinkName = node.Child(1)->Child(0)->Content();
- auto dataSink = State_->TypeCtx->DataSinkMap.FindPtr(dataSinkName);
- YQL_ENSURE(dataSink);
- if (auto dqIntegration = dataSink->Get()->GetDqIntegration()) {
+ } else if (node.GetTypeAnn()->GetKind() == ETypeAnnotationKind::World
+ && !TCoCommit::Match(&node)
+ && node.ChildrenSize() > 1
+ && TCoDataSink::Match(node.Child(1))) {
+ auto dataSinkName = node.Child(1)->Child(0)->Content();
+ auto dataSink = State_->TypeCtx->DataSinkMap.FindPtr(dataSinkName);
+ YQL_ENSURE(dataSink);
+ if (auto dqIntegration = dataSink->Get()->GetDqIntegration()) {
if (auto canWrite = dqIntegration->CanWrite(*State_->Settings, node, ctx)) {
if (!canWrite.GetRef()) {
- good = false;
+ good = false;
} else if (!State_->Settings->EnableInsert.Get().GetOrElse(false)) {
AddInfo(ctx, TStringBuilder() << "'insert' support is disabled. Use PRAGMA dq.EnableInsert to explicitly enable it");
good = false;
- }
- }
- }
- if (good) {
- for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
+ }
+ }
+ }
+ if (good) {
+ for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin);
- }
- }
- }
+ }
+ }
+ }
else if (!State_->TypeCtx->UdfSupportsYield && TCoScriptUdf::Match(&node)) {
if (IsCallableTypeHasStreams(node.GetTypeAnn()->Cast<TCallableExprType>())) {
AddInfo(ctx, TStringBuilder() << "script udf with streams");
@@ -229,7 +229,7 @@ private:
}
}
}
- else {
+ else {
for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin);
}
diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp
index 55265ac7d96..65a9e500d9d 100644
--- a/ydb/library/yql/providers/dq/service/grpc_service.cpp
+++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp
@@ -629,28 +629,28 @@ namespace NYql::NDqs {
ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
});
- ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, {
- auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest());
-
- auto ev = MakeHolder<TEvQueryStatus>(*request);
-
- auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>(
- [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable {
- auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena());
- result->MergeFrom(event->Get()->Record.GetResponse());
- ctx->Reply(result, Ydb::StatusIds::SUCCESS);
- },
- [ctx] () mutable {
- YQL_LOG(DEBUG) << "QueryStatus failed";
- ctx->ReplyError(grpc::UNAVAILABLE, "Error");
- },
- TDuration::MilliSeconds(2000));
-
- TActorId callbackId = ActorSystem.Register(callback.Release());
-
+ ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, {
+ auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest());
+
+ auto ev = MakeHolder<TEvQueryStatus>(*request);
+
+ auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>(
+ [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable {
+ auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena());
+ result->MergeFrom(event->Get()->Record.GetResponse());
+ ctx->Reply(result, Ydb::StatusIds::SUCCESS);
+ },
+ [ctx] () mutable {
+ YQL_LOG(DEBUG) << "QueryStatus failed";
+ ctx->ReplyError(grpc::UNAVAILABLE, "Error");
+ },
+ TDuration::MilliSeconds(2000));
+
+ TActorId callbackId = ActorSystem.Register(callback.Release());
+
ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery));
- });
-
+ });
+
ADD_REQUEST(RegisterNode, RegisterNodeRequest, RegisterNodeResponse, {
auto* request = dynamic_cast<const Yql::DqsProto::RegisterNodeRequest*>(ctx->GetRequest());
Y_VERIFY(!!request);
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp
index 1db94f1baaa..9044dc47e30 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp
@@ -109,7 +109,7 @@ namespace NYql::NDqs {
*Record.MutableRequest() = request;
}
- TEvQueryStatus::TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request) {
- *Record.MutableRequest() = request;
- }
+ TEvQueryStatus::TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request) {
+ *Record.MutableRequest() = request;
+ }
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.h b/ydb/library/yql/providers/dq/worker_manager/interface/events.h
index d3bfa4d8efc..a2d0c9f5409 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/events.h
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h>
-#include "worker_info.h"
+#include "worker_info.h"
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -11,11 +11,11 @@
#include <util/generic/guid.h>
-#include <utility>
-
+#include <utility>
+
namespace NYql::NDqs {
-using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
+using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>;
struct TEvAllocateWorkersRequest : NActors::TEventPB<TEvAllocateWorkersRequest, NYql::NDqProto::TAllocateWorkersRequest,
TDqResManEvents::ES_ALLOCATE_WORKERS_REQUEST> {
@@ -89,19 +89,19 @@ using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::
TEvClusterStatusResponse() = default;
};
- struct TEvQueryStatus
- : NActors::TEventPB<TEvQueryStatus, NYql::NDqProto::TEvQueryStatus, TDqResManEvents::ES_QUERY_STATUS> {
-
- TEvQueryStatus() = default;
- TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request);
- };
-
- struct TEvQueryStatusResponse
- : NActors::TEventPB<TEvQueryStatusResponse, NYql::NDqProto::TEvQueryStatusResponse, TDqResManEvents::ES_QUERY_STATUS_RESPONSE> {
-
- TEvQueryStatusResponse() = default;
- };
-
+ struct TEvQueryStatus
+ : NActors::TEventPB<TEvQueryStatus, NYql::NDqProto::TEvQueryStatus, TDqResManEvents::ES_QUERY_STATUS> {
+
+ TEvQueryStatus() = default;
+ TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request);
+ };
+
+ struct TEvQueryStatusResponse
+ : NActors::TEventPB<TEvQueryStatusResponse, NYql::NDqProto::TEvQueryStatusResponse, TDqResManEvents::ES_QUERY_STATUS_RESPONSE> {
+
+ TEvQueryStatusResponse() = default;
+ };
+
struct TEvIsReady
: NActors::TEventPB<TEvIsReady, NYql::NDqProto::TEvIsReady, TDqResManEvents::ES_IS_READY> {
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp
index aa822bce729..634b25fb075 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp
@@ -1,81 +1,81 @@
#include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h>
-#include "worker_info.h"
+#include "worker_info.h"
#include <ydb/library/yql/utils/log/log.h>
-
-namespace NYql::NDqs {
-
- TInflightLimiter::TInflightLimiter(i32 inflightLimit)
- : InflightLimit(inflightLimit) {
- }
-
- bool TInflightLimiter::CanDownload(const TString& id) {
- return !InflightResources.contains(id) || InflightResources.find(id)->second < InflightLimit;
- }
-
- void TInflightLimiter::MarkDownloadStarted(const TString& id) {
- if (!InflightResources.contains(id)) {
- InflightResources.emplace(id, 0);
- }
- InflightResources.find(id)->second++;
- }
-
- void TInflightLimiter::MarkDownloadFinished(const TString& id) {
- auto it = InflightResources.find(id);
- if (it != InflightResources.end() && it->second > 0) {
- it->second--;
- if (it->second == 0) {
- InflightResources.erase(id);
- }
- }
- }
-
- TWorkerInfo::TWorkerInfo(
+
+namespace NYql::NDqs {
+
+ TInflightLimiter::TInflightLimiter(i32 inflightLimit)
+ : InflightLimit(inflightLimit) {
+ }
+
+ bool TInflightLimiter::CanDownload(const TString& id) {
+ return !InflightResources.contains(id) || InflightResources.find(id)->second < InflightLimit;
+ }
+
+ void TInflightLimiter::MarkDownloadStarted(const TString& id) {
+ if (!InflightResources.contains(id)) {
+ InflightResources.emplace(id, 0);
+ }
+ InflightResources.find(id)->second++;
+ }
+
+ void TInflightLimiter::MarkDownloadFinished(const TString& id) {
+ auto it = InflightResources.find(id);
+ if (it != InflightResources.end() && it->second > 0) {
+ it->second--;
+ if (it->second == 0) {
+ InflightResources.erase(id);
+ }
+ }
+ }
+
+ TWorkerInfo::TWorkerInfo(
const TString& startTime,
- ui32 nodeId,
- TGUID workerId,
- const Yql::DqsProto::RegisterNodeRequest& request,
- NYql::TSensorsGroupPtr metrics,
+ ui32 nodeId,
+ TGUID workerId,
+ const Yql::DqsProto::RegisterNodeRequest& request,
+ NYql::TSensorsGroupPtr metrics,
const TInflightLimiter::TPtr& inflightLimiter,
TGlobalResources& globalResources)
- : NodeId(nodeId)
- , WorkerId(workerId)
- , LastPingTime(TInstant::Now())
- , Revision(request.GetRevision())
- , ClusterName(request.GetClusterName())
- , Address(request.GetAddress())
+ : NodeId(nodeId)
+ , WorkerId(workerId)
+ , LastPingTime(TInstant::Now())
+ , Revision(request.GetRevision())
+ , ClusterName(request.GetClusterName())
+ , Address(request.GetAddress())
, StartTime(!request.GetStartTime().empty()
? request.GetStartTime()
: startTime)
- , Port(request.GetPort())
- , Epoch(request.GetEpoch())
- , Capabilities(request.GetCapabilities())
+ , Port(request.GetPort())
+ , Epoch(request.GetEpoch())
+ , Capabilities(request.GetCapabilities())
, Capacity(request.GetCapacity()?request.GetCapacity():1)
- , Metrics(std::move(metrics))
- , InflightLimiter(inflightLimiter)
+ , Metrics(std::move(metrics))
+ , InflightLimiter(inflightLimiter)
, ClusterResources(TClusterResources(globalResources, ClusterName))
- {
+ {
#define ADD_METRIC(name) \
name = Metrics->GetCounter(#name)
#define ADD_METRIC_DERIV(name) \
name = Metrics->GetCounter(#name, /*derivative=*/ true)
-
- ADD_METRIC(CurrentDownloadsSum);
- ADD_METRIC(CurrentDownloadsMax);
- ADD_METRIC(CurrentDownloadsArgMax);
-
- ADD_METRIC(FilesCountSum);
- ADD_METRIC(FilesCountMax);
- ADD_METRIC(FilesCountArgMax);
-
-
- ADD_METRIC(FreeDiskSizeSum);
- ADD_METRIC(FreeDiskSizeMin);
- ADD_METRIC(FreeDiskSizeArgMin);
-
- ADD_METRIC(UsedDiskSizeSum);
- ADD_METRIC(UsedDiskSizeMax);
- ADD_METRIC(UsedDiskSizeArgMax);
-
+
+ ADD_METRIC(CurrentDownloadsSum);
+ ADD_METRIC(CurrentDownloadsMax);
+ ADD_METRIC(CurrentDownloadsArgMax);
+
+ ADD_METRIC(FilesCountSum);
+ ADD_METRIC(FilesCountMax);
+ ADD_METRIC(FilesCountArgMax);
+
+
+ ADD_METRIC(FreeDiskSizeSum);
+ ADD_METRIC(FreeDiskSizeMin);
+ ADD_METRIC(FreeDiskSizeArgMin);
+
+ ADD_METRIC(UsedDiskSizeSum);
+ ADD_METRIC(UsedDiskSizeMax);
+ ADD_METRIC(UsedDiskSizeArgMax);
+
ADD_METRIC(ActiveDownloadsSum);
ADD_METRIC(DeadWorkers);
@@ -87,21 +87,21 @@ namespace NYql::NDqs {
ADD_METRIC(RunningActors);
-#undef ADD_METRIC
+#undef ADD_METRIC
#undef ADD_METRIC_DERIV
-
- for (const auto& attr : request.GetAttribute()) {
- Attributes[attr.GetKey()] = attr.GetValue();
- }
-
+
+ for (const auto& attr : request.GetAttribute()) {
+ Attributes[attr.GetKey()] = attr.GetValue();
+ }
+
ClusterResources.AddCapacity(Capacity);
- Update(request);
- }
-
- bool TWorkerInfo::Update(const Yql::DqsProto::RegisterNodeRequest& request)
- {
- bool needResume = false;
+ Update(request);
+ }
+
+ bool TWorkerInfo::Update(const Yql::DqsProto::RegisterNodeRequest& request)
+ {
+ bool needResume = false;
auto now = TInstant::Now();
TDuration interval = now-LastPingTime;
LastPingTime = now;
@@ -143,143 +143,143 @@ namespace NYql::NDqs {
MajorPageFaults = request.GetRusage().GetMajorPageFaults();
*MajorPageFaultsSum += pageFaultsDelta;
- for (auto file : request.GetFilesOnNode()) {
- if (Resources.insert(file.GetObjectId()).second) {
- needResume = true;
- *FilesCountSum += 1;
+ for (auto file : request.GetFilesOnNode()) {
+ if (Resources.insert(file.GetObjectId()).second) {
+ needResume = true;
+ *FilesCountSum += 1;
*FileAddCounter += 1;
- }
- }
+ }
+ }
Operations.clear();
for (const auto& op: request.GetRunningOperation()) {
Operations.insert(op);
}
-
+
int actorsDelta = request.GetRunningWorkers()-RunningWorkerActors;
*RunningActors += actorsDelta;
RunningWorkerActors = request.GetRunningWorkers();
-
- if (static_cast<int>(Resources.size()) > static_cast<int>(request.GetFilesOnNode().size())) {
- needResume = true;
- // drop files
- THashSet<TString> filesOnNode;
- for (const auto& file : request.GetFilesOnNode()) {
- filesOnNode.insert(file.GetObjectId());
- }
- THashSet<TString> toDrop;
- for (const auto& k : Resources) {
- if (!filesOnNode.contains(k)) {
- toDrop.insert(k);
- }
- }
- for (const auto& k : toDrop) {
- YQL_LOG(DEBUG) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId);
- Resources.erase(k);
- }
+
+ if (static_cast<int>(Resources.size()) > static_cast<int>(request.GetFilesOnNode().size())) {
+ needResume = true;
+ // drop files
+ THashSet<TString> filesOnNode;
+ for (const auto& file : request.GetFilesOnNode()) {
+ filesOnNode.insert(file.GetObjectId());
+ }
+ THashSet<TString> toDrop;
+ for (const auto& k : Resources) {
+ if (!filesOnNode.contains(k)) {
+ toDrop.insert(k);
+ }
+ }
+ for (const auto& k : toDrop) {
+ YQL_LOG(DEBUG) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId);
+ Resources.erase(k);
+ }
*FileRemoveCounter += toDrop.size();
- *FilesCountSum -= toDrop.size();
- }
-
- // ordered as in LRU cache
- ResourcesOrdered = std::move(request.GetFilesOnNode());
-
+ *FilesCountSum -= toDrop.size();
+ }
+
+ // ordered as in LRU cache
+ ResourcesOrdered = std::move(request.GetFilesOnNode());
+
if (*FreeDiskSizeArgMin == 0 || *FreeDiskSizeMin > request.GetFreeDiskSize()) {
- *FreeDiskSizeMin = request.GetFreeDiskSize();
- *FreeDiskSizeArgMin = NodeId;
- }
-
+ *FreeDiskSizeMin = request.GetFreeDiskSize();
+ *FreeDiskSizeArgMin = NodeId;
+ }
+
if (*FreeDiskSizeArgMin == NodeId && *FreeDiskSizeMin != request.GetFreeDiskSize()) {
*FreeDiskSizeMin = request.GetFreeDiskSize();
}
- *FreeDiskSizeSum += (request.GetFreeDiskSize() - FreeDiskSize);
- FreeDiskSize = request.GetFreeDiskSize();
-
- if (*UsedDiskSizeMax < request.GetUsedDiskSize()) {
- *UsedDiskSizeMax = request.GetUsedDiskSize();
- *UsedDiskSizeArgMax = NodeId;
- }
-
- *UsedDiskSizeSum += (request.GetUsedDiskSize() - UsedDiskSize);
- UsedDiskSize = request.GetUsedDiskSize();
-
- for (auto md5: Resources) {
- RemoveFromDownloadList(md5);
- }
-
- if (*FilesCountMax < static_cast<i64>(Resources.size())) {
- *FilesCountMax = Resources.size();
- *FilesCountArgMax = NodeId;
- }
-
- if (*CurrentDownloadsMax < static_cast<i64>(DownloadList.size())) {
- *CurrentDownloadsMax = DownloadList.size();
- *CurrentDownloadsArgMax = NodeId;
- }
-
- return needResume;
- }
-
- void TWorkerInfo::RemoveFromDownloadList(const TString& objectId) {
- auto delta = DownloadList.erase(objectId);
- if (delta) {
+ *FreeDiskSizeSum += (request.GetFreeDiskSize() - FreeDiskSize);
+ FreeDiskSize = request.GetFreeDiskSize();
+
+ if (*UsedDiskSizeMax < request.GetUsedDiskSize()) {
+ *UsedDiskSizeMax = request.GetUsedDiskSize();
+ *UsedDiskSizeArgMax = NodeId;
+ }
+
+ *UsedDiskSizeSum += (request.GetUsedDiskSize() - UsedDiskSize);
+ UsedDiskSize = request.GetUsedDiskSize();
+
+ for (auto md5: Resources) {
+ RemoveFromDownloadList(md5);
+ }
+
+ if (*FilesCountMax < static_cast<i64>(Resources.size())) {
+ *FilesCountMax = Resources.size();
+ *FilesCountArgMax = NodeId;
+ }
+
+ if (*CurrentDownloadsMax < static_cast<i64>(DownloadList.size())) {
+ *CurrentDownloadsMax = DownloadList.size();
+ *CurrentDownloadsArgMax = NodeId;
+ }
+
+ return needResume;
+ }
+
+ void TWorkerInfo::RemoveFromDownloadList(const TString& objectId) {
+ auto delta = DownloadList.erase(objectId);
+ if (delta) {
InflightLimiter->MarkDownloadFinished(objectId);
*ActiveDownloadsSum -= ActiveDownloads.erase(objectId);
- }
- *CurrentDownloadsSum -= delta;
- }
-
- void TWorkerInfo::AddToDownloadList(const THashMap<TString, TFileResource>& downloadList) {
+ }
+ *CurrentDownloadsSum -= delta;
+ }
+
+ void TWorkerInfo::AddToDownloadList(const THashMap<TString, TFileResource>& downloadList) {
for (const auto& [k, v] : downloadList) {
AddToDownloadList(k, v);
}
- }
-
+ }
+
bool TWorkerInfo::AddToDownloadList(const TString& key, const TFileResource& value) {
if (!Resources.contains(key) && DownloadList.insert({key, value}).second) {
- *CurrentDownloadsSum += 1;
+ *CurrentDownloadsSum += 1;
return true;
- }
+ }
return false;
- }
-
- const THashMap<TString, TWorkerInfo::TFileResource>& TWorkerInfo::GetDownloadList() {
- return DownloadList;
- }
-
- THashMap<TString, TWorkerInfo::TFileResource> TWorkerInfo::GetResourcesForDownloading() {
- for (const auto& it : DownloadList) {
- TString id = it.first;
+ }
+
+ const THashMap<TString, TWorkerInfo::TFileResource>& TWorkerInfo::GetDownloadList() {
+ return DownloadList;
+ }
+
+ THashMap<TString, TWorkerInfo::TFileResource> TWorkerInfo::GetResourcesForDownloading() {
+ for (const auto& it : DownloadList) {
+ TString id = it.first;
if (InflightLimiter->CanDownload(id) && !ActiveDownloads.contains(id)) {
ActiveDownloads.emplace(id, it.second);
*ActiveDownloadsSum += 1;
InflightLimiter->MarkDownloadStarted(id);
- }
- }
+ }
+ }
return ActiveDownloads;
- }
-
- const THashSet<TString>& TWorkerInfo::GetResources() {
- return Resources;
- }
-
- void TWorkerInfo::OnDead() {
+ }
+
+ const THashSet<TString>& TWorkerInfo::GetResources() {
+ return Resources;
+ }
+
+ void TWorkerInfo::OnDead() {
if (IsDead) {
return;
}
- IsDead = true;
+ IsDead = true;
for (const auto& [id, _] : ActiveDownloads) {
InflightLimiter->MarkDownloadFinished(id);
- }
+ }
*ActiveDownloadsSum -= ActiveDownloads.size();
ActiveDownloads.clear();
-
- *CurrentDownloadsSum -= DownloadList.size();
- *FilesCountSum -= Resources.size();
-
- *UsedDiskSizeSum += - UsedDiskSize;
- *FreeDiskSizeSum += - FreeDiskSize;
+
+ *CurrentDownloadsSum -= DownloadList.size();
+ *FilesCountSum -= Resources.size();
+
+ *UsedDiskSizeSum += - UsedDiskSize;
+ *FreeDiskSizeSum += - FreeDiskSize;
*DeadWorkers += 1;
@@ -296,7 +296,7 @@ namespace NYql::NDqs {
TWorkerInfo::~TWorkerInfo() {
OnDead();
- }
+ }
bool TWorkerInfo::Acquire() {
if (IsDead) {
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h
index 15cf63678dd..5281a844f4c 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h
@@ -1,30 +1,30 @@
-#pragma once
-
-#include <utility>
-#include <util/generic/hash.h>
-#include <util/generic/guid.h>
-#include <util/generic/hash_set.h>
+#pragma once
+
+#include <utility>
+#include <util/generic/hash.h>
+#include <util/generic/guid.h>
+#include <util/generic/hash_set.h>
#include <util/generic/maybe.h>
#include <ydb/library/yql/providers/common/metrics/sensors_group.h>
#include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h>
-
-
-namespace NYql::NDqs {
-
+
+
+namespace NYql::NDqs {
+
class TInflightLimiter: public TThrRefBase {
-public:
+public:
using TPtr = TIntrusivePtr<TInflightLimiter>;
- TInflightLimiter(i32 inflightLimit);
- bool CanDownload(const TString& id);
- void MarkDownloadStarted(const TString& id);
- void MarkDownloadFinished(const TString& id);
-
-private:
- THashMap<TString, i32> InflightResources;
- i32 InflightLimit;
-};
-
+ TInflightLimiter(i32 inflightLimit);
+ bool CanDownload(const TString& id);
+ void MarkDownloadStarted(const TString& id);
+ void MarkDownloadFinished(const TString& id);
+
+private:
+ THashMap<TString, i32> InflightResources;
+ i32 InflightLimit;
+};
+
struct TGlobalResources {
TGlobalResources(TSensorsGroupPtr metrics)
: Metrics(metrics)
@@ -95,30 +95,30 @@ private:
struct TWorkerInfo: public TThrRefBase {
using TPtr = TIntrusivePtr<TWorkerInfo>;
- using TFileResource = Yql::DqsProto::TFile;
-
- const ui32 NodeId;
- const TGUID WorkerId;
- TInstant LastPingTime;
- const TString Revision;
- const TString ClusterName;
- const TString Address;
+ using TFileResource = Yql::DqsProto::TFile;
+
+ const ui32 NodeId;
+ const TGUID WorkerId;
+ TInstant LastPingTime;
+ const TString Revision;
+ const TString ClusterName;
+ const TString Address;
const TString StartTime;
- const ui32 Port;
- ui32 Epoch;
- THashMap<TString, TString> Attributes;
+ const ui32 Port;
+ ui32 Epoch;
+ THashMap<TString, TString> Attributes;
THashSet<TString> Operations;
-
- ui64 UseCount = 0;
+
+ ui64 UseCount = 0;
TInstant RequestStartTime;
- TDuration UseTime;
- bool IsDead = false;
- bool Stopping = false;
-
- const ui32 Capabilities = 0;
-
- i64 FreeDiskSize = 0;
- i64 UsedDiskSize = 0;
+ TDuration UseTime;
+ bool IsDead = false;
+ bool Stopping = false;
+
+ const ui32 Capabilities = 0;
+
+ i64 FreeDiskSize = 0;
+ i64 UsedDiskSize = 0;
const int Capacity;
const int CpuCores = 1; // unused yet
@@ -130,76 +130,76 @@ struct TWorkerInfo: public TThrRefBase {
i64 MajorPageFaults = 0;
int RunningRequests = 0;
- int RunningWorkerActors = 0;
-
- TWorkerInfo(
+ int RunningWorkerActors = 0;
+
+ TWorkerInfo(
const TString& startTime,
- ui32 nodeId,
- TGUID workerId,
- const Yql::DqsProto::RegisterNodeRequest& request,
- NYql::TSensorsGroupPtr metrics,
+ ui32 nodeId,
+ TGUID workerId,
+ const Yql::DqsProto::RegisterNodeRequest& request,
+ NYql::TSensorsGroupPtr metrics,
const TInflightLimiter::TPtr& inflightLimiter,
TGlobalResources& globalResources
- );
-
- ~TWorkerInfo();
-
- bool Update(const Yql::DqsProto::RegisterNodeRequest& request);
-
- void RemoveFromDownloadList(const TString& objectId);
-
- void AddToDownloadList(const THashMap<TString, TFileResource>& downloadList);
-
+ );
+
+ ~TWorkerInfo();
+
+ bool Update(const Yql::DqsProto::RegisterNodeRequest& request);
+
+ void RemoveFromDownloadList(const TString& objectId);
+
+ void AddToDownloadList(const THashMap<TString, TFileResource>& downloadList);
+
bool AddToDownloadList(const TString& key, const TFileResource& value);
-
- const THashMap<TString, TFileResource>& GetDownloadList();
-
- THashMap<TString, TFileResource> GetResourcesForDownloading();
-
- const THashSet<TString>& GetResources();
-
- const auto& GetResourcesOrdered() {
- return ResourcesOrdered;
- }
-
+
+ const THashMap<TString, TFileResource>& GetDownloadList();
+
+ THashMap<TString, TFileResource> GetResourcesForDownloading();
+
+ const THashSet<TString>& GetResources();
+
+ const auto& GetResourcesOrdered() {
+ return ResourcesOrdered;
+ }
+
const auto& GetActiveDownloads() {
return ActiveDownloads;
}
- void OnDead();
-
+ void OnDead();
+
bool Acquire();
bool Release();
-private:
- THashSet<TString> Resources;
- google::protobuf::RepeatedPtrField<::Yql::DqsProto::RegisterNodeRequest_LocalFile> ResourcesOrdered;
- THashMap<TString, TFileResource> DownloadList;
- TSensorsGroupPtr Metrics;
+private:
+ THashSet<TString> Resources;
+ google::protobuf::RepeatedPtrField<::Yql::DqsProto::RegisterNodeRequest_LocalFile> ResourcesOrdered;
+ THashMap<TString, TFileResource> DownloadList;
+ TSensorsGroupPtr Metrics;
TInflightLimiter::TPtr InflightLimiter;
TClusterResources ClusterResources;
THashMap<TString, TFileResource> ActiveDownloads;
-
- TSensorCounterPtr CurrentDownloadsSum;
- TSensorCounterPtr CurrentDownloadsMax;
- TSensorCounterPtr CurrentDownloadsArgMax;
-
+
+ TSensorCounterPtr CurrentDownloadsSum;
+ TSensorCounterPtr CurrentDownloadsMax;
+ TSensorCounterPtr CurrentDownloadsArgMax;
+
TSensorCounterPtr ActiveDownloadsSum;
TSensorCounterPtr DeadWorkers;
- TSensorCounterPtr FilesCountSum;
- TSensorCounterPtr FilesCountMax;
- TSensorCounterPtr FilesCountArgMax;
-
- TSensorCounterPtr FreeDiskSizeSum;
- TSensorCounterPtr FreeDiskSizeMin;
- TSensorCounterPtr FreeDiskSizeArgMin;
-
- TSensorCounterPtr UsedDiskSizeSum;
- TSensorCounterPtr UsedDiskSizeMax;
- TSensorCounterPtr UsedDiskSizeArgMax;
+ TSensorCounterPtr FilesCountSum;
+ TSensorCounterPtr FilesCountMax;
+ TSensorCounterPtr FilesCountArgMax;
+
+ TSensorCounterPtr FreeDiskSizeSum;
+ TSensorCounterPtr FreeDiskSizeMin;
+ TSensorCounterPtr FreeDiskSizeArgMin;
+
+ TSensorCounterPtr UsedDiskSizeSum;
+ TSensorCounterPtr UsedDiskSizeMax;
+ TSensorCounterPtr UsedDiskSizeArgMax;
TSensorCounterPtr CpuTotalSum;
TSensorCounterPtr MajorPageFaultsSum;
@@ -208,8 +208,8 @@ private:
TSensorCounterPtr FileAddCounter;
TSensorCounterPtr RunningActors;
-};
-
+};
+
struct TWorkerInfoPtrComparator {
bool operator()(const TWorkerInfo::TPtr& a, const TWorkerInfo::TPtr& b) const {
auto scoreA = (a->CpuTotal+1) * (a->RunningRequests + a->RunningWorkerActors + 1);
@@ -223,5 +223,5 @@ struct TWorkerInfoPtrComparator {
}
}
};
-
+
} // namespace NYql::NDqs