// Source path: ydb/core/testlib/minikql_compile.h
// Blob: 68dd3db375e436d64539601dddf0ee0350d36060
#pragma once

#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/client/minikql_compile/db_key_resolver.h>
#include <ydb/core/client/minikql_compile/yql_expr_minikql.h>
#include <library/cpp/threading/future/future.h>
#include <util/thread/pool.h>
#include <library/cpp/testing/unittest/registar.h>

// Mock implementation of NYql::IDbSchemeResolver for tests.
//
// Table descriptions are registered up front through AddTable() and kept in an
// in-memory map keyed by table name. ResolveTables() answers from that map and
// delivers the answer through a future that is fulfilled on an internal thread
// pool.
class TMockDbSchemeResolver : public NYql::IDbSchemeResolver {
public:
    // Starts the internal thread pool with two worker threads.
    TMockDbSchemeResolver()
    {
        MtpQueue.Start(2);
    }

    // Schedules `func` on `queue` and returns a future for its result.
    //
    // The promise and the callable are captured by the task that is enqueued
    // with SafeAddFunc(); the task fulfils the promise through
    // NThreading::NImpl::SetValue(promise, func).
    template <typename Func>
    NThreading::TFuture<NThreading::TFutureType<::TFunctionResult<Func>>> Async(Func&& func, IThreadPool& queue) {
        auto promise = NThreading::NewPromise<NThreading::TFutureType<::TFunctionResult<Func>>>();
        auto lambda = [promise, func = std::forward<Func>(func)]() mutable {
            NThreading::NImpl::SetValue(promise, func);
        };
        queue.SafeAddFunc(std::move(lambda));
        return promise.GetFuture();
    }

    // Resolves every requested table against the registered descriptions.
    //
    // For each entry of `tables` exactly one TTableResult is produced, in the
    // same order:
    //  - unknown table name: Status = Error, Reason = "Table <name> not found";
    //  - known table: Status = Ok, with a copy of the registered TableId, the
    //    registered KeyColumnCount and the info of each requested column.
    //
    // Requesting a column that is not registered for the table, or listing the
    // same column twice, trips Y_VERIFY.
    //
    // The results are computed synchronously; only their delivery goes through
    // the thread pool via Async().
    virtual NThreading::TFuture<TTableResults> ResolveTables(const TVector<TTable>& tables) override {
        TTableResults results;
        results.reserve(tables.size());
        for (auto& table : tables) {
            TTableResult result(TTableResult::Ok);
            auto data = Tables.FindPtr(table.TableName);
            if (!data) {
                result.Status = TTableResult::Error;
                result.Reason = TStringBuilder() << "Table " << table.TableName << " not found";
            }
            else {
                result.Table = table;
                result.TableId.Reset(new NKikimr::TTableId(*data->TableId));
                result.KeyColumnCount = data->KeyColumnCount;

                for (auto& column : table.ColumnNames) {
                    auto columnInfo = data->Columns.FindPtr(column);
                    // Check the lookup result (not the column name): the
                    // pointer is dereferenced right below and would be null
                    // for a column the table does not have.
                    Y_VERIFY(columnInfo);

                    auto insertResult = result.Columns.insert(std::make_pair(column, *columnInfo));
                    // A duplicate column name in the request is a test bug.
                    Y_VERIFY(insertResult.second);
                }
            }

            results.push_back(result);
        }

        return Async([results]() {
            return results;
        }, MtpQueue);
    }

    // Actor-based resolution is not supported by the mock: always throws
    // yexception("Not implemented").
    virtual void ResolveTables(const TVector<TTable>& tables, NActors::TActorId responseTo) override {
        Y_UNUSED(tables);
        Y_UNUSED(responseTo);
        ythrow yexception() << "Not implemented";
    }

    // Registers a table description under table.Table.TableName.
    //
    // Fails via Y_ENSURE when table.TableId is not set, and throws yexception
    // when a table with the same name has already been registered.
    void AddTable(const IDbSchemeResolver::TTableResult& table) {
        Y_ENSURE(!!table.TableId, "TableId must be defined");
        if (!Tables.insert({ table.Table.TableName, table }).second) {
            ythrow yexception() << "Table " << table.Table.TableName << " is already registered";
        }
    }

private:
    // Thread pool on which the futures returned by ResolveTables() are fulfilled.
    TThreadPool MtpQueue;
    // Registered table descriptions, keyed by table name.
    THashMap<TString, IDbSchemeResolver::TTableResult> Tables;
};

namespace NYql {

// Test helper: turns miniKQL program text into a compiled expression container.
//
// The text is first parsed into an AST and then compiled into a freshly
// allocated TExprContainer. Issues collected at each stage are printed to Cerr
// before the stage outcome is checked with UNIT_ASSERT, so diagnostics are
// visible even when the assertion fails.
inline TExprContainer::TPtr ParseText(const TString& programText) {
    // Stage 1: text -> AST.
    TAstParseResult astRes = ParseAst(programText);
    astRes.Issues.PrintTo(Cerr);
    UNIT_ASSERT(astRes.IsOk());

    // Stage 2: AST -> expression graph owned by the container.
    TExprContainer::TPtr container(new TExprContainer());
    auto& ctx = container->Context;
    const bool isOk = CompileExpr(*astRes.Root, container->Root, ctx, nullptr);
    ctx.IssueManager.GetIssues().PrintTo(Cerr);
    UNIT_ASSERT(isOk);

    return container;
}

}