summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_computation_node.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2024-11-07 04:19:26 +0300
committervvvv <[email protected]>2024-11-07 04:29:50 +0300
commit2661be00f3bc47590fda9218bf0386d6355c8c88 (patch)
tree3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/computation/mkql_computation_node.cpp
parentcf2a23963ac10add28c50cc114fbf48953eca5aa (diff)
Moved yql/minikql YQL-19206
init [nodiff:caesar] commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node.cpp')
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node.cpp142
1 files changed, 142 insertions, 0 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node.cpp b/yql/essentials/minikql/computation/mkql_computation_node.cpp
new file mode 100644
index 00000000000..2303e954a49
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_computation_node.cpp
@@ -0,0 +1,142 @@
+#include "mkql_computation_node_holders.h"
+#include "mkql_computation_node_impl.h"
+#include "mkql_computation_node_pack.h"
+#include "mkql_value_builder.h"
+#include "mkql_validate.h"
+
+#include <yql/essentials/minikql/mkql_node_builder.h>
+#include <yql/essentials/minikql/mkql_node_cast.h>
+#include <yql/essentials/minikql/mkql_node_printer.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+#include <yql/essentials/minikql/mkql_type_builder.h>
+#include <yql/essentials/minikql/mkql_utils.h>
+#include <yql/essentials/minikql/mkql_alloc.h>
+
+#include <util/generic/set.h>
+#include <util/generic/algorithm.h>
+#include <util/random/mersenne.h>
+#include <util/random/random.h>
+#include <util/system/tempfile.h>
+#include <util/system/fstat.h>
+#include <util/system/rusage.h>
+#include <util/stream/file.h>
+#include <util/stream/output.h>
+#include <util/memory/pool.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+std::unique_ptr<IArrowKernelComputationNode> IComputationNode::PrepareArrowKernelComputationNode(TComputationContext& ctx) const {
+ Y_UNUSED(ctx);
+ return {};
+}
+
+TDatumProvider MakeDatumProvider(const arrow::Datum& datum) {
+ return [datum]() {
+ return datum;
+ };
+}
+
+TDatumProvider MakeDatumProvider(const IComputationNode* node, TComputationContext& ctx) {
+ return [node, &ctx]() {
+ return TArrowBlock::From(node->GetValue(ctx)).GetDatum();
+ };
+}
+
+TComputationContext::TComputationContext(const THolderFactory& holderFactory,
+ const NUdf::IValueBuilder* builder,
+ const TComputationOptsFull& opts,
+ const TComputationMutables& mutables,
+ arrow::MemoryPool& arrowMemoryPool)
+ : TComputationContextLLVM{holderFactory, opts.Stats, std::make_unique<NUdf::TUnboxedValue[]>(mutables.CurValueIndex), builder}
+ , RandomProvider(opts.RandomProvider)
+ , TimeProvider(opts.TimeProvider)
+ , ArrowMemoryPool(arrowMemoryPool)
+ , WideFields(mutables.CurWideFieldsIndex, nullptr)
+ , TypeEnv(opts.TypeEnv)
+ , Mutables(mutables)
+ , TypeInfoHelper(new TTypeInfoHelper)
+ , CountersProvider(opts.CountersProvider)
+ , SecureParamsProvider(opts.SecureParamsProvider)
+{
+ std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
+
+ for (const auto& [mutableIdx, fieldIdx, used] : mutables.WideFieldInitialize) {
+ for (ui32 i: used) {
+ WideFields[fieldIdx + i] = &MutableValues[mutableIdx + i];
+ }
+ }
+}
+
+TComputationContext::~TComputationContext() {
+#ifndef NDEBUG
+ if (RssCounter) {
+ Cerr << "UsageOnFinish: graph=" << HolderFactory.GetPagePool().GetUsed()
+ << ", rss=" << TRusage::Get().MaxRss
+ << ", peakAlloc=" << HolderFactory.GetPagePool().GetPeakAllocated()
+ << ", adjustor=" << UsageAdjustor
+ << Endl;
+ }
+#endif
+}
+
+void TComputationContext::UpdateUsageAdjustor(ui64 memLimit) {
+ const auto rss = TRusage::Get().MaxRss;
+ if (!InitRss) {
+ LastRss = InitRss = rss;
+ }
+
+#ifndef NDEBUG
+ // Print first time and then each 30 seconds
+ bool printUsage = LastPrintUsage == TInstant::Zero()
+ || TInstant::Now() > TDuration::Seconds(30).ToDeadLine(LastPrintUsage);
+#endif
+
+ if (auto peakAlloc = HolderFactory.GetPagePool().GetPeakAllocated()) {
+ if (rss - InitRss > memLimit && rss - LastRss > (memLimit / 4)) {
+ UsageAdjustor = std::max(1.f, float(rss - InitRss) / float(peakAlloc));
+ LastRss = rss;
+#ifndef NDEBUG
+ printUsage = UsageAdjustor > 1.f;
+#endif
+ }
+ }
+
+#ifndef NDEBUG
+ if (printUsage) {
+ Cerr << "Usage: graph=" << HolderFactory.GetPagePool().GetUsed()
+ << ", rss=" << rss
+ << ", peakAlloc=" << HolderFactory.GetPagePool().GetPeakAllocated()
+ << ", adjustor=" << UsageAdjustor
+ << Endl;
+ LastPrintUsage = TInstant::Now();
+ }
+#endif
+}
+
+class TSimpleSecureParamsProvider : public NUdf::ISecureParamsProvider {
+public:
+ TSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams)
+ : SecureParams(secureParams)
+ {}
+
+ bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef& value) const override {
+ auto found = SecureParams.FindPtr(TStringBuf(key));
+ if (!found) {
+ return false;
+ }
+
+ value = (TStringBuf)*found;
+ return true;
+ }
+
+private:
+ const THashMap<TString, TString> SecureParams;
+};
+
+std::unique_ptr<NUdf::ISecureParamsProvider> MakeSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams) {
+ return std::make_unique<TSimpleSecureParamsProvider>(secureParams);
+}
+
+}
+}