aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/core/yql_gc_transformer.cpp
blob: 94365d8de88431b945bb30762299a6afa24ea201 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include "yql_gc_transformer.h"
#include <yql/essentials/utils/yql_panic.h>

namespace NYql {

namespace {

class TGcNodeTransformer : public TSyncTransformerBase {
public:
    TGcNodeTransformer()
    {}

    TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
        output = input;

        if (!CurrentThreshold)
            CurrentThreshold = ctx.GcConfig.Settings.NodeCountThreshold;

        if (ctx.NodeAllocationCounter < LastGcCount + CurrentThreshold) {
            return TStatus::Ok;
        }

        const auto oldSize = ctx.ExprNodes.size();
        const auto zombies = std::partition(ctx.ExprNodes.begin(), ctx.ExprNodes.end(), std::bind(std::logical_not<bool>(), std::bind(&TExprNode::Dead, std::placeholders::_1)));

        for (auto it = zombies; ctx.ExprNodes.cend() != it; ++it) {
            const auto dead = it->get();
            if (const auto hash = dead->GetHashAbove()) {
                const auto range = ctx.UniqueNodes.equal_range(hash);
                for (auto jt = range.first; range.second != jt;) {
                    if (jt->second == dead) {
                        jt = ctx.UniqueNodes.erase(jt);
                    } else {
                        ++jt;
                    }
                }
            }
        }

        ctx.ExprNodes.erase(zombies, ctx.ExprNodes.cend());
        const auto liveSize = ctx.ExprNodes.size();

        Y_ABORT_UNLESS(liveSize >= ctx.UniqueNodes.size());

        // Update statistic.
        ++ctx.GcConfig.Statistics.CollectCount;
        ctx.GcConfig.Statistics.TotalCollectedNodes += oldSize - liveSize;

        LastGcCount = ctx.NodeAllocationCounter;
        // adjust next treshold
        CurrentThreshold = Max(ctx.GcConfig.Settings.NodeCountThreshold, liveSize);

        if (liveSize > ctx.NodesAllocationLimit) {
            ctx.AddError(YqlIssue(TPosition(), TIssuesIds::CORE_GC_NODES_LIMIT_EXCEEDED, TStringBuilder()
                << "Too many allocated nodes, allowed: " << ctx.NodesAllocationLimit
                << ", current: " << liveSize));
            return TStatus::Error;
        }

        const auto poolSize = ctx.StringPool.MemoryAllocated() + ctx.StringPool.MemoryWaste();
        if (poolSize > ctx.StringsAllocationLimit) {
            ctx.AddError(YqlIssue(TPosition(), TIssuesIds::CORE_GC_STRINGS_LIMIT_EXCEEDED, TStringBuilder()
                << "Too large string pool, allowed: " << ctx.StringsAllocationLimit
                << ", current: " << poolSize));
            return TStatus::Error;
        }
        return TStatus::Ok;
    }

    void Rewind() final {
        LastGcCount = 0;
        CurrentThreshold = 0;
    }

private:
    ui64 LastGcCount = 0;
    ui64 CurrentThreshold = 0;
};

}

TAutoPtr<IGraphTransformer> CreateGcNodeTransformer() {
    return new TGcNodeTransformer();
}

}