aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_computation_node.cpp
blob: 2303e954a49fb17619faca24afb4b57a21a62f8b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#include "mkql_computation_node_holders.h"
#include "mkql_computation_node_impl.h"
#include "mkql_computation_node_pack.h"
#include "mkql_value_builder.h"
#include "mkql_validate.h"

#include <yql/essentials/minikql/mkql_node_builder.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_node_printer.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
#include <yql/essentials/minikql/mkql_utils.h>
#include <yql/essentials/minikql/mkql_alloc.h>

#include <util/generic/set.h>
#include <util/generic/algorithm.h>
#include <util/random/mersenne.h>
#include <util/random/random.h>
#include <util/system/tempfile.h>
#include <util/system/fstat.h>
#include <util/system/rusage.h>
#include <util/stream/file.h>
#include <util/stream/output.h>
#include <util/memory/pool.h>

namespace NKikimr {
namespace NMiniKQL {

// Base implementation: ignores the context and hands back an empty pointer,
// i.e. no Arrow kernel node is produced here.
// NOTE(review): presumably overridden by nodes that can expose an Arrow kernel -
// confirm against the declaration of IComputationNode.
std::unique_ptr<IArrowKernelComputationNode> IComputationNode::PrepareArrowKernelComputationNode(TComputationContext& ctx) const {
    Y_UNUSED(ctx);
    return nullptr;
}

// Builds a provider that always yields the given datum.
// The datum is captured by value, so the returned provider keeps its own copy
// and does not refer back to the caller's object.
TDatumProvider MakeDatumProvider(const arrow::Datum& datum) {
    auto constantProvider = [datum]() -> arrow::Datum { return datum; };
    return constantProvider;
}

// Builds a provider that, on every call, evaluates `node` in `ctx`, interprets
// the resulting value as a TArrowBlock and returns that block's datum.
// The node pointer is captured by value and the context by reference, so the
// returned provider must not be invoked after either of them is gone.
TDatumProvider MakeDatumProvider(const IComputationNode* node, TComputationContext& ctx) {
    auto evaluatingProvider = [&ctx, node]() {
        return TArrowBlock::From(node->GetValue(ctx)).GetDatum();
    };
    return evaluatingProvider;
}

// Sets up a computation context.
//
// The LLVM base part receives the holder factory, the stats object from `opts`,
// a freshly allocated array of `mutables.CurValueIndex` unboxed values and the
// value builder. Providers (random, time, counters, secure params) and the type
// environment are taken from `opts`; `WideFields` starts as
// `mutables.CurWideFieldsIndex` null pointers.
//
// The body then marks every mutable value as Invalid and wires the wide-field
// pointer table to the mutable value slots described by
// `mutables.WideFieldInitialize`.
TComputationContext::TComputationContext(const THolderFactory& holderFactory,
    const NUdf::IValueBuilder* builder,
    const TComputationOptsFull& opts,
    const TComputationMutables& mutables,
    arrow::MemoryPool& arrowMemoryPool)
    : TComputationContextLLVM{holderFactory, opts.Stats, std::make_unique<NUdf::TUnboxedValue[]>(mutables.CurValueIndex), builder}
    , RandomProvider(opts.RandomProvider)
    , TimeProvider(opts.TimeProvider)
    , ArrowMemoryPool(arrowMemoryPool)
    , WideFields(mutables.CurWideFieldsIndex, nullptr)
    , TypeEnv(opts.TypeEnv)
    , Mutables(mutables)
    , TypeInfoHelper(new TTypeInfoHelper)
    , CountersProvider(opts.CountersProvider)
    , SecureParamsProvider(opts.SecureParamsProvider)
{
    // Every mutable slot starts out explicitly Invalid.
    const auto invalidValue = NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid());
    auto* const valueSlots = MutableValues.get();
    std::fill_n(valueSlots, mutables.CurValueIndex, invalidValue);

    // For each init record, point the used wide-field entries at the
    // corresponding mutable value slots (same offset in both ranges).
    for (const auto& [valueBase, fieldBase, usedOffsets] : mutables.WideFieldInitialize) {
        for (const ui32 offset : usedOffsets) {
            WideFields[fieldBase + offset] = valueSlots + (valueBase + offset);
        }
    }
}

// In debug builds (NDEBUG not defined) writes one final memory usage line to
// Cerr, but only when RssCounter is set. In release builds this is a no-op.
TComputationContext::~TComputationContext() {
#ifndef NDEBUG
    if (!RssCounter) {
        return;
    }

    auto&& pagePool = HolderFactory.GetPagePool();
    Cerr << "UsageOnFinish: graph=" << pagePool.GetUsed()
        << ", rss=" << TRusage::Get().MaxRss
        << ", peakAlloc=" << pagePool.GetPeakAllocated()
        << ", adjustor=" << UsageAdjustor
        << Endl;
#endif
}

// Recomputes UsageAdjustor from the process max RSS and the page pool's peak
// allocation.
//
// The first call that observes a non-zero RSS records it as the baseline
// (InitRss) and as LastRss. The adjustor is then updated only when all of the
// following hold:
//   * the page pool reports a non-zero peak allocation,
//   * RSS grew by more than `memLimit` since the baseline,
//   * RSS grew by more than `memLimit / 4` since the last update.
// The new adjustor is (rss - InitRss) / peakAlloc, clamped to at least 1.
//
// In debug builds a usage line is printed to Cerr on the first call, then at
// most every 30 seconds; an adjustor update overrides that decision and prints
// only if the new adjustor is above 1.
void TComputationContext::UpdateUsageAdjustor(ui64 memLimit) {
    const auto rss = TRusage::Get().MaxRss;
    if (!InitRss) {
        InitRss = rss;
        LastRss = InitRss;
    }

#ifndef NDEBUG
    // Print first time and then each 30 seconds
    bool printUsage = LastPrintUsage == TInstant::Zero();
    if (!printUsage) {
        printUsage = TInstant::Now() > TDuration::Seconds(30).ToDeadLine(LastPrintUsage);
    }
#endif

    const auto peakAlloc = HolderFactory.GetPagePool().GetPeakAllocated();
    const bool overLimitSinceInit = rss - InitRss > memLimit;
    const bool grewSinceLastUpdate = rss - LastRss > (memLimit / 4);
    if (peakAlloc && overLimitSinceInit && grewSinceLastUpdate) {
        UsageAdjustor = std::max(1.f, float(rss - InitRss) / float(peakAlloc));
        LastRss = rss;
#ifndef NDEBUG
        printUsage = UsageAdjustor > 1.f;
#endif
    }

#ifndef NDEBUG
    if (!printUsage) {
        return;
    }

    auto&& pagePool = HolderFactory.GetPagePool();
    Cerr << "Usage: graph=" << pagePool.GetUsed()
        << ", rss=" << rss
        << ", peakAlloc=" << pagePool.GetPeakAllocated()
        << ", adjustor=" << UsageAdjustor
        << Endl;
    LastPrintUsage = TInstant::Now();
#endif
}

// ISecureParamsProvider implementation backed by an in-memory key/value map.
// The map is copied at construction time and never modified afterwards.
class TSimpleSecureParamsProvider final : public NUdf::ISecureParamsProvider {
public:
    // Stores a private copy of `secureParams`.
    // Explicit so that a THashMap is never silently converted into a provider.
    explicit TSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams)
        : SecureParams(secureParams)
    {}

    // Looks up `key` in the stored map.
    // Returns false and leaves `value` untouched when the key is absent.
    // Otherwise sets `value` to the stored string and returns true.
    // NOTE(review): assumes NUdf::TStringRef is a non-owning view, in which case
    // `value` stays valid only while this provider is alive - confirm.
    bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef& value) const override {
        const auto* found = SecureParams.FindPtr(TStringBuf(key));
        if (!found) {
            return false;
        }

        value = static_cast<TStringBuf>(*found);
        return true;
    }

private:
    const THashMap<TString, TString> SecureParams;
};

// Factory: creates a TSimpleSecureParamsProvider over `secureParams` and
// returns it through the ISecureParamsProvider interface.
std::unique_ptr<NUdf::ISecureParamsProvider> MakeSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams) {
    std::unique_ptr<NUdf::ISecureParamsProvider> provider =
        std::make_unique<TSimpleSecureParamsProvider>(secureParams);
    return provider;
}

}
}