diff options
| author | vvvv <[email protected]> | 2025-10-09 12:25:18 +0300 |
|---|---|---|
| committer | vvvv <[email protected]> | 2025-10-09 12:57:17 +0300 |
| commit | cb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch) | |
| tree | 7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/computation/mkql_computation_node_graph.cpp | |
| parent | d58a8990d353b051c27e1069141117fdfde64358 (diff) | |
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_graph.cpp')
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_graph.cpp | 141 |
1 files changed, 79 insertions, 62 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp index 7e7c77a8844..c0e34abb103 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp @@ -13,7 +13,7 @@ #include <util/digest/city.h> #ifndef MKQL_DISABLE_CODEGEN -#include <llvm/Support/raw_ostream.h> // Y_IGNORE + #include <llvm/Support/raw_ostream.h> // Y_IGNORE #endif namespace NKikimr { @@ -53,7 +53,7 @@ const static TStatKey CodeGen_FinalizeTime("CodeGen_FinalizeTime", true); const static TStatKey Mkql_TotalNodes("Mkql_TotalNodes", true); const static TStatKey Mkql_CodegenFunctions("Mkql_CodegenFunctions", true); -class TDependencyScanVisitor : public TEmptyNodeVisitor { +class TDependencyScanVisitor: public TEmptyNodeVisitor { public: void Walk(TNode* root, std::vector<TNode*>& nodeStack) { Stack = &nodeStack; @@ -125,7 +125,6 @@ private: AddNode(node.GetItem().GetNode()); } - void AddNode(TNode* node) { if (node->GetCookie() != IS_NODE_REACHABLE) { Stack->push_back(node); @@ -182,7 +181,7 @@ public: if (cookie <= IS_NODE_REACHABLE) { MKQL_ENSURE(!require, "Computation graph builder, node not found, type:" - << node->GetType()->GetKindAsStr()); + << node->GetType()->GetKindAsStr()); return result; } @@ -195,8 +194,9 @@ public: IComputationExternalNode* GetEntryPoint(size_t index, bool require) { MKQL_ENSURE(index < Runtime2ComputationEntryPoints.size() && (!require || Runtime2ComputationEntryPoints[index]), - "Pattern nodes can not get computation node by index: " << index << ", require: " << require - << ", Runtime2ComputationEntryPoints size: " << Runtime2ComputationEntryPoints.size()); + "Pattern nodes can not get computation node by index: " + << index << ", require: " << require + << ", Runtime2ComputationEntryPoints size: " << Runtime2ComputationEntryPoints.size()); return Runtime2ComputationEntryPoints[index]; } @@ -228,10 +228,8 @@ private: bool SuitableForCache = true; }; -class TComputationGraphBuildingVisitor: - public INodeVisitor, - private TNonCopyable -{ +class TComputationGraphBuildingVisitor: public INodeVisitor, + private TNonCopyable { public: TComputationGraphBuildingVisitor(const TComputationPatternOpts& opts) : Env(opts.Env) @@ -384,7 +382,7 @@ private: if (typeId != 0x101) { // TODO remove const auto slot = NUdf::GetDataSlot(typeId); MKQL_ENSURE(IsValidValue(slot, value), - "Bad data literal for type: " << NUdf::GetDataTypeInfo(slot).Name << ", " << value); + "Bad data literal for type: " << NUdf::GetDataTypeInfo(slot).Name << ", " << value); } NUdf::TUnboxedValue externalValue; @@ -437,14 +435,16 @@ private: items.reserve(node.GetItemsCount()); for (ui32 i = 0, e = node.GetItemsCount(); i < e; ++i) { auto item = node.GetItem(i); - items.push_back(std::make_pair(GetComputationNode(item.first.GetNode()), GetComputationNode(item.second.GetNode()))); + items.push_back(std::make_pair( + GetComputationNode(item.first.GetNode()), + GetComputationNode(item.second.GetNode()))); } bool isSorted = !CanHash(keyType); AddNode(node, NodeFactory->CreateDictNode(std::move(items), types, isTuple, encoded ? keyType : nullptr, - useIHash && !isSorted ? MakeHashImpl(keyType) : nullptr, - useIHash ? MakeEquateImpl(keyType) : nullptr, - useIHash && isSorted ? MakeCompareImpl(keyType) : nullptr, isSorted)); + useIHash && !isSorted ? MakeHashImpl(keyType) : nullptr, + useIHash ? MakeEquateImpl(keyType) : nullptr, + useIHash && isSorted ? MakeCompareImpl(keyType) : nullptr, isSorted)); } void Visit(TCallable& node) override { @@ -463,37 +463,36 @@ private: return GetComputationNode(dependentNode, pop); }; TComputationNodeFactoryContext ctx( - nodeLocator, - FunctionRegistry, - Env, - TypeInfoHelper, - CountersProvider, - SecureParamsProvider, - LogProvider, - LangVer, - *NodeFactory, - *PatternNodes->HolderFactory, - PatternNodes->ValueBuilder.Get(), - ValidateMode, - ValidatePolicy, - GraphPerProcess, - PatternNodes->Mutables, - PatternNodes->ElementsCache, - std::bind(&TComputationGraphBuildingVisitor::PushBackNode, this, std::placeholders::_1)); + nodeLocator, + FunctionRegistry, + Env, + TypeInfoHelper, + CountersProvider, + SecureParamsProvider, + LogProvider, + LangVer, + *NodeFactory, + *PatternNodes->HolderFactory, + PatternNodes->ValueBuilder.Get(), + ValidateMode, + ValidatePolicy, + GraphPerProcess, + PatternNodes->Mutables, + PatternNodes->ElementsCache, + std::bind(&TComputationGraphBuildingVisitor::PushBackNode, this, std::placeholders::_1)); const auto computationNode = Factory(node, ctx); const auto& name = node.GetType()->GetName(); if (name == "KqpWideReadTable" || name == "KqpWideReadTableRanges" || name == "KqpBlockReadTableRanges" || name == "KqpLookupTable" || - name == "KqpReadTable" - ) { + name == "KqpReadTable") { PatternNodes->SuitableForCache = false; } if (!computationNode) { THROW yexception() - << "Computation graph builder, unsupported function: " << name << " type: " << Factory.target_type().name() ; + << "Computation graph builder, unsupported function: " << name << " type: " << Factory.target_type().name(); } AddNode(node, computationNode); @@ -580,9 +579,10 @@ private: const bool ExternalAlloc; // obsolete, will be removed after YQL-13977 }; -class TComputationGraph final : public IComputationGraph { +class TComputationGraph final: public IComputationGraph { public: - TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, NYql::NCodegen::ICodegen::TSharedPtr codegen) + TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, + NYql::NCodegen::ICodegen::TSharedPtr codegen) : PatternNodes(patternNodes) , MemInfo(MakeIntrusive<TMemoryUsageInfo>("ComputationGraph")) , CompOpts(compOpts) @@ -612,10 +612,10 @@ public: void Prepare() override { if (!IsPrepared) { Ctx.Reset(new TComputationContext(*HolderFactory, - ValueBuilder.Get(), - CompOpts, - PatternNodes->GetMutables(), - *NYql::NUdf::GetYqlMemoryPool())); + ValueBuilder.Get(), + CompOpts, + PatternNodes->GetMutables(), + *NYql::NUdf::GetYqlMemoryPool())); Ctx->ExecuteLLVM = Codegen.get() != nullptr; ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition); for (auto& node : PatternNodes->GetNodes()) { @@ -666,13 +666,13 @@ public: continue; } - deps.emplace(entryPoint, TNodeState{ true, i}); + deps.emplace(entryPoint, TNodeState{true, i}); } stack.push(PatternNodes->GetRoot()); while (!stack.empty()) { auto node = stack.top(); - auto [iter, inserted] = deps.emplace(node, TNodeState{ false, 0 }); + auto [iter, inserted] = deps.emplace(node, TNodeState{false, 0}); auto extNode = dynamic_cast<const IComputationExternalNode*>(node); if (extNode) { MKQL_ENSURE(!inserted, "Unexpected external node"); @@ -707,7 +707,10 @@ public: } void Invalidate() override { - std::fill_n(Ctx->MutableValues.get(), PatternNodes->GetMutables().CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); + std::fill_n( + Ctx->MutableValues.get(), + PatternNodes->GetMutables().CurValueIndex, + NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); } void InvalidateCaches() override { @@ -762,11 +765,11 @@ public: }; bool isList = mutableValue.HasListItems(); NUdf::TUnboxedValue list; - if (isList) { // No load was done during previous runs. + if (isList) { // No load was done during previous runs. saveList(mutableValue); } else { NUdf::TUnboxedValue saved = mutableValue.Save(); - if (saved.IsString() || saved.IsEmbedded()) { // Old version. + if (saved.IsString() || saved.IsEmbedded()) { // Old version. const TStringBuf savedBuf = saved.AsStringRef(); taskState.push_back({}); taskState.back().AppendNoAlias(savedBuf.Data(), savedBuf.Size()); @@ -781,7 +784,8 @@ public: it = taskState.erase(it); } } else { // No load was done during previous runs (if any). - MKQL_ENSURE(mutableValue.HasValue() && (mutableValue.IsString() || mutableValue.IsEmbedded()), "State is expected to have data or invalid value"); + MKQL_ENSURE(mutableValue.HasValue() && (mutableValue.IsString() || mutableValue.IsEmbedded()), + "State is expected to have data or invalid value"); const NUdf::TStringRef savedRef = mutableValue.AsStringRef(); WriteUi64(result, savedRef.Size()); result.AppendNoAlias(savedRef.Data(), savedRef.Size()); @@ -795,7 +799,9 @@ public: for (ui32 i : PatternNodes->GetMutables().SerializableValues) { if (const ui64 size = ReadUi64(state); size != std::numeric_limits<ui64>::max()) { - MKQL_ENSURE(state.Size() >= size, "Serialized state is corrupted - buffer is too short (" << state.Size() << ") for specified size: " << size); + MKQL_ENSURE(state.Size() >= size, + "Serialized state is corrupted - buffer is too short (" + << state.Size() << ") for specified size: " << size); TStringBuf savedRef(state.Data(), size); Ctx->MutableValues[i] = NKikimr::NMiniKQL::TOutputSerializer::MakeArray(*Ctx, savedRef); state.Skip(size); @@ -817,7 +823,7 @@ private: std::optional<TArrowKernelsTopology> KernelsTopology; }; -class TComputationPatternImpl final : public IComputationPattern { +class TComputationPatternImpl final: public IComputationPattern { public: TComputationPatternImpl(THolder<TComputationGraphBuildingVisitor>&& builder, const TComputationPatternOpts& opts) #if defined(MKQL_DISABLE_CODEGEN) @@ -825,19 +831,24 @@ public: #elif defined(MKQL_FORCE_USE_CODEGEN) : Codegen(NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native)) #else - : Codegen((NYql::NCodegen::ICodegen::IsCodegenAvailable() && opts.OptLLVM != "OFF") || GetEnv(TString("MKQL_FORCE_USE_LLVM")) ? NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native) : NYql::NCodegen::ICodegen::TPtr()) + : Codegen((NYql::NCodegen::ICodegen::IsCodegenAvailable() && opts.OptLLVM != "OFF") || + GetEnv(TString("MKQL_FORCE_USE_LLVM")) + ? NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native) + : NYql::NCodegen::ICodegen::TPtr()) #endif { - /// TODO: Enable JIT for AARCH64/Win + /// TODO: Enable JIT for AARCH64/Win #if defined(__aarch64__) || defined(_win_) Codegen = {}; #endif const auto& nodes = builder->GetNodes(); - for (const auto& node : nodes) + for (const auto& node : nodes) { node->PrepareStageOne(); - for (const auto& node : nodes) + } + for (const auto& node : nodes) { node->PrepareStageTwo(); + } MKQL_ADD_STAT(opts.Stats, Mkql_TotalNodes, nodes.size()); PatternNodes = builder->GetPatternNodes(); @@ -855,12 +866,14 @@ public: } void Compile(TString optLLVM, IStatsRegistry* stats) { - if (IsPatternCompiled.load()) + if (IsPatternCompiled.load()) { return; + } #ifndef MKQL_DISABLE_CODEGEN - if (!Codegen) + if (!Codegen) { Codegen = NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native); + } const auto& nodes = PatternNodes->GetNodes(); @@ -984,14 +997,16 @@ private: TStringBuf GetCompileOptions(const TString& s) { const TString flag = "--compile-options"; auto lpos = s.rfind(flag); - if (lpos == TString::npos) + if (lpos == TString::npos) { return TStringBuf(); + } lpos += flag.size(); auto rpos = s.find(" --", lpos); - if (rpos == TString::npos) + if (rpos == TString::npos) { return TStringBuf(s, lpos); - else + } else { return TStringBuf(s, lpos, rpos - lpos); + } }; TTypeEnvironment* TypeEnv = nullptr; @@ -1001,9 +1016,11 @@ private: NYql::NCodegen::TCompileStats CompileStats; }; - -TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNodeVisitor& explorer, const TRuntimeNode& root, - const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) { +TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl( + TExploringNodeVisitor& explorer, + const TRuntimeNode& root, + const std::vector<TNode*>& entryPoints, + const TComputationPatternOpts& opts) { TDependencyScanVisitor depScanner; depScanner.Walk(root.GetNode(), opts.Env.GetNodeStack()); @@ -1050,7 +1067,7 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode } // namespace IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root, - const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) { + const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) { return MakeComputationPatternImpl(explorer, root, entryPoints, opts); } |
