diff options
author | romakondakov <romakondakov@yandex-team.ru> | 2022-02-10 16:52:16 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:16 +0300 |
commit | bc9502dd9565b1d3084b1871eba035befe00d7e5 (patch) | |
tree | ab7fbbf3253d4c0e2793218f09378908beb025fb | |
parent | aa1447c8c9b7deea12b1b441d5db739fb7fc91fa (diff) | |
download | ydb-bc9502dd9565b1d3084b1871eba035befe00d7e5.tar.gz |
Restoring authorship annotation for <romakondakov@yandex-team.ru>. Commit 2 of 2.
39 files changed, 719 insertions, 719 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index 2943c757909..b74abc1c34c 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -1318,8 +1318,8 @@ TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& us resultProviderDataSources.push_back(TString(PqProviderName)); } - if (providerNames.contains(DqProviderName)) { - resultProviderDataSources.push_back(TString(DqProviderName)); + if (providerNames.contains(DqProviderName)) { + resultProviderDataSources.push_back(TString(DqProviderName)); } if (!resultProviderDataSources.empty()) diff --git a/ydb/library/yql/core/services/yql_plan.cpp b/ydb/library/yql/core/services/yql_plan.cpp index 20d449fdfe3..4d7cfffaf5a 100644 --- a/ydb/library/yql/core/services/yql_plan.cpp +++ b/ydb/library/yql/core/services/yql_plan.cpp @@ -363,15 +363,15 @@ public: YQL_ENSURE(datasink); info.Provider = (*datasink).Get(); info.IsVisible = (*datasink)->GetPlanFormatter().GetDependencies(*node, dependencies, true); - } - else if (node->IsCallable("DqStage") || - node->IsCallable("DqPhyStage") || + } + else if (node->IsCallable("DqStage") || + node->IsCallable("DqPhyStage") || node->IsCallable("DqQuery!") || - node->ChildrenSize() >= 1 && node->Child(0)->IsCallable("TDqOutput")) { - auto provider = Types_.DataSinkMap.FindPtr(DqProviderName); - YQL_ENSURE(provider); - info.Provider = (*provider).Get(); - info.IsVisible = (*provider)->GetPlanFormatter().GetDependencies(*node, dependencies, true); + node->ChildrenSize() >= 1 && node->Child(0)->IsCallable("TDqOutput")) { + auto provider = Types_.DataSinkMap.FindPtr(DqProviderName); + YQL_ENSURE(provider); + info.Provider = (*provider).Get(); + info.IsVisible = (*provider)->GetPlanFormatter().GetDependencies(*node, dependencies, true); } else { info.IsVisible = false; for (auto& child : node->Children()) { diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp index 3dd1b9707b1..ae425f34c8e 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp +++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp @@ -157,7 +157,7 @@ TTransformationPipeline& TTransformationPipeline::AddFinalCommonOptimization(EYq TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) { AddCommonOptimization(issueCode); Transformers_.push_back(TTransformStage( - CreateRecaptureDataProposalsInspector(*TypeAnnotationContext_, TString{DqProviderName}), + CreateRecaptureDataProposalsInspector(*TypeAnnotationContext_, TString{DqProviderName}), "RecaptureDataProposals", issueCode)); Transformers_.push_back(TTransformStage( diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 615947c1727..5846e6cb108 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -3113,11 +3113,11 @@ namespace NTypeAnnImpl { return IGraphTransformer::TStatus::Ok; } - IGraphTransformer::TStatus HeadWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - output = ctx.Expr.RenameNode(*input, "ToOptional"); - return IGraphTransformer::TStatus::Repeat; - } - + IGraphTransformer::TStatus HeadWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + output = ctx.Expr.RenameNode(*input, "ToOptional"); + return IGraphTransformer::TStatus::Repeat; + } + IGraphTransformer::TStatus ToFlowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { if (!EnsureArgsCount(*input, 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -12802,7 +12802,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["ToList"] = &ToListWrapper; Functions["ToOptional"] = &ToOptionalWrapper; Functions["Iterable"] = &IterableWrapper; - Functions["Head"] = &HeadWrapper; + Functions["Head"] = &HeadWrapper; Functions["Last"] = &ToOptionalWrapper; Functions["AsTagged"] = &AsTaggedWrapper; Functions["Untag"] = &UntagWrapper; diff --git a/ydb/library/yql/core/yql_join.cpp b/ydb/library/yql/core/yql_join.cpp index 8de9ef72c72..9a9ded7a327 100644 --- a/ydb/library/yql/core/yql_join.cpp +++ b/ydb/library/yql/core/yql_join.cpp @@ -1113,14 +1113,14 @@ TMap<TStringBuf, TVector<TStringBuf>> UpdateUsedFieldsInRenameMap( bool needRemove = !usedFields.contains(item->GetName()); if (auto renamed = reversedRenameMap.FindPtr(item->GetName())) { if (needRemove) { - if (newRenameMap[*renamed].empty()) { - newRenameMap[*renamed].push_back(""); - } + if (newRenameMap[*renamed].empty()) { + newRenameMap[*renamed].push_back(""); + } } else { - if (!newRenameMap[*renamed].empty() && newRenameMap[*renamed][0].empty()) { - newRenameMap[*renamed].clear(); // Do not remove column because it will be renamed. - } + if (!newRenameMap[*renamed].empty() && newRenameMap[*renamed][0].empty()) { + newRenameMap[*renamed].clear(); // Do not remove column because it will be renamed. + } newRenameMap[*renamed].push_back(item->GetName()); } } diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index 7a2c730cd89..69aa84a500f 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1003,11 +1003,11 @@ void ExtractSimpleSortTraits(const TExprNode& sortDirections, const TExprNode& k } const TExprNode& SkipCallables(const TExprNode& node, const std::initializer_list<std::string_view>& skipCallables) { - const TExprNode* p = &node; - while (p->IsCallable(skipCallables)) { - p = &p->Head(); - } - return *p; + const TExprNode* p = &node; + while (p->IsCallable(skipCallables)) { + p = &p->Head(); + } + return *p; } namespace { @@ -1029,7 +1029,7 @@ TExprNode::TPtr ApplyWithCastStructForFirstArg(const TExprNode::TPtr& node, cons auto result = ctx.NewLambda(pos, ctx.NewArguments(pos, std::move(args)), std::move(body)); return ctx.DeepCopyLambda(*result); -} +} } diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index 3de6350b7f2..00635fea81e 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -82,7 +82,7 @@ TExprNode::TPtr MakeBool(TPositionHandle position, TExprContext& ctx); TExprNode::TPtr MakeIdentityLambda(TPositionHandle position, TExprContext& ctx); constexpr std::initializer_list<std::string_view> SkippableCallables = {"Unordered", "AssumeSorted", "AssumeUnique", "AssumeColumnOrder", "AssumeAllMembersNullableAtOnce"}; - + const TExprNode& SkipCallables(const TExprNode& node, const std::initializer_list<std::string_view>& skipCallables); void ExtractSortKeyAndOrder(TPositionHandle pos, const TExprNode::TPtr& sortTraitsNode, TExprNode::TPtr& sortKey, TExprNode::TPtr& sortOrder, TExprContext& ctx); diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index d91aa3e83f3..71cc7be6cb8 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -32,10 +32,10 @@ struct TBaseDqResManEvents { ES_GET_MASTER_RESPONSE, ES_CONFIGURE_FAILURE_INJECTOR, - ES_CONFIGURE_FAILURE_INJECTOR_RESPONSE, - - ES_QUERY_STATUS, - ES_QUERY_STATUS_RESPONSE, + ES_CONFIGURE_FAILURE_INJECTOR_RESPONSE, + + ES_QUERY_STATUS, + ES_QUERY_STATUS_RESPONSE, ES_ROUTES, ES_ROUTES_RESPONSE, diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 2aa8ac92a5c..cbe871059d7 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -382,15 +382,15 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo .Done(); } -template <typename BaseLMap> -TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, +template <typename BaseLMap> +TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true) { - if (!node.Maybe<BaseLMap>().Input().template Maybe<TDqCnUnionAll>()) { + if (!node.Maybe<BaseLMap>().Input().template Maybe<TDqCnUnionAll>()) { return node; } - auto lmap = node.Cast<BaseLMap>(); + auto lmap = node.Cast<BaseLMap>(); auto dqUnion = lmap.Input().template Cast<TDqCnUnionAll>(); if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) { return node; @@ -402,11 +402,11 @@ TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimization auto lambda = Build<TCoLambda>(ctx, lmap.Lambda().Pos()) .Args({"stream"}) - .template Body<TCoToStream>() - .template Input<TExprApplier>() - .Apply(lmap.Lambda()) - .With(lmap.Lambda().Args().Arg(0), "stream") - .Build() + .template Body<TCoToStream>() + .template Input<TExprApplier>() + .Apply(lmap.Lambda()) + .With(lmap.Lambda().Args().Arg(0), "stream") + .Build() .Build() .Done(); @@ -418,18 +418,18 @@ TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimization return result.Cast(); } -TExprBase DqPushOrderedLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, +TExprBase DqPushOrderedLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ +{ return DqPushBaseLMapToStage<TCoOrderedLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage); -} - -TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, +} + +TExprBase DqPushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ +{ return DqPushBaseLMapToStage<TCoLMap>(node, ctx, optCtx, parentsMap, allowStageMultiUsage); -} - +} + TExprBase DqBuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 3f8b24b0c61..70a9d70082b 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -26,9 +26,9 @@ NNodes::TExprBase DqPushExtractMembersToStage(NNodes::TExprBase node, TExprConte NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, +NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); - + NNodes::TExprBase DqBuildExtFunctionStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 9c8c54bbb32..fcbd3bb9ecf 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -160,7 +160,7 @@ TStatus AnnotateStage(const TExprNode::TPtr& stage, TExprContext& ctx) { } THashMap<TStringBuf, THashMap<TStringBuf, const TTypeAnnotationNode*>> -ParseJoinInputType(const TStructExprType& rowType, TStringBuf tableLabel, TExprContext& ctx, bool optional) { +ParseJoinInputType(const TStructExprType& rowType, TStringBuf tableLabel, TExprContext& ctx, bool optional) { THashMap<TStringBuf, THashMap<TStringBuf, const TTypeAnnotationNode*>> result; for (auto member : rowType.GetItems()) { TStringBuf label, column; @@ -174,14 +174,14 @@ ParseJoinInputType(const TStructExprType& rowType, TStringBuf tableLabel, TExprC result.clear(); return result; } - auto memberType = member->GetItemType(); - if (optional && !memberType->IsOptionalOrNull()) { - memberType = ctx.MakeType<TOptionalExprType>(memberType); - } + auto memberType = member->GetItemType(); + if (optional && !memberType->IsOptionalOrNull()) { + memberType = ctx.MakeType<TOptionalExprType>(memberType); + } if (!tableLabel.empty()) { - result[tableLabel][member->GetName()] = memberType; + result[tableLabel][member->GetName()] = memberType; } else { - result[label][column] = memberType; + result[label][column] = memberType; } } return result; @@ -193,8 +193,8 @@ const TStructExprType* GetDqJoinResultType(TPositionHandle pos, const TStructExp const TStringBuf& joinType, const TDqJoinKeyTupleList& joinKeys, TExprContext& ctx) { // check left - bool isLeftOptional = IsLeftJoinSideOptional(joinType); - auto leftType = ParseJoinInputType(leftRowType, leftLabel, ctx, isLeftOptional); + bool isLeftOptional = IsLeftJoinSideOptional(joinType); + auto leftType = ParseJoinInputType(leftRowType, leftLabel, ctx, isLeftOptional); if (leftType.empty() && joinType != "Cross") { TStringStream str; str << "Cannot parse left join input type: "; leftRowType.Out(str); @@ -203,8 +203,8 @@ const TStructExprType* GetDqJoinResultType(TPositionHandle pos, const TStructExp } // check right - bool isRightOptional = IsRightJoinSideOptional(joinType); - auto rightType = ParseJoinInputType(rightRowType, rightLabel, ctx, isRightOptional); + bool isRightOptional = IsRightJoinSideOptional(joinType); + auto rightType = ParseJoinInputType(rightRowType, rightLabel, ctx, isRightOptional); if (rightType.empty() && joinType != "Cross") { TStringStream str; str << "Cannot parse right join input type: "; rightRowType.Out(str); diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp index 2b45751ad97..e23600f80fb 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_datasource.cpp @@ -76,33 +76,33 @@ public: return &State_->Configuration->Tokens; } - bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override { - Y_UNUSED(compact); - - for (auto& child : node.Children()) { - children.push_back(child.Get()); - } - - if (TMaybeNode<TClReadTable>(&node)) { - return true; - } - return false; - } - - void GetInputs(const TExprNode& node, TVector<TPinInfo>& inputs) override { - if (auto maybeRead = TMaybeNode<TClReadTable>(&node)) { - if (auto maybeTable = maybeRead.Table()) { - TStringBuilder tableNameBuilder; - if (auto dataSource = maybeRead.DataSource().Maybe<TClDataSource>()) { - auto cluster = dataSource.Cast().Cluster(); - tableNameBuilder << cluster.Value() << "."; - } - tableNameBuilder << '`' << maybeTable.Cast().Value() << '`'; - inputs.push_back(TPinInfo(maybeRead.DataSource().Raw(), nullptr, maybeTable.Cast().Raw(), tableNameBuilder, false)); - } - } - } - + bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override { + Y_UNUSED(compact); + + for (auto& child : node.Children()) { + children.push_back(child.Get()); + } + + if (TMaybeNode<TClReadTable>(&node)) { + return true; + } + return false; + } + + void GetInputs(const TExprNode& node, TVector<TPinInfo>& inputs) override { + if (auto maybeRead = TMaybeNode<TClReadTable>(&node)) { + if (auto maybeTable = maybeRead.Table()) { + TStringBuilder tableNameBuilder; + if (auto dataSource = maybeRead.DataSource().Maybe<TClDataSource>()) { + auto cluster = dataSource.Cast().Cluster(); + tableNameBuilder << cluster.Value() << "."; + } + tableNameBuilder << '`' << maybeTable.Cast().Value() << '`'; + inputs.push_back(TPinInfo(maybeRead.DataSource().Raw(), nullptr, maybeTable.Cast().Raw(), tableNameBuilder, false)); + } + } + } + IDqIntegration* GetDqIntegration() override { return DqIntegration_.Get(); } diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 8d0bddff2a5..4da4d13d034 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -29,11 +29,11 @@ TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode } TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode& write, TExprContext& ctx) { - Y_UNUSED(write); + Y_UNUSED(write); Y_UNUSED(ctx); - return Nothing(); -} - + return Nothing(); +} + void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) { Y_UNUSED(compiler); } diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 2a43027dfc2..aa782ee93ff 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -714,17 +714,17 @@ TString ExprToPrettyString(TExprContext& ctx, const TExprNode& expr) { return exprText; } -bool IsFlowOrStream(const TExprNode* node) { - auto kind = node->GetTypeAnn()->GetKind(); - return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow; -} +bool IsFlowOrStream(const TExprNode* node) { + auto kind = node->GetTypeAnn()->GetKind(); + return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow; +} void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprNode* source) { if (node == source) { return; } - if (!node->IsCallable()) { + if (!node->IsCallable()) { return; } @@ -736,7 +736,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN if (TCoApply::Match(node)) { switch (node->GetTypeAnn()->GetKind()) { case ETypeAnnotationKind::Stream: - case ETypeAnnotationKind::Flow: + case ETypeAnnotationKind::Flow: case ETypeAnnotationKind::List: break; default: @@ -744,12 +744,12 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN } for (size_t i = 1; i < node->ChildrenSize(); ++i) { - if (IsFlowOrStream(node->Child(i))) { + if (IsFlowOrStream(node->Child(i))) { applyStreamChildren.push_back(node->Child(i)); } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) { if (node->Child(i)->IsCallable("ForwardList")) { applyStreamChildren.push_back(node->Child(i)->Child(0)); - } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) { + } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) { applyStreamChildren.push_back(node->Child(i)->Child(0)); } } @@ -758,7 +758,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN WriteStream(writer, applyStreamChildren.front(), source); } } - else if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { + else if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { WriteStream(writer, node->Child(0), source); } @@ -766,7 +766,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN writer.OnBeginMap(); writer.OnKeyedItem("Name"); writer.OnStringScalar(node->Content()); - if (TCoFlatMapBase::Match(node) && IsFlowOrStream(node->ChildPtr(1).Get())) { + if (TCoFlatMapBase::Match(node) && IsFlowOrStream(node->ChildPtr(1).Get())) { writer.OnKeyedItem("Children"); writer.OnBeginList(); writer.OnListItem(); @@ -799,7 +799,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN writer.OnEndList(); } - if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { + if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { writer.OnKeyedItem("Children"); writer.OnBeginList(); for (size_t i = 0; i < node->ChildrenSize(); ++i) { @@ -839,23 +839,23 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp return factor; } - if (!node->IsCallable()) { + if (!node->IsCallable()) { return factor; } if (TCoApply::Match(node)) { switch (node->GetTypeAnn()->GetKind()) { case ETypeAnnotationKind::Stream: - case ETypeAnnotationKind::Flow: + case ETypeAnnotationKind::Flow: case ETypeAnnotationKind::List: { double applyFactor = 0.0; for (size_t i = 1; i < node->ChildrenSize(); ++i) { - if (IsFlowOrStream(node->Child(i))) { + if (IsFlowOrStream(node->Child(i))) { applyFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx); } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) { if (node->Child(i)->IsCallable("ForwardList")) { applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx); - } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) { + } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) { applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx); } } @@ -869,7 +869,7 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp return factor; } - if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { + if (!TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { factor = GetDataReplicationFactor(factor, node->Child(0), stream, ctx); } @@ -921,7 +921,7 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp } factor = Max(1.0, switchFactor); } - else if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { + else if (TCoExtendBase::Match(node) && node->ChildrenSize() > 0) { double extendFactor = 0.0; for (size_t i = 0; i < node->ChildrenSize(); ++i) { extendFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx); diff --git a/ydb/library/yql/providers/config/yql_config_provider.cpp b/ydb/library/yql/providers/config/yql_config_provider.cpp index 9c8752af56d..6952e96cf07 100644 --- a/ydb/library/yql/providers/config/yql_config_provider.cpp +++ b/ydb/library/yql/providers/config/yql_config_provider.cpp @@ -673,10 +673,10 @@ namespace { if (Find(Types.AvailablePureResultDataSources, DqProviderName) == Types.AvailablePureResultDataSources.end() || arg == "disable") { ; // reserved } else if (arg == "auto") { - Types.PureResultDataSource = DqProviderName; + Types.PureResultDataSource = DqProviderName; Types.ForceDq = false; } else if (arg == "force") { - Types.PureResultDataSource = DqProviderName; + Types.PureResultDataSource = DqProviderName; Types.ForceDq = true; } else { ctx.AddError(TIssue(pos, TStringBuilder() << "Expected `disable|auto|force', but got: " << args[0])); diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 45df684e94b..91f1caee748 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -143,14 +143,14 @@ private: int workerCount = ev->Get()->Record.GetRequest().GetTask().size(); YQL_LOG(INFO) << (TStringBuilder() << "Trying to allocate " << workerCount << " workers"); - THashMap<TString, Yql::DqsProto::TFile> files; + THashMap<TString, Yql::DqsProto::TFile> files; TVector<NDqProto::TDqTask> tasks; for (auto& task : *ev->Get()->Record.MutableRequest()->MutableTask()) { Yql::DqsProto::TTaskMeta taskMeta; task.GetMeta().UnpackTo(&taskMeta); for (const auto& f : taskMeta.GetFiles()) { - files.emplace(f.GetObjectId(), f); + files.emplace(f.GetObjectId(), f); } if (ev->Get()->Record.GetRequest().GetSecureParams().size() > 0) { @@ -189,7 +189,7 @@ private: if (enableComputeActor) { ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId()); } - for (const auto& [_, f] : files) { + for (const auto& [_, f] : files) { *allocateRequest->Record.AddFiles() = f; } @@ -199,7 +199,7 @@ private: Yql::DqsProto::TWorkerFilter pragmaFilter = GetPragmaFilter(); - for (const auto& task : tasks) { + for (const auto& task : tasks) { Yql::DqsProto::TTaskMeta taskMeta; task.GetMeta().UnpackTo(&taskMeta); @@ -212,8 +212,8 @@ private: } MergeFilter(filter, pragmaFilter); - } - + } + StartCounter("AllocateWorkers"); TActivationContext::Send(new IEventHandle( diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp index 33aee0caeeb..6acfb1277e0 100644 --- a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp +++ b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp @@ -64,12 +64,12 @@ public: , RetryCounter(counters->GetSubgroup("component", "ServiceProxyActor")->GetCounter("RetryCreateActor", /*derivative=*/ true)) , Tasks(tasks) , ComputeActorType(computeActorType) - { - AllocatedWorkers.resize(workerCount); + { + AllocatedWorkers.resize(workerCount); if (!Tasks.empty()) { Y_VERIFY(workerCount == Tasks.size()); } - } + } private: STRICT_STFUNC(Handle, { @@ -177,17 +177,17 @@ private: YQL_ENSURE(RequestedNodes.contains(cookie)); auto& requestedNode = RequestedNodes[cookie]; if (!requestedNode.RequestedFlag) { - TDqResourceId resourceId{}; - resourceId.Data = cookie; - AllocatedWorkers[resourceId.u3 - 1] = ev->Get()->Record.GetWorkers(); - AllocatedCount++; + TDqResourceId resourceId{}; + resourceId.Data = cookie; + AllocatedWorkers[resourceId.u3 - 1] = ev->Get()->Record.GetWorkers(); + AllocatedCount++; requestedNode.RequestedFlag = true; auto delta = TInstant::Now() - requestedNode.StartTime; // catched and grpc_service QueryStat.AddCounter(QueryStat.GetCounterName("Actor", {{"ClusterName", requestedNode.ClusterName}}, "ActorCreateTime"), delta); } - if (AllocatedCount == RequestedCount) { + if (AllocatedCount == RequestedCount) { TVector<NActors::TActorId> workerIds; for (auto& group : AllocatedWorkers) { for (const auto& actorIdProto : group.GetWorkerActor()) { @@ -199,15 +199,15 @@ private: QueryStat.FlushCounters(response->Record); auto* workerGroup = response->Record.MutableWorkers(); TVector<Yql::DqsProto::TWorkerInfo> workers; - workers.resize(AllocatedCount); - for (const auto& [resourceId, requestInfo] : RequestedNodes) { - TDqResourceId dqResourceId{}; - dqResourceId.Data = resourceId; - workers[dqResourceId.u3 - 1] = requestInfo.WorkerInfo; + workers.resize(AllocatedCount); + for (const auto& [resourceId, requestInfo] : RequestedNodes) { + TDqResourceId dqResourceId{}; + dqResourceId.Data = resourceId; + workers[dqResourceId.u3 - 1] = requestInfo.WorkerInfo; + } + for (const auto& worker : workers) { + *workerGroup->AddWorker() = worker; } - for (const auto& worker : workers) { - *workerGroup->AddWorker() = worker; - } Send(SenderId, response.Release()); Answered = true; } @@ -306,7 +306,7 @@ private: bool LocalMode = false; TVector<NDqProto::TWorkerGroup> AllocatedWorkers; - ui32 AllocatedCount = 0; + ui32 AllocatedCount = 0; bool FailState = false; bool Answered = false; diff --git a/ydb/library/yql/providers/dq/api/grpc/api.proto b/ydb/library/yql/providers/dq/api/grpc/api.proto index d2baab0c502..ef620dca4f4 100644 --- a/ydb/library/yql/providers/dq/api/grpc/api.proto +++ b/ydb/library/yql/providers/dq/api/grpc/api.proto @@ -14,7 +14,7 @@ service DqService { rpc PingSession (PingSessionRequest) returns (PingSessionResponse); rpc ClusterStatus (ClusterStatusRequest) returns (ClusterStatusResponse); rpc OperationStop (OperationStopRequest) returns (OperationStopResponse); - rpc QueryStatus (QueryStatusRequest) returns (QueryStatusResponse); + rpc QueryStatus (QueryStatusRequest) returns (QueryStatusResponse); rpc JobStop (JobStopRequest) returns (JobStopResponse); rpc GetMaster (GetMasterRequest) returns (GetMasterResponse); rpc ConfigureFailureInjector (ConfigureFailureInjectorRequest) returns (ConfigureFailureInjectorResponse); diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index bc336df50a3..09aecfcf923 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -25,7 +25,7 @@ message TAllocateWorkersRequest { // repeated Yql.DqsProto.DatabaseDescription Databases = 7; // unused string User = 8; - + repeated Yql.DqsProto.TWorkerFilter WorkerFilterPerTask = 9; uint32 WorkersCount = 10; @@ -109,15 +109,15 @@ message TEvOperationStop { message TEvOperationStopResponse { } -message TEvQueryStatus { - Yql.DqsProto.QueryStatusRequest Request = 1; - bool IsForwarded = 2; -} - -message TEvQueryStatusResponse { - Yql.DqsProto.QueryStatusResponse Response = 1; -} - +message TEvQueryStatus { + Yql.DqsProto.QueryStatusRequest Request = 1; + bool IsForwarded = 2; +} + +message TEvQueryStatusResponse { + Yql.DqsProto.QueryStatusResponse Response = 1; +} + message TEvIsReady { Yql.DqsProto.IsReadyRequest Request = 1; bool IsForwarded = 2; diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index 35430d86e55..b91351e5d12 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -68,8 +68,8 @@ message TWorkerFilter { repeated string Address = 5; repeated uint64 NodeId = 6; repeated uint64 NodeIdHint = 7; -} - +} + message ExecuteGraphRequest { Ydb.Operations.OperationParams Params = 1; repeated NYql.NDqProto.TDqTask Task = 3; @@ -118,14 +118,14 @@ message PingSessionRequest { message PingSessionResponse { } -message QueryStatusRequest { - string Session = 1; -} - -message QueryStatusResponse { - string Status = 1; -} - +message QueryStatusRequest { + string Session = 1; +} + +message QueryStatusResponse { + string Status = 1; +} + message TAttribute { string Key = 1; string Value = 2; diff --git a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json index 04e920802de..566f8d05032 100644 --- a/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json +++ b/ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.json @@ -25,16 +25,16 @@ "Name": "TDqReadWideWrap", "Base": "TDqReadWrapBase", "Match": {"Type": "Callable", "Name": "DqReadWideWrap"} - }, - { - "Name": "TDqWrite", - "Base": "TCallable", - "Match": {"Type": "Callable", "Name": "DqWrite"}, - "Children": [ - {"Index": 0, "Name": "Input", "Type": "TExprBase"}, - {"Index": 1, "Name": "Provider", "Type": "TCoAtom"}, - {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} - ] + }, + { + "Name": "TDqWrite", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "DqWrite"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"}, + {"Index": 1, "Name": "Provider", "Type": "TCoAtom"}, + {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + ] }, { "Name": "TDqSourceWrapBase", diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 61fac27751d..858e3da45a0 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -208,10 +208,10 @@ protected: } template <bool IsGlobal> - TMaybeNode<TExprBase> PushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + TMaybeNode<TExprBase> PushLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqPushLMapToStage(node, ctx, optCtx, *getParents(), IsGlobal); - } - + } + template <bool IsGlobal> TMaybeNode<TExprBase> BuildExtFunctionStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqBuildExtFunctionStage(node, ctx, optCtx, *getParents(), IsGlobal); diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 8a52e9f8688..8759f442219 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -576,7 +576,7 @@ private: // TODO: change result provider to remove this if ui32 inMemoryIndex; for (inMemoryIndex = 0; inMemoryIndex < resFill->ChildrenSize(); ++inMemoryIndex) { - if (resFill->ChildPtr(inMemoryIndex)->IsAtom(DqProviderName)) { + if (resFill->ChildPtr(inMemoryIndex)->IsAtom(DqProviderName)) { break; } } @@ -658,22 +658,22 @@ private: THashMap<ui32, ui32> allPublicIds; bool hasStageError = false; - VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) { + VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) { const TExprBase expr(node); if (expr.Maybe<TResFill>()) { - if (auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { + if (auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { allPublicIds.emplace(*publicId, 0U); - } - } - return true; - }); - + } + } + return true; + }); + if (hasStageError) { return SyncError(); } - IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds); - + IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds); + auto executionPlanner = THolder<IDqsExecutionPlanner>(new TDqsSingleExecutionPlanner(lambda, NActors::TActorId(), NActors::TActorId(1, 0, 1, 0), State->FunctionRegistry, result.Input().Ref().GetTypeAnn())); auto& tasks = executionPlanner->GetTasks(); Yql::DqsProto::TTaskMeta taskMeta; @@ -856,7 +856,7 @@ private: if (const TExprBase expr(node); expr.Maybe<TDqConnection>()) { if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { allPublicIds.emplace(*publicId, 0U); - } + } } else if (const auto& maybeStage = expr.Maybe<TDqStage>()) { const auto& stage = maybeStage.Cast(); if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) { @@ -881,13 +881,13 @@ private: } auto optimizedInput = pull.Input().Ptr(); - THashMap<TString, TString> secureParams; - NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams); - - optimizedInput = ctx.ShallowCopy(*optimizedInput); + THashMap<TString, TString> secureParams; + NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams); + + optimizedInput = ctx.ShallowCopy(*optimizedInput); optimizedInput->SetTypeAnn(pull.Input().Ref().GetTypeAnn()); optimizedInput->CopyConstraints(pull.Input().Ref()); - + TDqsPipelineConfigurator peepholeConfig; TPeepholeSettings peepholeSettings; peepholeSettings.CommonConfig = &peepholeConfig; @@ -991,9 +991,9 @@ private: Yql::DqsProto::TTaskMeta taskMeta; t.MutableMeta()->UnpackTo(&taskMeta); - for (const auto& file : uploadList) { + for (const auto& file : uploadList) { *taskMeta.AddFiles() = file; - } + } t.MutableMeta()->PackFrom(taskMeta); if (const auto it = allPublicIds.find(taskMeta.GetStageId()); allPublicIds.cend() != it) ++it->second; @@ -1001,7 +1001,7 @@ private: } MarkProgressStarted(allPublicIds, State->ProgressWriter); - + if (fallbackFlag) { return FallbackWithMessage(pull.Ref(), "Too big attachment", ctx); } @@ -1039,17 +1039,17 @@ private: settings->_RowsLimitPerWrite = 0; } - IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds); + IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds); auto future = State->DqGateway->ExecutePlan(State->SessionId, *executionPlanner.Get(), columns, secureParams, graphParams, settings, progressWriter, ModulesMapping, fillSettings.Discard); - + future.Subscribe([allPublicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) { - YQL_ENSURE(!completedFuture.HasException()); + YQL_ENSURE(!completedFuture.HasException()); MarkProgressFinished(allPublicIds, completedFuture.GetValueSync().Success(), progressWriter); - }); + }); executionPlanner.Destroy(); - + int level = 0; // TODO: remove copy-paste @@ -1173,18 +1173,18 @@ private: IDqGateway::TDqProgressWriter MakeDqProgressWriter(const THashMap<ui32, ui32>& allPublicIds) const { IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, allPublicIds](const TString& stage) { - for (const auto& publicId : allPublicIds) { + for (const auto& publicId : allPublicIds) { auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress, stage); if (publicId.second) { p.Counters.ConstructInPlace(); p.Counters->Running = p.Counters->Total = publicId.second; } progressWriter(p); - } - }; - return dqProgressWriter; - } - + } + }; + return dqProgressWriter; + } + static void MarkProgressStarted(const THashMap<ui32, ui32>& allPublicIds, const TOperationProgressWriter& progressWriter) { for(const auto& publicId : allPublicIds) { auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress); @@ -1193,21 +1193,21 @@ private: p.Counters->Running = p.Counters->Total = publicId.second; } progressWriter(p); - } - } - + } + } + static void MarkProgressFinished(const THashMap<ui32, ui32>& allPublicIds, bool success, const TOperationProgressWriter& progressWriter) { for(const auto& publicId : allPublicIds) { - auto state = success ? TOperationProgress::EState::Finished : TOperationProgress::EState::Failed; + auto state = success ? TOperationProgress::EState::Finished : TOperationProgress::EState::Failed; auto p = TOperationProgress(TString(DqProviderName), publicId.first, state); if (publicId.second) { p.Counters.ConstructInPlace(); (success ? p.Counters->Completed : p.Counters->Failed) = p.Counters->Total = publicId.second; } progressWriter(p); - } - } - + } + } + void FlushStatisticsToState() { TOperationStatistics statistics; FlushCounters(statistics); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index b034fcb8545..a8b5e598a08 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -23,14 +23,14 @@ namespace NYql { using namespace NNodes; -class TDqDataProviderSink: public TDataProviderBase { +class TDqDataProviderSink: public TDataProviderBase { public: - TDqDataProviderSink(const TDqStatePtr& state) + TDqDataProviderSink(const TDqStatePtr& state) : State(state) , LogOptTransformer([state] () { return CreateDqsLogOptTransformer(/*TODO: State->TypeCtx);*/nullptr, state->Settings); }) , PhyOptTransformer([] () { return CreateDqsPhyOptTransformer(/*TODO: State->TypeCtx*/nullptr); }) , PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); }) - , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); }) + , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); }) , RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); }) { } @@ -157,7 +157,7 @@ public: return false; } - if (node.Child(0)->Content() == DqProviderName) { + if (node.Child(0)->Content() == DqProviderName) { if (node.ChildrenSize() == 2) { if (!EnsureAtom(*node.Child(1), ctx)) { return false; @@ -203,26 +203,26 @@ public: } TStringBuf GetName() const override { - return DqProviderName; + return DqProviderName; } - bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override { - Y_UNUSED(compact); - - if (TDqConnection::Match(&node)) { - children.push_back(node.ChildPtr(TDqConnection::idx_Output)->ChildPtr(TDqOutput::idx_Stage)); + bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) override { + Y_UNUSED(compact); + + if (TDqConnection::Match(&node)) { + children.push_back(node.ChildPtr(TDqConnection::idx_Output)->ChildPtr(TDqOutput::idx_Stage)); return false; - } - - if (TDqStageBase::Match(&node)) { - auto inputs = node.ChildPtr(TDqStageBase::idx_Inputs); - for (size_t i = 0; i < inputs->ChildrenSize(); ++i) { - children.push_back(inputs->ChildPtr(i)); - } - ScanPlanDependencies(node.ChildPtr(TDqStageBase::idx_Program), children); - return true; - } - + } + + if (TDqStageBase::Match(&node)) { + auto inputs = node.ChildPtr(TDqStageBase::idx_Inputs); + for (size_t i = 0; i < inputs->ChildrenSize(); ++i) { + children.push_back(inputs->ChildPtr(i)); + } + ScanPlanDependencies(node.ChildPtr(TDqStageBase::idx_Program), children); + return true; + } + if (TDqQuery::Match(&node)) { auto stagesList = node.ChildPtr(TDqQuery::idx_SinkStages); for (size_t i = 0; i < stagesList->ChildrenSize(); ++i) { @@ -231,40 +231,40 @@ public: return true; } - return false; - } - - void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children) { - VisitExpr(input, [&children](const TExprNode::TPtr& node) { - if (TMaybeNode<TDqReadWrapBase>(node)) { - children.push_back(node->ChildPtr(TDqReadWrapBase::idx_Input)); - return false; - } - return true; - }); - } - - TString GetOperationDisplayName(const TExprNode& node) override { - if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) { - TStringBuilder builder; - builder << TPlanFormatterBase::GetOperationDisplayName(node); - if (auto publicId = State->TypeCtx->TranslateOperationId(maybeStage.Raw()->UniqueId())) { - builder << " #" << publicId; - } - return builder; - } - return TPlanFormatterBase::GetOperationDisplayName(node); - } - + return false; + } + + void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children) { + VisitExpr(input, [&children](const TExprNode::TPtr& node) { + if (TMaybeNode<TDqReadWrapBase>(node)) { + children.push_back(node->ChildPtr(TDqReadWrapBase::idx_Input)); + return false; + } + return true; + }); + } + + TString GetOperationDisplayName(const TExprNode& node) override { + if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) { + TStringBuilder builder; + builder << TPlanFormatterBase::GetOperationDisplayName(node); + if (auto publicId = State->TypeCtx->TranslateOperationId(maybeStage.Raw()->UniqueId())) { + builder << " #" << publicId; + } + return builder; + } + return TPlanFormatterBase::GetOperationDisplayName(node); + } + void WritePlanDetails(const TExprNode& node, NYson::TYsonWriter& writer) override { - if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) { - writer.OnKeyedItem("Streams"); - writer.OnBeginMap(); - NCommon::WriteStreams(writer, "Program", maybeStage.Cast().Program()); - writer.OnEndMap(); - } - } - + if (auto maybeStage = TMaybeNode<TDqStageBase>(&node)) { + writer.OnKeyedItem("Streams"); + writer.OnBeginMap(); + NCommon::WriteStreams(writer, "Program", maybeStage.Cast().Program()); + writer.OnEndMap(); + } + } + TDqStatePtr State; TLazyInitHolder<IGraphTransformer> LogOptTransformer; @@ -274,8 +274,8 @@ public: TLazyInitHolder<IGraphTransformer> RecaptureTransformer; }; -TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state) { - return new TDqDataProviderSink(state); +TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state) { + return new TDqDataProviderSink(state); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h index 65838d49bf6..d9849824a78 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.h @@ -6,5 +6,5 @@ namespace NYql { struct TDqState; using TDqStatePtr = TIntrusivePtr<TDqState>; - TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state); + TIntrusivePtr<IDataProvider> CreateDqDataSink(const TDqStatePtr& state); } // namespace NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index 61fdcbb0a47..c8d9786a7a0 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -13,8 +13,8 @@ namespace { class TDqsDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { public: - TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) - : TVisitorTransformerBase(true), TypeCtx(typeCtx) + TDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) + : TVisitorTransformerBase(true), TypeCtx(typeCtx) { AddHandler({TDqStage::CallableName()}, Hndl(&NDq::AnnotateDqStage)); AddHandler({TDqPhyStage::CallableName()}, Hndl(&NDq::AnnotateDqPhyStage)); @@ -32,47 +32,47 @@ public: AddHandler({TDqPhyCrossJoin::CallableName()}, Hndl(&NDq::AnnotateDqCrossJoin)); AddHandler({TDqPhyJoinDict::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink)); - AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite)); + AddHandler({TDqWrite::CallableName()}, Hndl(&TDqsDataSinkTypeAnnotationTransformer::AnnotateDqWrite)); AddHandler({TDqQuery::CallableName()}, Hndl(&NDq::AnnotateDqQuery)); } - -private: + +private: TStatus AnnotateDqWrite(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - if (!EnsureMinArgsCount(*input, 2, ctx)) { - return TStatus::Error; - } - - if (!EnsureMaxArgsCount(*input, 3, ctx)) { - return TStatus::Error; - } - + if (!EnsureMinArgsCount(*input, 2, ctx)) { + return TStatus::Error; + } + + if (!EnsureMaxArgsCount(*input, 3, ctx)) { + return TStatus::Error; + } + if (!EnsureNewSeqType<false, false, true>(input->Head(), ctx)){ - return TStatus::Error; - } - - if (!EnsureAtom(*input->Child(1), ctx)) { - return TStatus::Error; - } - + return TStatus::Error; + } + + if (!EnsureAtom(*input->Child(1), ctx)) { + return TStatus::Error; + } + auto providerName = TString(input->Child(1)->Content()); - + if (!TypeCtx->DataSinkMap.FindPtr(providerName)) { - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "No datasink defined for provider name " << providerName)); - return TStatus::Error; - } - + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "No datasink defined for provider name " << providerName)); + return TStatus::Error; + } + providerName.front() = std::toupper(providerName.front()); output = ctx.NewCallable(input->Pos(), providerName += TDqWrite::CallableName(), {input->HeadPtr(), input->TailPtr()}); return TStatus::Repeat; - } - - TTypeAnnotationContext* TypeCtx; + } + + TTypeAnnotationContext* TypeCtx; }; } // unnamed -THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) { - return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx)); +THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx) { + return THolder(new TDqsDataSinkTypeAnnotationTransformer(typeCtx)); } } // NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h index 5780b22d2b6..df86def2cd4 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.h @@ -7,6 +7,6 @@ namespace NYql { -THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx); +THolder<TVisitorTransformerBase> CreateDqsDataSinkTypeAnnotationTransformer(TTypeAnnotationContext* typeCtx); } // NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp index f1949cf3f73..aea55435de9 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp @@ -34,19 +34,19 @@ using namespace NKikimr::NMiniKQL; using namespace NNodes; using namespace NDq; -class TDqDataProviderSource: public TDataProviderBase { +class TDqDataProviderSource: public TDataProviderBase { public: TDqDataProviderSource(const TDqStatePtr& state, TExecTransformerFactory execTransformerFactory) : State(state) , ConfigurationTransformer([this]() { - return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName}); + return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName}); }) , ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); }) , TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); }) { } TStringBuf GetName() const override { - return DqProviderName; + return DqProviderName; } IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override { @@ -141,7 +141,7 @@ public: return false; } - if (node.Child(0)->Content() == DqProviderName) { + if (node.Child(0)->Content() == DqProviderName) { if (node.ChildrenSize() == 2) { if (!EnsureAtom(*node.Child(1), ctx)) { return false; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp index be5e9bad10c..6e03017ccb1 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp @@ -146,12 +146,12 @@ private: return TStatus::Error; } - if (!EnsureSpecificDataSource(*input->Child(TCoConfigure::idx_DataSource), DqProviderName, ctx)) { + if (!EnsureSpecificDataSource(*input->Child(TCoConfigure::idx_DataSource), DqProviderName, ctx)) { return TStatus::Error; } input->SetTypeAnn(input->Child(TCoConfigure::idx_World)->GetTypeAnn()); - + return TStatus::Ok; } }; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 371820fb95f..2341b28ad44 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -13,10 +13,10 @@ #include <library/cpp/yson/node/node_io.h> #include <library/cpp/threading/task_scheduler/task_scheduler.h> -#include <util/system/thread.h> +#include <util/system/thread.h> + +#include <utility> -#include <utility> - namespace NYql { class TDqGateway: public IDqGateway @@ -36,10 +36,10 @@ public: , TaskScheduler(threads) , RtTaskScheduler(1) , OpenSessionTimeout(timeout) - { - TaskScheduler.Start(); + { + TaskScheduler.Start(); RtTaskScheduler.Start(); - } + } TString GetVanillaJobPath() override { return VanillaJobPath; @@ -54,12 +54,12 @@ public: { YQL_LOG_CTX_SCOPE(sessionId); YQL_CLOG(TRACE, ProviderDq) << "TDqGateway::callback"; - - { - TGuard<TMutex> lock(ProgressMutex); - RunningQueries.erase(sessionId); - } - + + { + TGuard<TMutex> lock(ProgressMutex); + RunningQueries.erase(sessionId); + } + TResult result; bool error = false; @@ -176,15 +176,15 @@ public: Y_VERIFY(TaskScheduler.Add(MakeIntrusive<TDelay>(promise), TInstant())); } - template <typename TResponse, typename TRequest, typename TStub> - NThreading::TFuture<TResult> WithRetry( - const TString& sessionId, - const TRequest& queryPB, - TStub stub, - int retry, + template <typename TResponse, typename TRequest, typename TStub> + NThreading::TFuture<TResult> WithRetry( + const TString& sessionId, + const TRequest& queryPB, + TStub stub, + int retry, const TDqSettings::TPtr& settings, const THashMap<TString, TString>& modulesMapping - ) { + ) { auto backoff = TDuration::MilliSeconds(settings->RetryBackoffMs.Get().GetOrElse(1000)); auto promise = NThreading::NewPromise<TResult>(); auto fallbackPolicy = settings->FallbackPolicy.Get().GetOrElse("default"); @@ -193,7 +193,7 @@ public: return OnResponse(std::move(promise), std::move(sessionId), std::move(status), std::move(resp), modulesMapping, alwaysFallback); }; - Service->DoRequest<TRequest, TResponse>(queryPB, callback, stub); + Service->DoRequest<TRequest, TResponse>(queryPB, callback, stub); { TGuard<TMutex> lock(ProgressMutex); @@ -204,9 +204,9 @@ public: } } else { return NThreading::MakeFuture(TResult()); - } - } - + } + } + return promise.GetFuture().Apply([=](const NThreading::TFuture<TResult>& result) { if (result.HasException()) { return result; @@ -288,7 +288,7 @@ public: queryPB, &Yql::DqsProto::DqService::Stub::AsyncExecuteGraph, retry, - settings, + settings, modulesMapping); } @@ -339,14 +339,14 @@ public: request, callback, &Yql::DqsProto::DqService::Stub::AsyncCloseSession); } - void RequestQueryStatus(const TString& sessionId) { - Yql::DqsProto::QueryStatusRequest request; - request.SetSession(sessionId); + void RequestQueryStatus(const TString& sessionId) { + Yql::DqsProto::QueryStatusRequest request; + request.SetSession(sessionId); IDqGateway::TPtr self = this; auto callback = [this, self, sessionId](NGrpc::TGrpcStatus&& status, Yql::DqsProto::QueryStatusResponse&& resp) { - if (status.Ok()) { + if (status.Ok()) { TGuard<TMutex> lock(ProgressMutex); - TString stage; + TString stage; TDqProgressWriter* dqProgressWriter = nullptr; auto it = RunningQueries.find(sessionId); if (it != RunningQueries.end()) { @@ -356,38 +356,38 @@ public: stage = resp.GetStatus(); it->second.second = stage; } - + ScheduleQueryStatusRequest(sessionId); - } - - if (!stage.empty() && dqProgressWriter) { - (*dqProgressWriter)(stage); - } - } else { - TGuard<TMutex> lock(ProgressMutex); - RunningQueries.erase(sessionId); - } - }; - - Service->DoRequest<Yql::DqsProto::QueryStatusRequest, Yql::DqsProto::QueryStatusResponse>( - request, callback, &Yql::DqsProto::DqService::Stub::AsyncQueryStatus, {}, nullptr); - } - - void ScheduleQueryStatusRequest(const TString& sessionId) { + } + + if (!stage.empty() && dqProgressWriter) { + (*dqProgressWriter)(stage); + } + } else { + TGuard<TMutex> lock(ProgressMutex); + RunningQueries.erase(sessionId); + } + }; + + Service->DoRequest<Yql::DqsProto::QueryStatusRequest, Yql::DqsProto::QueryStatusResponse>( + request, callback, &Yql::DqsProto::DqService::Stub::AsyncQueryStatus, {}, nullptr); + } + + void ScheduleQueryStatusRequest(const TString& sessionId) { Delay(TDuration::MilliSeconds(1000)).Subscribe([this, sessionId](NThreading::TFuture<void> fut) { - if (fut.HasException()) { - TGuard<TMutex> lock(ProgressMutex); - RunningQueries.erase(sessionId); - } else { - TGuard<TMutex> lock(ProgressMutex); - auto it = RunningQueries.find(sessionId); - if (it != RunningQueries.end()) { - RequestQueryStatus(sessionId); - } - } - }); - } - + if (fut.HasException()) { + TGuard<TMutex> lock(ProgressMutex); + RunningQueries.erase(sessionId); + } else { + TGuard<TMutex> lock(ProgressMutex); + auto it = RunningQueries.find(sessionId); + if (it != RunningQueries.end()) { + RequestQueryStatus(sessionId); + } + } + }); + } + void SchedulePingSessionRequest(const TString& sessionId) { auto callback = [this, sessionId]( NGrpc::TGrpcStatus&& status, @@ -409,19 +409,19 @@ public: }); } - struct TDelay: public TTaskScheduler::ITask { - TDelay(NThreading::TPromise<void> p) - : Promise(std::move(p)) - { } - - TInstant Process() override { - Promise.SetValue(); - return TInstant::Max(); - } - - NThreading::TPromise<void> Promise; - }; - + struct TDelay: public TTaskScheduler::ITask { + TDelay(NThreading::TPromise<void> p) + : Promise(std::move(p)) + { } + + TInstant Process() override { + Promise.SetValue(); + return TInstant::Max(); + } + + NThreading::TPromise<void> Promise; + }; + private: NGrpc::TGRpcClientConfig GrpcConf; NGrpc::TGRpcClientLow GrpcClient; @@ -429,7 +429,7 @@ private: TMutex ProgressMutex; TMutex Mutex; - THashMap<TString, std::pair<TDqProgressWriter, TString>> RunningQueries; + THashMap<TString, std::pair<TDqProgressWriter, TString>> RunningQueries; TString VanillaJobPath; TString VanillaJobMd5; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index 038709cec66..d562776033f 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -26,7 +26,7 @@ class IDqGateway : public TThrRefBase { public: using TPtr = TIntrusivePtr<IDqGateway>; using TFileResource = Yql::DqsProto::TFile; - using TDqProgressWriter = std::function<void(const TString&)>; + using TDqProgressWriter = std::function<void(const TString&)>; struct TFileResourceHash { std::size_t operator()(const TFileResource& f) const { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp index 7cc61e6e0b9..69788c5c760 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp @@ -12,7 +12,7 @@ namespace NYql { -TDataProviderInitializer GetDqDataProviderInitializer( +TDataProviderInitializer GetDqDataProviderInitializer( TExecTransformerFactory execTransformerFactory, const IDqGateway::TPtr& dqGateway, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, @@ -50,10 +50,10 @@ TDataProviderInitializer GetDqDataProviderInitializer( ); TDataProviderInfo info; - info.Names.insert(TString{DqProviderName}); + info.Names.insert(TString{DqProviderName}); info.Source = CreateDqDataSource(state, execTransformerFactory); - info.Sink = CreateDqDataSink(state); + info.Sink = CreateDqDataSink(state); info.OpenSession = [dqGateway, metrics, gatewaysConfig, state]( const TString& sessionId, const TString& username, diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.h b/ydb/library/yql/providers/dq/provider/yql_dq_provider.h index 8c6602398db..6e4aff4ee94 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.h @@ -15,7 +15,7 @@ using TDqStatePtr = TIntrusivePtr<TDqState>; using TExecTransformerFactory = std::function<IGraphTransformer*(const TDqStatePtr& state)>; -TDataProviderInitializer GetDqDataProviderInitializer( +TDataProviderInitializer GetDqDataProviderInitializer( TExecTransformerFactory execTransformerFactory, const IDqGateway::TPtr& dqGateway, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index 385c764515d..90e8af759c7 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -22,7 +22,7 @@ using namespace NNodes; namespace { const THashSet<TStringBuf> VALID_SOURCES = {DqProviderName, ConfigProviderName, YtProviderName, ClickHouseProviderName, YdbProviderName}; -const THashSet<TStringBuf> VALID_SINKS = {ResultProviderName, YtProviderName}; +const THashSet<TStringBuf> VALID_SINKS = {ResultProviderName, YtProviderName}; const THashSet<TStringBuf> UNSUPPORTED_CALLABLE = { TCoForwardList::CallableName() }; } @@ -67,7 +67,7 @@ public: Statistics_["DqPureResultDataSourceMismatch"]++; } - if (State_->TypeCtx->PureResultDataSource != DqProviderName || !State_->Settings->AnalyzeQuery.Get().GetOrElse(false)) { + if (State_->TypeCtx->PureResultDataSource != DqProviderName || !State_->Settings->AnalyzeQuery.Get().GetOrElse(false)) { return TStatus::Ok; } @@ -101,9 +101,9 @@ public: if (auto maybeRead = TMaybeNode<TCoRight>(node).Input()) { if (maybeRead.Raw()->ChildrenSize() > 1 && TCoDataSource::Match(maybeRead.Raw()->Child(1))) { auto dataSourceName = maybeRead.Raw()->Child(1)->Child(0)->Content(); - auto dataSource = State_->TypeCtx->DataSourceMap.FindPtr(dataSourceName); - YQL_ENSURE(dataSource); - if (auto dqIntegration = (*dataSource)->GetDqIntegration()) { + auto dataSource = State_->TypeCtx->DataSourceMap.FindPtr(dataSourceName); + YQL_ENSURE(dataSource); + if (auto dqIntegration = (*dataSource)->GetDqIntegration()) { auto newRead = dqIntegration->WrapRead(*State_->Settings, maybeRead.Cast().Ptr(), ctx); if (newRead.Get() != maybeRead.Raw()) { return newRead; @@ -111,7 +111,7 @@ public: } } } - + return node; }, ctx, TOptimizeExprSettings{State_->TypeCtx}); @@ -195,29 +195,29 @@ private: if (good) { Scan(node.Head(), ctx,good, dataSize, visited, hasJoin); } - } else if (node.GetTypeAnn()->GetKind() == ETypeAnnotationKind::World - && !TCoCommit::Match(&node) - && node.ChildrenSize() > 1 - && TCoDataSink::Match(node.Child(1))) { - auto dataSinkName = node.Child(1)->Child(0)->Content(); - auto dataSink = State_->TypeCtx->DataSinkMap.FindPtr(dataSinkName); - YQL_ENSURE(dataSink); - if (auto dqIntegration = dataSink->Get()->GetDqIntegration()) { + } else if (node.GetTypeAnn()->GetKind() == ETypeAnnotationKind::World + && !TCoCommit::Match(&node) + && node.ChildrenSize() > 1 + && TCoDataSink::Match(node.Child(1))) { + auto dataSinkName = node.Child(1)->Child(0)->Content(); + auto dataSink = State_->TypeCtx->DataSinkMap.FindPtr(dataSinkName); + YQL_ENSURE(dataSink); + if (auto dqIntegration = dataSink->Get()->GetDqIntegration()) { if (auto canWrite = dqIntegration->CanWrite(*State_->Settings, node, ctx)) { if (!canWrite.GetRef()) { - good = false; + good = false; } else if (!State_->Settings->EnableInsert.Get().GetOrElse(false)) { AddInfo(ctx, TStringBuilder() << "'insert' support is disabled. Use PRAGMA dq.EnableInsert to explicitly enable it"); good = false; - } - } - } - if (good) { - for (size_t i = 0; i != node.ChildrenSize() && good; ++i) { + } + } + } + if (good) { + for (size_t i = 0; i != node.ChildrenSize() && good; ++i) { Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin); - } - } - } + } + } + } else if (!State_->TypeCtx->UdfSupportsYield && TCoScriptUdf::Match(&node)) { if (IsCallableTypeHasStreams(node.GetTypeAnn()->Cast<TCallableExprType>())) { AddInfo(ctx, TStringBuilder() << "script udf with streams"); @@ -229,7 +229,7 @@ private: } } } - else { + else { for (size_t i = 0; i != node.ChildrenSize() && good; ++i) { Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin); } diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 55265ac7d96..65a9e500d9d 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -629,28 +629,28 @@ namespace NYql::NDqs { ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); }); - ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest()); - - auto ev = MakeHolder<TEvQueryStatus>(*request); - - auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable { - auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena()); - result->MergeFrom(event->Get()->Record.GetResponse()); - ctx->Reply(result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_LOG(DEBUG) << "QueryStatus failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(2000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - + ADD_REQUEST(QueryStatus, QueryStatusRequest, QueryStatusResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::QueryStatusRequest*>(ctx->GetRequest()); + + auto ev = MakeHolder<TEvQueryStatus>(*request); + + auto callback = MakeHolder<TRichActorFutureCallback<TEvQueryStatusResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvQueryStatusResponse>>& event) mutable { + auto* result = google::protobuf::Arena::CreateMessage<Yql::DqsProto::QueryStatusResponse>(ctx->GetArena()); + result->MergeFrom(event->Get()->Record.GetResponse()); + ctx->Reply(result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_LOG(DEBUG) << "QueryStatus failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(2000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release(), IEventHandle::FlagTrackDelivery)); - }); - + }); + ADD_REQUEST(RegisterNode, RegisterNodeRequest, RegisterNodeResponse, { auto* request = dynamic_cast<const Yql::DqsProto::RegisterNodeRequest*>(ctx->GetRequest()); Y_VERIFY(!!request); diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp index 1db94f1baaa..9044dc47e30 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp @@ -109,7 +109,7 @@ namespace NYql::NDqs { *Record.MutableRequest() = request; } - TEvQueryStatus::TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request) { - *Record.MutableRequest() = request; - } + TEvQueryStatus::TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request) { + *Record.MutableRequest() = request; + } } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.h b/ydb/library/yql/providers/dq/worker_manager/interface/events.h index d3bfa4d8efc..a2d0c9f5409 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/events.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h> -#include "worker_info.h" +#include "worker_info.h" #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/utils/log/log.h> @@ -11,11 +11,11 @@ #include <util/generic/guid.h> -#include <utility> - +#include <utility> + namespace NYql::NDqs { -using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; +using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace::ES_USERSPACE>; struct TEvAllocateWorkersRequest : NActors::TEventPB<TEvAllocateWorkersRequest, NYql::NDqProto::TAllocateWorkersRequest, TDqResManEvents::ES_ALLOCATE_WORKERS_REQUEST> { @@ -89,19 +89,19 @@ using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace:: TEvClusterStatusResponse() = default; }; - struct TEvQueryStatus - : NActors::TEventPB<TEvQueryStatus, NYql::NDqProto::TEvQueryStatus, TDqResManEvents::ES_QUERY_STATUS> { - - TEvQueryStatus() = default; - TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request); - }; - - struct TEvQueryStatusResponse - : NActors::TEventPB<TEvQueryStatusResponse, NYql::NDqProto::TEvQueryStatusResponse, TDqResManEvents::ES_QUERY_STATUS_RESPONSE> { - - TEvQueryStatusResponse() = default; - }; - + struct TEvQueryStatus + : NActors::TEventPB<TEvQueryStatus, NYql::NDqProto::TEvQueryStatus, TDqResManEvents::ES_QUERY_STATUS> { + + TEvQueryStatus() = default; + TEvQueryStatus(const Yql::DqsProto::QueryStatusRequest& request); + }; + + struct TEvQueryStatusResponse + : NActors::TEventPB<TEvQueryStatusResponse, NYql::NDqProto::TEvQueryStatusResponse, TDqResManEvents::ES_QUERY_STATUS_RESPONSE> { + + TEvQueryStatusResponse() = default; + }; + struct TEvIsReady : NActors::TEventPB<TEvIsReady, NYql::NDqProto::TEvIsReady, TDqResManEvents::ES_IS_READY> { diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp index aa822bce729..634b25fb075 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp @@ -1,81 +1,81 @@ #include <ydb/library/yql/providers/dq/api/protos/dqs.pb.h> -#include "worker_info.h" +#include "worker_info.h" #include <ydb/library/yql/utils/log/log.h> - -namespace NYql::NDqs { - - TInflightLimiter::TInflightLimiter(i32 inflightLimit) - : InflightLimit(inflightLimit) { - } - - bool TInflightLimiter::CanDownload(const TString& id) { - return !InflightResources.contains(id) || InflightResources.find(id)->second < InflightLimit; - } - - void TInflightLimiter::MarkDownloadStarted(const TString& id) { - if (!InflightResources.contains(id)) { - InflightResources.emplace(id, 0); - } - InflightResources.find(id)->second++; - } - - void TInflightLimiter::MarkDownloadFinished(const TString& id) { - auto it = InflightResources.find(id); - if (it != InflightResources.end() && it->second > 0) { - it->second--; - if (it->second == 0) { - InflightResources.erase(id); - } - } - } - - TWorkerInfo::TWorkerInfo( + +namespace NYql::NDqs { + + TInflightLimiter::TInflightLimiter(i32 inflightLimit) + : InflightLimit(inflightLimit) { + } + + bool TInflightLimiter::CanDownload(const TString& id) { + return !InflightResources.contains(id) || InflightResources.find(id)->second < InflightLimit; + } + + void TInflightLimiter::MarkDownloadStarted(const TString& id) { + if (!InflightResources.contains(id)) { + InflightResources.emplace(id, 0); + } + InflightResources.find(id)->second++; + } + + void TInflightLimiter::MarkDownloadFinished(const TString& id) { + auto it = InflightResources.find(id); + if (it != InflightResources.end() && it->second > 0) { + it->second--; + if (it->second == 0) { + InflightResources.erase(id); + } + } + } + + TWorkerInfo::TWorkerInfo( const TString& startTime, - ui32 nodeId, - TGUID workerId, - const Yql::DqsProto::RegisterNodeRequest& request, - NYql::TSensorsGroupPtr metrics, + ui32 nodeId, + TGUID workerId, + const Yql::DqsProto::RegisterNodeRequest& request, + NYql::TSensorsGroupPtr metrics, const TInflightLimiter::TPtr& inflightLimiter, TGlobalResources& globalResources) - : NodeId(nodeId) - , WorkerId(workerId) - , LastPingTime(TInstant::Now()) - , Revision(request.GetRevision()) - , ClusterName(request.GetClusterName()) - , Address(request.GetAddress()) + : NodeId(nodeId) + , WorkerId(workerId) + , LastPingTime(TInstant::Now()) + , Revision(request.GetRevision()) + , ClusterName(request.GetClusterName()) + , Address(request.GetAddress()) , StartTime(!request.GetStartTime().empty() ? request.GetStartTime() : startTime) - , Port(request.GetPort()) - , Epoch(request.GetEpoch()) - , Capabilities(request.GetCapabilities()) + , Port(request.GetPort()) + , Epoch(request.GetEpoch()) + , Capabilities(request.GetCapabilities()) , Capacity(request.GetCapacity()?request.GetCapacity():1) - , Metrics(std::move(metrics)) - , InflightLimiter(inflightLimiter) + , Metrics(std::move(metrics)) + , InflightLimiter(inflightLimiter) , ClusterResources(TClusterResources(globalResources, ClusterName)) - { + { #define ADD_METRIC(name) \ name = Metrics->GetCounter(#name) #define ADD_METRIC_DERIV(name) \ name = Metrics->GetCounter(#name, /*derivative=*/ true) - - ADD_METRIC(CurrentDownloadsSum); - ADD_METRIC(CurrentDownloadsMax); - ADD_METRIC(CurrentDownloadsArgMax); - - ADD_METRIC(FilesCountSum); - ADD_METRIC(FilesCountMax); - ADD_METRIC(FilesCountArgMax); - - - ADD_METRIC(FreeDiskSizeSum); - ADD_METRIC(FreeDiskSizeMin); - ADD_METRIC(FreeDiskSizeArgMin); - - ADD_METRIC(UsedDiskSizeSum); - ADD_METRIC(UsedDiskSizeMax); - ADD_METRIC(UsedDiskSizeArgMax); - + + ADD_METRIC(CurrentDownloadsSum); + ADD_METRIC(CurrentDownloadsMax); + ADD_METRIC(CurrentDownloadsArgMax); + + ADD_METRIC(FilesCountSum); + ADD_METRIC(FilesCountMax); + ADD_METRIC(FilesCountArgMax); + + + ADD_METRIC(FreeDiskSizeSum); + ADD_METRIC(FreeDiskSizeMin); + ADD_METRIC(FreeDiskSizeArgMin); + + ADD_METRIC(UsedDiskSizeSum); + ADD_METRIC(UsedDiskSizeMax); + ADD_METRIC(UsedDiskSizeArgMax); + ADD_METRIC(ActiveDownloadsSum); ADD_METRIC(DeadWorkers); @@ -87,21 +87,21 @@ namespace NYql::NDqs { ADD_METRIC(RunningActors); -#undef ADD_METRIC +#undef ADD_METRIC #undef ADD_METRIC_DERIV - - for (const auto& attr : request.GetAttribute()) { - Attributes[attr.GetKey()] = attr.GetValue(); - } - + + for (const auto& attr : request.GetAttribute()) { + Attributes[attr.GetKey()] = attr.GetValue(); + } + ClusterResources.AddCapacity(Capacity); - Update(request); - } - - bool TWorkerInfo::Update(const Yql::DqsProto::RegisterNodeRequest& request) - { - bool needResume = false; + Update(request); + } + + bool TWorkerInfo::Update(const Yql::DqsProto::RegisterNodeRequest& request) + { + bool needResume = false; auto now = TInstant::Now(); TDuration interval = now-LastPingTime; LastPingTime = now; @@ -143,143 +143,143 @@ namespace NYql::NDqs { MajorPageFaults = request.GetRusage().GetMajorPageFaults(); *MajorPageFaultsSum += pageFaultsDelta; - for (auto file : request.GetFilesOnNode()) { - if (Resources.insert(file.GetObjectId()).second) { - needResume = true; - *FilesCountSum += 1; + for (auto file : request.GetFilesOnNode()) { + if (Resources.insert(file.GetObjectId()).second) { + needResume = true; + *FilesCountSum += 1; *FileAddCounter += 1; - } - } + } + } Operations.clear(); for (const auto& op: request.GetRunningOperation()) { Operations.insert(op); } - + int actorsDelta = request.GetRunningWorkers()-RunningWorkerActors; *RunningActors += actorsDelta; RunningWorkerActors = request.GetRunningWorkers(); - - if (static_cast<int>(Resources.size()) > static_cast<int>(request.GetFilesOnNode().size())) { - needResume = true; - // drop files - THashSet<TString> filesOnNode; - for (const auto& file : request.GetFilesOnNode()) { - filesOnNode.insert(file.GetObjectId()); - } - THashSet<TString> toDrop; - for (const auto& k : Resources) { - if (!filesOnNode.contains(k)) { - toDrop.insert(k); - } - } - for (const auto& k : toDrop) { - YQL_LOG(DEBUG) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId); - Resources.erase(k); - } + + if (static_cast<int>(Resources.size()) > static_cast<int>(request.GetFilesOnNode().size())) { + needResume = true; + // drop files + THashSet<TString> filesOnNode; + for (const auto& file : request.GetFilesOnNode()) { + filesOnNode.insert(file.GetObjectId()); + } + THashSet<TString> toDrop; + for (const auto& k : Resources) { + if (!filesOnNode.contains(k)) { + toDrop.insert(k); + } + } + for (const auto& k : toDrop) { + YQL_LOG(DEBUG) << "Remove resource " << k << " from worker " << GetGuidAsString(WorkerId); + Resources.erase(k); + } *FileRemoveCounter += toDrop.size(); - *FilesCountSum -= toDrop.size(); - } - - // ordered as in LRU cache - ResourcesOrdered = std::move(request.GetFilesOnNode()); - + *FilesCountSum -= toDrop.size(); + } + + // ordered as in LRU cache + ResourcesOrdered = std::move(request.GetFilesOnNode()); + if (*FreeDiskSizeArgMin == 0 || *FreeDiskSizeMin > request.GetFreeDiskSize()) { - *FreeDiskSizeMin = request.GetFreeDiskSize(); - *FreeDiskSizeArgMin = NodeId; - } - + *FreeDiskSizeMin = request.GetFreeDiskSize(); + *FreeDiskSizeArgMin = NodeId; + } + if (*FreeDiskSizeArgMin == NodeId && *FreeDiskSizeMin != request.GetFreeDiskSize()) { *FreeDiskSizeMin = request.GetFreeDiskSize(); } - *FreeDiskSizeSum += (request.GetFreeDiskSize() - FreeDiskSize); - FreeDiskSize = request.GetFreeDiskSize(); - - if (*UsedDiskSizeMax < request.GetUsedDiskSize()) { - *UsedDiskSizeMax = request.GetUsedDiskSize(); - *UsedDiskSizeArgMax = NodeId; - } - - *UsedDiskSizeSum += (request.GetUsedDiskSize() - UsedDiskSize); - UsedDiskSize = request.GetUsedDiskSize(); - - for (auto md5: Resources) { - RemoveFromDownloadList(md5); - } - - if (*FilesCountMax < static_cast<i64>(Resources.size())) { - *FilesCountMax = Resources.size(); - *FilesCountArgMax = NodeId; - } - - if (*CurrentDownloadsMax < static_cast<i64>(DownloadList.size())) { - *CurrentDownloadsMax = DownloadList.size(); - *CurrentDownloadsArgMax = NodeId; - } - - return needResume; - } - - void TWorkerInfo::RemoveFromDownloadList(const TString& objectId) { - auto delta = DownloadList.erase(objectId); - if (delta) { + *FreeDiskSizeSum += (request.GetFreeDiskSize() - FreeDiskSize); + FreeDiskSize = request.GetFreeDiskSize(); + + if (*UsedDiskSizeMax < request.GetUsedDiskSize()) { + *UsedDiskSizeMax = request.GetUsedDiskSize(); + *UsedDiskSizeArgMax = NodeId; + } + + *UsedDiskSizeSum += (request.GetUsedDiskSize() - UsedDiskSize); + UsedDiskSize = request.GetUsedDiskSize(); + + for (auto md5: Resources) { + RemoveFromDownloadList(md5); + } + + if (*FilesCountMax < static_cast<i64>(Resources.size())) { + *FilesCountMax = Resources.size(); + *FilesCountArgMax = NodeId; + } + + if (*CurrentDownloadsMax < static_cast<i64>(DownloadList.size())) { + *CurrentDownloadsMax = DownloadList.size(); + *CurrentDownloadsArgMax = NodeId; + } + + return needResume; + } + + void TWorkerInfo::RemoveFromDownloadList(const TString& objectId) { + auto delta = DownloadList.erase(objectId); + if (delta) { InflightLimiter->MarkDownloadFinished(objectId); *ActiveDownloadsSum -= ActiveDownloads.erase(objectId); - } - *CurrentDownloadsSum -= delta; - } - - void TWorkerInfo::AddToDownloadList(const THashMap<TString, TFileResource>& downloadList) { + } + *CurrentDownloadsSum -= delta; + } + + void TWorkerInfo::AddToDownloadList(const THashMap<TString, TFileResource>& downloadList) { for (const auto& [k, v] : downloadList) { AddToDownloadList(k, v); } - } - + } + bool TWorkerInfo::AddToDownloadList(const TString& key, const TFileResource& value) { if (!Resources.contains(key) && DownloadList.insert({key, value}).second) { - *CurrentDownloadsSum += 1; + *CurrentDownloadsSum += 1; return true; - } + } return false; - } - - const THashMap<TString, TWorkerInfo::TFileResource>& TWorkerInfo::GetDownloadList() { - return DownloadList; - } - - THashMap<TString, TWorkerInfo::TFileResource> TWorkerInfo::GetResourcesForDownloading() { - for (const auto& it : DownloadList) { - TString id = it.first; + } + + const THashMap<TString, TWorkerInfo::TFileResource>& TWorkerInfo::GetDownloadList() { + return DownloadList; + } + + THashMap<TString, TWorkerInfo::TFileResource> TWorkerInfo::GetResourcesForDownloading() { + for (const auto& it : DownloadList) { + TString id = it.first; if (InflightLimiter->CanDownload(id) && !ActiveDownloads.contains(id)) { ActiveDownloads.emplace(id, it.second); *ActiveDownloadsSum += 1; InflightLimiter->MarkDownloadStarted(id); - } - } + } + } return ActiveDownloads; - } - - const THashSet<TString>& TWorkerInfo::GetResources() { - return Resources; - } - - void TWorkerInfo::OnDead() { + } + + const THashSet<TString>& TWorkerInfo::GetResources() { + return Resources; + } + + void TWorkerInfo::OnDead() { if (IsDead) { return; } - IsDead = true; + IsDead = true; for (const auto& [id, _] : ActiveDownloads) { InflightLimiter->MarkDownloadFinished(id); - } + } *ActiveDownloadsSum -= ActiveDownloads.size(); ActiveDownloads.clear(); - - *CurrentDownloadsSum -= DownloadList.size(); - *FilesCountSum -= Resources.size(); - - *UsedDiskSizeSum += - UsedDiskSize; - *FreeDiskSizeSum += - FreeDiskSize; + + *CurrentDownloadsSum -= DownloadList.size(); + *FilesCountSum -= Resources.size(); + + *UsedDiskSizeSum += - UsedDiskSize; + *FreeDiskSizeSum += - FreeDiskSize; *DeadWorkers += 1; @@ -296,7 +296,7 @@ namespace NYql::NDqs { TWorkerInfo::~TWorkerInfo() { OnDead(); - } + } bool TWorkerInfo::Acquire() { if (IsDead) { diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h index 15cf63678dd..5281a844f4c 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h @@ -1,30 +1,30 @@ -#pragma once - -#include <utility> -#include <util/generic/hash.h> -#include <util/generic/guid.h> -#include <util/generic/hash_set.h> +#pragma once + +#include <utility> +#include <util/generic/hash.h> +#include <util/generic/guid.h> +#include <util/generic/hash_set.h> #include <util/generic/maybe.h> #include <ydb/library/yql/providers/common/metrics/sensors_group.h> #include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h> - - -namespace NYql::NDqs { - + + +namespace NYql::NDqs { + class TInflightLimiter: public TThrRefBase { -public: +public: using TPtr = TIntrusivePtr<TInflightLimiter>; - TInflightLimiter(i32 inflightLimit); - bool CanDownload(const TString& id); - void MarkDownloadStarted(const TString& id); - void MarkDownloadFinished(const TString& id); - -private: - THashMap<TString, i32> InflightResources; - i32 InflightLimit; -}; - + TInflightLimiter(i32 inflightLimit); + bool CanDownload(const TString& id); + void MarkDownloadStarted(const TString& id); + void MarkDownloadFinished(const TString& id); + +private: + THashMap<TString, i32> InflightResources; + i32 InflightLimit; +}; + struct TGlobalResources { TGlobalResources(TSensorsGroupPtr metrics) : Metrics(metrics) @@ -95,30 +95,30 @@ private: struct TWorkerInfo: public TThrRefBase { using TPtr = TIntrusivePtr<TWorkerInfo>; - using TFileResource = Yql::DqsProto::TFile; - - const ui32 NodeId; - const TGUID WorkerId; - TInstant LastPingTime; - const TString Revision; - const TString ClusterName; - const TString Address; + using TFileResource = Yql::DqsProto::TFile; + + const ui32 NodeId; + const TGUID WorkerId; + TInstant LastPingTime; + const TString Revision; + const TString ClusterName; + const TString Address; const TString StartTime; - const ui32 Port; - ui32 Epoch; - THashMap<TString, TString> Attributes; + const ui32 Port; + ui32 Epoch; + THashMap<TString, TString> Attributes; THashSet<TString> Operations; - - ui64 UseCount = 0; + + ui64 UseCount = 0; TInstant RequestStartTime; - TDuration UseTime; - bool IsDead = false; - bool Stopping = false; - - const ui32 Capabilities = 0; - - i64 FreeDiskSize = 0; - i64 UsedDiskSize = 0; + TDuration UseTime; + bool IsDead = false; + bool Stopping = false; + + const ui32 Capabilities = 0; + + i64 FreeDiskSize = 0; + i64 UsedDiskSize = 0; const int Capacity; const int CpuCores = 1; // unused yet @@ -130,76 +130,76 @@ struct TWorkerInfo: public TThrRefBase { i64 MajorPageFaults = 0; int RunningRequests = 0; - int RunningWorkerActors = 0; - - TWorkerInfo( + int RunningWorkerActors = 0; + + TWorkerInfo( const TString& startTime, - ui32 nodeId, - TGUID workerId, - const Yql::DqsProto::RegisterNodeRequest& request, - NYql::TSensorsGroupPtr metrics, + ui32 nodeId, + TGUID workerId, + const Yql::DqsProto::RegisterNodeRequest& request, + NYql::TSensorsGroupPtr metrics, const TInflightLimiter::TPtr& inflightLimiter, TGlobalResources& globalResources - ); - - ~TWorkerInfo(); - - bool Update(const Yql::DqsProto::RegisterNodeRequest& request); - - void RemoveFromDownloadList(const TString& objectId); - - void AddToDownloadList(const THashMap<TString, TFileResource>& downloadList); - + ); + + ~TWorkerInfo(); + + bool Update(const Yql::DqsProto::RegisterNodeRequest& request); + + void RemoveFromDownloadList(const TString& objectId); + + void AddToDownloadList(const THashMap<TString, TFileResource>& downloadList); + bool AddToDownloadList(const TString& key, const TFileResource& value); - - const THashMap<TString, TFileResource>& GetDownloadList(); - - THashMap<TString, TFileResource> GetResourcesForDownloading(); - - const THashSet<TString>& GetResources(); - - const auto& GetResourcesOrdered() { - return ResourcesOrdered; - } - + + const THashMap<TString, TFileResource>& GetDownloadList(); + + THashMap<TString, TFileResource> GetResourcesForDownloading(); + + const THashSet<TString>& GetResources(); + + const auto& GetResourcesOrdered() { + return ResourcesOrdered; + } + const auto& GetActiveDownloads() { return ActiveDownloads; } - void OnDead(); - + void OnDead(); + bool Acquire(); bool Release(); -private: - THashSet<TString> Resources; - google::protobuf::RepeatedPtrField<::Yql::DqsProto::RegisterNodeRequest_LocalFile> ResourcesOrdered; - THashMap<TString, TFileResource> DownloadList; - TSensorsGroupPtr Metrics; +private: + THashSet<TString> Resources; + google::protobuf::RepeatedPtrField<::Yql::DqsProto::RegisterNodeRequest_LocalFile> ResourcesOrdered; + THashMap<TString, TFileResource> DownloadList; + TSensorsGroupPtr Metrics; TInflightLimiter::TPtr InflightLimiter; TClusterResources ClusterResources; THashMap<TString, TFileResource> ActiveDownloads; - - TSensorCounterPtr CurrentDownloadsSum; - TSensorCounterPtr CurrentDownloadsMax; - TSensorCounterPtr CurrentDownloadsArgMax; - + + TSensorCounterPtr CurrentDownloadsSum; + TSensorCounterPtr CurrentDownloadsMax; + TSensorCounterPtr CurrentDownloadsArgMax; + TSensorCounterPtr ActiveDownloadsSum; TSensorCounterPtr DeadWorkers; - TSensorCounterPtr FilesCountSum; - TSensorCounterPtr FilesCountMax; - TSensorCounterPtr FilesCountArgMax; - - TSensorCounterPtr FreeDiskSizeSum; - TSensorCounterPtr FreeDiskSizeMin; - TSensorCounterPtr FreeDiskSizeArgMin; - - TSensorCounterPtr UsedDiskSizeSum; - TSensorCounterPtr UsedDiskSizeMax; - TSensorCounterPtr UsedDiskSizeArgMax; + TSensorCounterPtr FilesCountSum; + TSensorCounterPtr FilesCountMax; + TSensorCounterPtr FilesCountArgMax; + + TSensorCounterPtr FreeDiskSizeSum; + TSensorCounterPtr FreeDiskSizeMin; + TSensorCounterPtr FreeDiskSizeArgMin; + + TSensorCounterPtr UsedDiskSizeSum; + TSensorCounterPtr UsedDiskSizeMax; + TSensorCounterPtr UsedDiskSizeArgMax; TSensorCounterPtr CpuTotalSum; TSensorCounterPtr MajorPageFaultsSum; @@ -208,8 +208,8 @@ private: TSensorCounterPtr FileAddCounter; TSensorCounterPtr RunningActors; -}; - +}; + struct TWorkerInfoPtrComparator { bool operator()(const TWorkerInfo::TPtr& a, const TWorkerInfo::TPtr& b) const { auto scoreA = (a->CpuTotal+1) * (a->RunningRequests + a->RunningWorkerActors + 1); @@ -223,5 +223,5 @@ struct TWorkerInfoPtrComparator { } } }; - + } // namespace NYql::NDqs |