aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/engine/minikql/minikql_engine_host.h
blob: 9c6e39a364f07405ff419052d74da06dae9b4f45 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#pragma once

#include "change_collector_iface.h"

#include <util/generic/cast.h>
#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/core/engine/mkql_engine_flat_host.h>

#include <utility>

namespace NKikimr {
namespace NMiniKQL {

// Plain bundle of per-operation statistics for an engine host.
// All fields start at zero; two bundles can be merged with operator+=.
struct TEngineHostCounters {
    // Number of operations of each kind.
    ui64 NSelectRow = 0;
    ui64 NSelectRange = 0;
    ui64 NUpdateRow = 0;
    ui64 NEraseRow = 0;

    // Row / byte volumes per operation kind.
    ui64 SelectRowRows = 0;
    ui64 SelectRowBytes = 0;
    ui64 SelectRangeRows = 0;
    ui64 SelectRangeBytes = 0;
    ui64 SelectRangeDeletedRowSkips = 0;
    ui64 UpdateRowBytes = 0;
    ui64 EraseRowBytes = 0;

    ui64 InvisibleRowSkips = 0;

    // Adds every field of `other` to the matching field of this object.
    TEngineHostCounters& operator+=(const TEngineHostCounters& other) {
        // Operation counts.
        NSelectRow += other.NSelectRow;
        NSelectRange += other.NSelectRange;
        NUpdateRow += other.NUpdateRow;
        NEraseRow += other.NEraseRow;

        // Point reads.
        SelectRowRows += other.SelectRowRows;
        SelectRowBytes += other.SelectRowBytes;

        // Range reads.
        SelectRangeRows += other.SelectRangeRows;
        SelectRangeBytes += other.SelectRangeBytes;
        SelectRangeDeletedRowSkips += other.SelectRangeDeletedRowSkips;

        // Writes.
        UpdateRowBytes += other.UpdateRowBytes;
        EraseRowBytes += other.EraseRowBytes;

        InvisibleRowSkips += other.InvisibleRowSkips;

        return *this;
    }

    // Renders all counters as a single "{Name: value, ...}" string.
    // Note that the print order differs slightly from the declaration order:
    // SelectRangeDeletedRowSkips is emitted after the write byte counters.
    TString ToString() const {
        TStringBuilder out;
        out << "{NSelectRow: " << NSelectRow;
        out << ", NSelectRange: " << NSelectRange;
        out << ", NUpdateRow: " << NUpdateRow;
        out << ", NEraseRow: " << NEraseRow;
        out << ", SelectRowRows: " << SelectRowRows;
        out << ", SelectRowBytes: " << SelectRowBytes;
        out << ", SelectRangeRows: " << SelectRangeRows;
        out << ", SelectRangeBytes: " << SelectRangeBytes;
        out << ", UpdateRowBytes: " << UpdateRowBytes;
        out << ", EraseRowBytes: " << EraseRowBytes;
        out << ", SelectRangeDeletedRowSkips: " << SelectRangeDeletedRowSkips;
        out << ", InvisibleRowSkips: " << InvisibleRowSkips;
        out << "}";
        return out;
    }
};

// Abstract, ref-counted (derives from TThrRefBase) sink for key access samples.
// Implementations receive a table id together with the cells of a key.
struct IKeyAccessSampler : public TThrRefBase {
    // Intrusive smart pointer used to hold samplers (see TEngineHostSettings).
    using TPtr = TIntrusivePtr<IKeyAccessSampler>;
    // Reports one key of table `tableId`; `key` is a non-owning view of its cells.
    // NOTE(review): when and how often the host calls this is not visible in this header.
    virtual void AddSample(const TTableId& tableId, const TArrayRef<const TCell>& key) = 0;
};

// IKeyAccessSampler implementation that discards every sample.
// Used as the default sampler in TEngineHostSettings.
struct TNoopKeySampler : public IKeyAccessSampler {
    // Intentionally does nothing; both arguments are ignored.
    void AddSample(const TTableId& /* tableId */, const TArrayRef<const TCell>& /* key */) override {
    }
};

// Construction-time settings of an engine host.
// Every constructor argument has a default, so `TEngineHostSettings()` yields
// shard id 0, a writable host, the by-key filter left enabled and a no-op
// key access sampler.
struct TEngineHostSettings {
    ui64 ShardId;
    bool IsReadonly;
    bool DisableByKeyFilter;
    IKeyAccessSampler::TPtr KeyAccessSampler;

    // shardId            - value stored in ShardId.
    // isReadonly         - value stored in IsReadonly.
    // disableByKeyFilter - value stored in DisableByKeyFilter.
    // keyAccessSampler   - sampler stored in KeyAccessSampler; defaults to a
    //                      freshly allocated TNoopKeySampler.
    explicit TEngineHostSettings(ui64 shardId = 0, bool isReadonly = false, bool disableByKeyFilter = false,
                                 IKeyAccessSampler::TPtr keyAccessSampler = new TNoopKeySampler())
        : ShardId(shardId)
        , IsReadonly(isReadonly)
        , DisableByKeyFilter(disableByKeyFilter)
        // The pointer is a by-value sink argument: move it into the member
        // instead of copying to avoid an extra ref-count increment/decrement.
        , KeyAccessSampler(std::move(keyAccessSampler))
    {}
};

// Base implementation of IEngineFlatHost that holds a reference to a
// NTable::TDatabase, a reference to externally owned counters and a copy of
// the host settings.
// The class is abstract: GetWriteVersion, GetReadVersion and
// GetChangeCollector are pure virtual and must be provided by a subclass
// (see TUnversionedEngineHost for the simplest one).
class TEngineHost : public IEngineFlatHost {
public:
    using TScheme = NTable::TScheme;

    // `db` and `counters` are taken by non-const reference and kept as
    // reference members, so both objects must outlive the host.
    // `settings` defaults to a default-constructed TEngineHostSettings.
    explicit TEngineHost(NTable::TDatabase& db, TEngineHostCounters& counters,
        const TEngineHostSettings& settings = TEngineHostSettings());

    // --- IEngineFlatHost overrides (defined out of line) ---
    ui64 GetShardId() const override;
    const TScheme::TTableInfo* GetTableInfo(const TTableId& tableId) const override;
    bool IsReadonly() const override;
    bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override;
    ui64 CalculateReadSize(const TVector<const TKeyDesc*>& keys) const override;
    ui64 CalculateResultSize(const TKeyDesc& key) const override;
    void PinPages(const TVector<THolder<TKeyDesc>>& keys, ui64 pageFaultCount) override;

    NUdf::TUnboxedValue SelectRow(const TTableId& tableId, const TArrayRef<const TCell>& row,
        TStructLiteral* columnIds, TOptionalType* returnType, const TReadTarget& readTarget,
        const THolderFactory& holderFactory) override;

    NUdf::TUnboxedValue SelectRange(const TTableId& tableId, const TTableRange& range,
        TStructLiteral* columnIds, TListLiteral* skipNullKeys, TStructType* returnType,
        const TReadTarget& readTarget, ui64 itemsLimit, ui64 bytesLimit, bool reverse,
        std::pair<const TListLiteral*, const TListLiteral*> forbidNullArgs, const THolderFactory& holderFactory) override;

    void UpdateRow(const TTableId& tableId, const TArrayRef<const TCell>& row,
        const TArrayRef<const TUpdateCommand>& commands) override;
    void EraseRow(const TTableId& tableId, const TArrayRef<const TCell>& row) override;
    bool IsPathErased(const TTableId& tableId) const override;
    bool IsMyKey(const TTableId& tableId, const TArrayRef<const TCell>& row) const override;
    ui64 GetTableSchemaVersion(const TTableId&) const override;

    void SetPeriodicCallback(TPeriodicCallback&& callback) override;
    // Invokes the stored periodic callback if one is set; otherwise does nothing.
    void ExecPeriodicCallback() { if (PeriodicCallback) { PeriodicCallback();} }

    // Returns the counters as a mutable reference even on a const host:
    // Counters is a reference member, so constness of `this` does not apply
    // to the referenced object.
    TEngineHostCounters& GetCounters() const { return Counters; }
    // Returns the settings copy stored in this host.
    const TEngineHostSettings& GetSettings() const { return Settings; }

    // Row version hooks that subclasses must implement.
    // NOTE(review): presumably the versions used for writes to / reads from
    // `tableId` — confirm against the out-of-line method definitions.
    virtual TRowVersion GetWriteVersion(const TTableId& tableId) const = 0;
    virtual TRowVersion GetReadVersion(const TTableId& tableId) const = 0;

    // Change collector hook that subclasses must implement; the return type
    // is a raw pointer and TUnversionedEngineHost returns nullptr, so callers
    // should expect a null result.
    virtual IChangeCollector* GetChangeCollector(const TTableId& tableId) const = 0;

protected:
    // Overridable mapping from a TTableId to a ui64 local table id.
    virtual ui64 LocalTableId(const TTableId& tableId) const;
    // Fills `key` (out-parameter) from the cells in `row` using `tableInfo`.
    // NOTE(review): exact conversion rules live in the definition — confirm there.
    void ConvertKeys(const TScheme::TTableInfo* tableInfo, const TArrayRef<const TCell>& row,
        TSmallVec<TRawTypeValue>& key) const;
    void DoCalculateReadSize(const TKeyDesc& key, NTable::TSizeEnv& env) const;

protected:
    // Data members are initialized in declaration order; do not reorder them
    // without checking the constructor's initializer list.
    NTable::TDatabase& Db;
    // NOTE(review): presumably bound to the scheme of `Db` in the constructor — confirm.
    const TScheme& Scheme;
    const TEngineHostSettings Settings;
    TEngineHostCounters& Counters;
    TPeriodicCallback PeriodicCallback;
};

// Concrete TEngineHost that ignores the table id in all three hooks:
// writes use TRowVersion::Min(), reads use TRowVersion::Max(), and no change
// collector is provided.
class TUnversionedEngineHost : public TEngineHost {
public:
    // Reuse the base class constructors unchanged.
    using TEngineHost::TEngineHost;

    // Same write version for every table.
    TRowVersion GetWriteVersion(const TTableId& /* tableId */) const override {
        return TRowVersion::Min();
    }

    // Same read version for every table.
    TRowVersion GetReadVersion(const TTableId& /* tableId */) const override {
        return TRowVersion::Max();
    }

    // There is never a change collector for this host.
    IChangeCollector* GetChangeCollector(const TTableId& /* tableId */) const override {
        return nullptr;
    }
};

// Declaration only; the definition is not part of this header.
// NOTE(review): judging by the signature, `tags` and `systemColumnTags` are
// out-parameters filled from `columnIds` — confirm against the definition.
void AnalyzeRowType(TStructLiteral* columnIds, TSmallVec<NTable::TTag>& tags, TSmallVec<NTable::TTag>& systemColumnTags);
// Declaration only; the definition is not part of this header.
// NOTE(review): presumably converts `cell`, interpreted as scheme type `type`,
// into an unboxed value — confirm against the definition.
NUdf::TUnboxedValue GetCellValue(const TCell& cell, NScheme::TTypeId type);
// Declaration only; the definition is not part of this header.
// Takes the same kind of inputs as TEngineHost::SelectRange (range, limits,
// reverse flag, holder factory) plus the database, scheme, local table id,
// column tags and the counters object to update.
// NOTE(review): the name suggests the returned value is a list whose rows are
// produced lazily while iterating — confirm against the definition.
NUdf::TUnboxedValue CreateSelectRangeLazyRowsList(NTable::TDatabase& db, const NTable::TScheme& scheme,
    const THolderFactory& holderFactory, const TTableId& tableId, ui64 localTid, const TSmallVec<NTable::TTag>& tags,
    const TSmallVec<bool>& skipNullKeys, const TTableRange& range, ui64 itemsLimit, ui64 bytesLimit,
    bool reverse, TEngineHostCounters& counters, const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId);

// Declaration only; the definition is not part of this header.
// Free-function counterpart of TEngineHost::ConvertKeys that additionally
// takes the scheme and a `keyDataBytes` pointer.
// NOTE(review): `key` and `keyDataBytes` look like out-parameters; whether
// `keyDataBytes` may be nullptr is not visible here — confirm against the definition.
void ConvertTableKeys(const NTable::TScheme& scheme, const NTable::TScheme::TTableInfo* tableInfo,
    const TArrayRef<const TCell>& row, TSmallVec<TRawTypeValue>& key, ui64* keyDataBytes);

}}