aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/public/purecalc/common/compile_mkql.cpp
blob: 743447ada9c10f74eab644cb958c77ff4b060f93 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include "compile_mkql.h"

#include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
#include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
#include <yql/essentials/core/yql_user_data_storage.h>
#include <yql/essentials/public/purecalc/common/names.h>

#include <util/stream/file.h>

namespace NYql::NPureCalc {

namespace {

NCommon::IMkqlCallableCompiler::TCompiler MakeSelfCallableCompiler() {
    return [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
        MKQL_ENSURE(node.ChildrenSize() == 1, "Self takes exactly 1 argument");
        const auto* argument = node.Child(0);
        MKQL_ENSURE(argument->IsAtom(), "Self argument must be atom");
        ui32 inputIndex = 0;
        MKQL_ENSURE(TryFromString(argument->Content(), inputIndex), "Self argument must be UI32");
        auto type = NCommon::BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
        NKikimr::NMiniKQL::TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), node.Content(), type);
        call.Add(ctx.ProgramBuilder.NewDataLiteral<ui32>(inputIndex));
        return NKikimr::NMiniKQL::TRuntimeNode(call.Build(), false);
    };
}

NCommon::IMkqlCallableCompiler::TCompiler MakeFilePathCallableCompiler(const TUserDataTable& userData) {
    return [&](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
        const TString name(node.Child(0)->Content());
        auto block = TUserDataStorage::FindUserDataBlock(userData, TUserDataKey::File(name));
        if (!block) {
            auto blockKey = TUserDataKey::File(GetDefaultFilePrefix() + name);
            block = TUserDataStorage::FindUserDataBlock(userData, blockKey);
        }
        MKQL_ENSURE(block, "file not found: " << name);
        MKQL_ENSURE(block->Type == EUserDataType::PATH,
                    "FilePath not supported for non-filesystem user data, name: "
                        << name << ", block type: " << block->Type);
        return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(block->Data);
    };
}

NCommon::IMkqlCallableCompiler::TCompiler MakeFileContentCallableCompiler(const TUserDataTable& userData) {
    return [&](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
        const TString name(node.Child(0)->Content());
        auto block = TUserDataStorage::FindUserDataBlock(userData, TUserDataKey::File(name));
        if (!block) {
            auto blockKey = TUserDataKey::File(GetDefaultFilePrefix() + name);
            block = TUserDataStorage::FindUserDataBlock(userData, blockKey);
        }
        MKQL_ENSURE(block, "file not found: " << name);
        if (block->Type == EUserDataType::PATH) {
            auto content = TFileInput(block->Data).ReadAll();
            return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(content);
        } else if (block->Type == EUserDataType::RAW_INLINE_DATA) {
            return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(block->Data);
        } else {
            // TODO support EUserDataType::URL
            MKQL_ENSURE(false, "user data blocks of type URL are not supported by FileContent: " << name);
            Y_UNREACHABLE();
        }
    };
}

NCommon::IMkqlCallableCompiler::TCompiler MakeFolderPathCallableCompiler(const TUserDataTable& userData) {
    return [&](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
        const TString name(node.Child(0)->Content());
        auto folderName = TUserDataStorage::MakeFolderName(name);
        TMaybe<TString> folderPath;
        for (const auto& x : userData) {
            if (!x.first.Alias().StartsWith(folderName)) {
                continue;
            }

            MKQL_ENSURE(x.second.Type == EUserDataType::PATH,
                        "FilePath not supported for non-file data block, name: "
                            << x.first.Alias() << ", block type: " << x.second.Type);

            auto pathPrefixLength = x.second.Data.size() - (x.first.Alias().size() - folderName.size());
            auto newFolderPath = x.second.Data.substr(0, pathPrefixLength);
            if (!folderPath) {
                folderPath = newFolderPath;
            } else {
                MKQL_ENSURE(*folderPath == newFolderPath,
                            "file " << x.second.Data << " is out of directory " << *folderPath);
            }
        }
        return ctx.ProgramBuilder.NewDataLiteral<NKikimr::NUdf::EDataSlot::String>(*folderPath);
    };
}

}

NKikimr::NMiniKQL::TRuntimeNode CompileMkql(const TExprNode::TPtr& exprRoot, TExprContext& exprCtx,
    const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry, const NKikimr::NMiniKQL::TTypeEnvironment& env, const TUserDataTable& userData)
{
    NCommon::TMkqlCommonCallableCompiler compiler;

    compiler.AddCallable(PurecalcInputCallableName, MakeSelfCallableCompiler());
    compiler.AddCallable(PurecalcBlockInputCallableName, MakeSelfCallableCompiler());
    compiler.OverrideCallable("FileContent", MakeFileContentCallableCompiler(userData));
    compiler.OverrideCallable("FilePath", MakeFilePathCallableCompiler(userData));
    compiler.OverrideCallable("FolderPath", MakeFolderPathCallableCompiler(userData));

    // Prepare build context

    NKikimr::NMiniKQL::TProgramBuilder pgmBuilder(env, funcRegistry);
    NCommon::TMkqlBuildContext buildCtx(compiler, pgmBuilder, exprCtx);

    // Build the root MKQL node

    return NCommon::MkqlBuildExpr(*exprRoot, buildCtx);
}

} // NYql::NPureCalc