summary | refs | log | tree | commit | diff | stats
path: root/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
diff options
context:
space:
mode:
author: vvvv <[email protected]> 2025-10-09 12:25:18 +0300
committer: vvvv <[email protected]> 2025-10-09 12:57:17 +0300
commit: cb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch)
tree: 7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
parent: d58a8990d353b051c27e1069141117fdfde64358 (diff)
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_graph.cpp')
-rw-r--r-- yql/essentials/minikql/computation/mkql_computation_node_graph.cpp | 141
1 file changed, 79 insertions, 62 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
index 7e7c77a8844..c0e34abb103 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_graph.cpp
@@ -13,7 +13,7 @@
#include <util/digest/city.h>
#ifndef MKQL_DISABLE_CODEGEN
-#include <llvm/Support/raw_ostream.h> // Y_IGNORE
+ #include <llvm/Support/raw_ostream.h> // Y_IGNORE
#endif
namespace NKikimr {
@@ -53,7 +53,7 @@ const static TStatKey CodeGen_FinalizeTime("CodeGen_FinalizeTime", true);
const static TStatKey Mkql_TotalNodes("Mkql_TotalNodes", true);
const static TStatKey Mkql_CodegenFunctions("Mkql_CodegenFunctions", true);
-class TDependencyScanVisitor : public TEmptyNodeVisitor {
+class TDependencyScanVisitor: public TEmptyNodeVisitor {
public:
void Walk(TNode* root, std::vector<TNode*>& nodeStack) {
Stack = &nodeStack;
@@ -125,7 +125,6 @@ private:
AddNode(node.GetItem().GetNode());
}
-
void AddNode(TNode* node) {
if (node->GetCookie() != IS_NODE_REACHABLE) {
Stack->push_back(node);
@@ -182,7 +181,7 @@ public:
if (cookie <= IS_NODE_REACHABLE) {
MKQL_ENSURE(!require, "Computation graph builder, node not found, type:"
- << node->GetType()->GetKindAsStr());
+ << node->GetType()->GetKindAsStr());
return result;
}
@@ -195,8 +194,9 @@ public:
IComputationExternalNode* GetEntryPoint(size_t index, bool require) {
MKQL_ENSURE(index < Runtime2ComputationEntryPoints.size() && (!require || Runtime2ComputationEntryPoints[index]),
- "Pattern nodes can not get computation node by index: " << index << ", require: " << require
- << ", Runtime2ComputationEntryPoints size: " << Runtime2ComputationEntryPoints.size());
+ "Pattern nodes can not get computation node by index: "
+ << index << ", require: " << require
+ << ", Runtime2ComputationEntryPoints size: " << Runtime2ComputationEntryPoints.size());
return Runtime2ComputationEntryPoints[index];
}
@@ -228,10 +228,8 @@ private:
bool SuitableForCache = true;
};
-class TComputationGraphBuildingVisitor:
- public INodeVisitor,
- private TNonCopyable
-{
+class TComputationGraphBuildingVisitor: public INodeVisitor,
+ private TNonCopyable {
public:
TComputationGraphBuildingVisitor(const TComputationPatternOpts& opts)
: Env(opts.Env)
@@ -384,7 +382,7 @@ private:
if (typeId != 0x101) { // TODO remove
const auto slot = NUdf::GetDataSlot(typeId);
MKQL_ENSURE(IsValidValue(slot, value),
- "Bad data literal for type: " << NUdf::GetDataTypeInfo(slot).Name << ", " << value);
+ "Bad data literal for type: " << NUdf::GetDataTypeInfo(slot).Name << ", " << value);
}
NUdf::TUnboxedValue externalValue;
@@ -437,14 +435,16 @@ private:
items.reserve(node.GetItemsCount());
for (ui32 i = 0, e = node.GetItemsCount(); i < e; ++i) {
auto item = node.GetItem(i);
- items.push_back(std::make_pair(GetComputationNode(item.first.GetNode()), GetComputationNode(item.second.GetNode())));
+ items.push_back(std::make_pair(
+ GetComputationNode(item.first.GetNode()),
+ GetComputationNode(item.second.GetNode())));
}
bool isSorted = !CanHash(keyType);
AddNode(node, NodeFactory->CreateDictNode(std::move(items), types, isTuple, encoded ? keyType : nullptr,
- useIHash && !isSorted ? MakeHashImpl(keyType) : nullptr,
- useIHash ? MakeEquateImpl(keyType) : nullptr,
- useIHash && isSorted ? MakeCompareImpl(keyType) : nullptr, isSorted));
+ useIHash && !isSorted ? MakeHashImpl(keyType) : nullptr,
+ useIHash ? MakeEquateImpl(keyType) : nullptr,
+ useIHash && isSorted ? MakeCompareImpl(keyType) : nullptr, isSorted));
}
void Visit(TCallable& node) override {
@@ -463,37 +463,36 @@ private:
return GetComputationNode(dependentNode, pop);
};
TComputationNodeFactoryContext ctx(
- nodeLocator,
- FunctionRegistry,
- Env,
- TypeInfoHelper,
- CountersProvider,
- SecureParamsProvider,
- LogProvider,
- LangVer,
- *NodeFactory,
- *PatternNodes->HolderFactory,
- PatternNodes->ValueBuilder.Get(),
- ValidateMode,
- ValidatePolicy,
- GraphPerProcess,
- PatternNodes->Mutables,
- PatternNodes->ElementsCache,
- std::bind(&TComputationGraphBuildingVisitor::PushBackNode, this, std::placeholders::_1));
+ nodeLocator,
+ FunctionRegistry,
+ Env,
+ TypeInfoHelper,
+ CountersProvider,
+ SecureParamsProvider,
+ LogProvider,
+ LangVer,
+ *NodeFactory,
+ *PatternNodes->HolderFactory,
+ PatternNodes->ValueBuilder.Get(),
+ ValidateMode,
+ ValidatePolicy,
+ GraphPerProcess,
+ PatternNodes->Mutables,
+ PatternNodes->ElementsCache,
+ std::bind(&TComputationGraphBuildingVisitor::PushBackNode, this, std::placeholders::_1));
const auto computationNode = Factory(node, ctx);
const auto& name = node.GetType()->GetName();
if (name == "KqpWideReadTable" ||
name == "KqpWideReadTableRanges" ||
name == "KqpBlockReadTableRanges" ||
name == "KqpLookupTable" ||
- name == "KqpReadTable"
- ) {
+ name == "KqpReadTable") {
PatternNodes->SuitableForCache = false;
}
if (!computationNode) {
THROW yexception()
- << "Computation graph builder, unsupported function: " << name << " type: " << Factory.target_type().name() ;
+ << "Computation graph builder, unsupported function: " << name << " type: " << Factory.target_type().name();
}
AddNode(node, computationNode);
@@ -580,9 +579,10 @@ private:
const bool ExternalAlloc; // obsolete, will be removed after YQL-13977
};
-class TComputationGraph final : public IComputationGraph {
+class TComputationGraph final: public IComputationGraph {
public:
- TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts, NYql::NCodegen::ICodegen::TSharedPtr codegen)
+ TComputationGraph(TPatternNodes::TPtr& patternNodes, const TComputationOptsFull& compOpts,
+ NYql::NCodegen::ICodegen::TSharedPtr codegen)
: PatternNodes(patternNodes)
, MemInfo(MakeIntrusive<TMemoryUsageInfo>("ComputationGraph"))
, CompOpts(compOpts)
@@ -612,10 +612,10 @@ public:
void Prepare() override {
if (!IsPrepared) {
Ctx.Reset(new TComputationContext(*HolderFactory,
- ValueBuilder.Get(),
- CompOpts,
- PatternNodes->GetMutables(),
- *NYql::NUdf::GetYqlMemoryPool()));
+ ValueBuilder.Get(),
+ CompOpts,
+ PatternNodes->GetMutables(),
+ *NYql::NUdf::GetYqlMemoryPool()));
Ctx->ExecuteLLVM = Codegen.get() != nullptr;
ValueBuilder->SetCalleePositionHolder(Ctx->CalleePosition);
for (auto& node : PatternNodes->GetNodes()) {
@@ -666,13 +666,13 @@ public:
continue;
}
- deps.emplace(entryPoint, TNodeState{ true, i});
+ deps.emplace(entryPoint, TNodeState{true, i});
}
stack.push(PatternNodes->GetRoot());
while (!stack.empty()) {
auto node = stack.top();
- auto [iter, inserted] = deps.emplace(node, TNodeState{ false, 0 });
+ auto [iter, inserted] = deps.emplace(node, TNodeState{false, 0});
auto extNode = dynamic_cast<const IComputationExternalNode*>(node);
if (extNode) {
MKQL_ENSURE(!inserted, "Unexpected external node");
@@ -707,7 +707,10 @@ public:
}
void Invalidate() override {
- std::fill_n(Ctx->MutableValues.get(), PatternNodes->GetMutables().CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
+ std::fill_n(
+ Ctx->MutableValues.get(),
+ PatternNodes->GetMutables().CurValueIndex,
+ NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
}
void InvalidateCaches() override {
@@ -762,11 +765,11 @@ public:
};
bool isList = mutableValue.HasListItems();
NUdf::TUnboxedValue list;
- if (isList) { // No load was done during previous runs.
+ if (isList) { // No load was done during previous runs.
saveList(mutableValue);
} else {
NUdf::TUnboxedValue saved = mutableValue.Save();
- if (saved.IsString() || saved.IsEmbedded()) { // Old version.
+ if (saved.IsString() || saved.IsEmbedded()) { // Old version.
const TStringBuf savedBuf = saved.AsStringRef();
taskState.push_back({});
taskState.back().AppendNoAlias(savedBuf.Data(), savedBuf.Size());
@@ -781,7 +784,8 @@ public:
it = taskState.erase(it);
}
} else { // No load was done during previous runs (if any).
- MKQL_ENSURE(mutableValue.HasValue() && (mutableValue.IsString() || mutableValue.IsEmbedded()), "State is expected to have data or invalid value");
+ MKQL_ENSURE(mutableValue.HasValue() && (mutableValue.IsString() || mutableValue.IsEmbedded()),
+ "State is expected to have data or invalid value");
const NUdf::TStringRef savedRef = mutableValue.AsStringRef();
WriteUi64(result, savedRef.Size());
result.AppendNoAlias(savedRef.Data(), savedRef.Size());
@@ -795,7 +799,9 @@ public:
for (ui32 i : PatternNodes->GetMutables().SerializableValues) {
if (const ui64 size = ReadUi64(state); size != std::numeric_limits<ui64>::max()) {
- MKQL_ENSURE(state.Size() >= size, "Serialized state is corrupted - buffer is too short (" << state.Size() << ") for specified size: " << size);
+ MKQL_ENSURE(state.Size() >= size,
+ "Serialized state is corrupted - buffer is too short ("
+ << state.Size() << ") for specified size: " << size);
TStringBuf savedRef(state.Data(), size);
Ctx->MutableValues[i] = NKikimr::NMiniKQL::TOutputSerializer::MakeArray(*Ctx, savedRef);
state.Skip(size);
@@ -817,7 +823,7 @@ private:
std::optional<TArrowKernelsTopology> KernelsTopology;
};
-class TComputationPatternImpl final : public IComputationPattern {
+class TComputationPatternImpl final: public IComputationPattern {
public:
TComputationPatternImpl(THolder<TComputationGraphBuildingVisitor>&& builder, const TComputationPatternOpts& opts)
#if defined(MKQL_DISABLE_CODEGEN)
@@ -825,19 +831,24 @@ public:
#elif defined(MKQL_FORCE_USE_CODEGEN)
: Codegen(NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native))
#else
- : Codegen((NYql::NCodegen::ICodegen::IsCodegenAvailable() && opts.OptLLVM != "OFF") || GetEnv(TString("MKQL_FORCE_USE_LLVM")) ? NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native) : NYql::NCodegen::ICodegen::TPtr())
+ : Codegen((NYql::NCodegen::ICodegen::IsCodegenAvailable() && opts.OptLLVM != "OFF") ||
+ GetEnv(TString("MKQL_FORCE_USE_LLVM"))
+ ? NYql::NCodegen::ICodegen::MakeShared(NYql::NCodegen::ETarget::Native)
+ : NYql::NCodegen::ICodegen::TPtr())
#endif
{
- /// TODO: Enable JIT for AARCH64/Win
+ /// TODO: Enable JIT for AARCH64/Win
#if defined(__aarch64__) || defined(_win_)
Codegen = {};
#endif
const auto& nodes = builder->GetNodes();
- for (const auto& node : nodes)
+ for (const auto& node : nodes) {
node->PrepareStageOne();
- for (const auto& node : nodes)
+ }
+ for (const auto& node : nodes) {
node->PrepareStageTwo();
+ }
MKQL_ADD_STAT(opts.Stats, Mkql_TotalNodes, nodes.size());
PatternNodes = builder->GetPatternNodes();
@@ -855,12 +866,14 @@ public:
}
void Compile(TString optLLVM, IStatsRegistry* stats) {
- if (IsPatternCompiled.load())
+ if (IsPatternCompiled.load()) {
return;
+ }
#ifndef MKQL_DISABLE_CODEGEN
- if (!Codegen)
+ if (!Codegen) {
Codegen = NYql::NCodegen::ICodegen::Make(NYql::NCodegen::ETarget::Native);
+ }
const auto& nodes = PatternNodes->GetNodes();
@@ -984,14 +997,16 @@ private:
TStringBuf GetCompileOptions(const TString& s) {
const TString flag = "--compile-options";
auto lpos = s.rfind(flag);
- if (lpos == TString::npos)
+ if (lpos == TString::npos) {
return TStringBuf();
+ }
lpos += flag.size();
auto rpos = s.find(" --", lpos);
- if (rpos == TString::npos)
+ if (rpos == TString::npos) {
return TStringBuf(s, lpos);
- else
+ } else {
return TStringBuf(s, lpos, rpos - lpos);
+ }
};
TTypeEnvironment* TypeEnv = nullptr;
@@ -1001,9 +1016,11 @@ private:
NYql::NCodegen::TCompileStats CompileStats;
};
-
-TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNodeVisitor& explorer, const TRuntimeNode& root,
- const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) {
+TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(
+ TExploringNodeVisitor& explorer,
+ const TRuntimeNode& root,
+ const std::vector<TNode*>& entryPoints,
+ const TComputationPatternOpts& opts) {
TDependencyScanVisitor depScanner;
depScanner.Walk(root.GetNode(), opts.Env.GetNodeStack());
@@ -1050,7 +1067,7 @@ TIntrusivePtr<TComputationPatternImpl> MakeComputationPatternImpl(TExploringNode
} // namespace
IComputationPattern::TPtr MakeComputationPattern(TExploringNodeVisitor& explorer, const TRuntimeNode& root,
- const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) {
+ const std::vector<TNode*>& entryPoints, const TComputationPatternOpts& opts) {
return MakeComputationPatternImpl(explorer, root, entryPoints, opts);
}