aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/providers/common/mkql_simple_file/mkql_simple_file.cpp
blob: 65e4077a874b005b8b85ac01afa1ee4265cb6d05 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include "mkql_simple_file.h"

#include <yql/essentials/core/yql_user_data_storage.h>
#include <yql/essentials/minikql/mkql_program_builder.h>
#include <yql/essentials/minikql/mkql_node_cast.h>

#include <util/stream/file.h>

namespace NYql {

using namespace NKikimr;
using namespace NKikimr::NMiniKQL;

// Remembers the function registry and the user data table; both are read
// later by the callable handlers that operator() hands out.
TSimpleFileTransformProvider::TSimpleFileTransformProvider(
    const IFunctionRegistry* functionRegistry,
    const TUserDataTable& userDataBlocks)
    : FunctionRegistry_(functionRegistry)
    , UserDataBlocks_(userDataBlocks)
{
}

// Returns the handler for one of the file-related callables, selected by name:
//   "FilePath"    - replaces the callable with a String literal holding the
//                   path of the named user data block;
//   "FolderPath"  - replaces the callable with a String literal holding the
//                   common directory prefix of all blocks under the folder;
//   "FileContent" - replaces the callable with a String literal holding the
//                   content of the named user data block.
// For any other name a default-constructed TCallableVisitFunc is returned.
//
// Each handler expects exactly one input (the file/folder name as a data
// literal) and reports violations through MKQL_ENSURE.
//
// The returned handlers escape this call, so they capture only `this` (never
// the by-value `name` parameter). The provider must therefore stay alive for
// as long as a returned handler may be invoked.
TCallableVisitFunc TSimpleFileTransformProvider::operator()(TInternName name) {
    if (name == "FilePath") {
        return [this](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
            MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arguments");
            const TString fileName(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
            auto block = TUserDataStorage::FindUserDataBlock(UserDataBlocks_, fileName);
            MKQL_ENSURE(block, "File not found: " << fileName);
            // A PATH block carries its path in Data; any other block type must
            // have a frozen file to take the path from.
            MKQL_ENSURE(block->Type == EUserDataType::PATH || block->FrozenFile, "File is not frozen, name: "
                << fileName << ", block type: " << block->Type);
            return TProgramBuilder(env, *FunctionRegistry_).NewDataLiteral<NUdf::EDataSlot::String>(
                block->Type == EUserDataType::PATH ? block->Data : block->FrozenFile->GetPath().GetPath()
            );
        };
    }

    if (name == "FolderPath") {
        return [this](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
            MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arguments");
            const TString folder(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
            const auto folderName = TUserDataStorage::MakeFolderName(folder);
            TMaybe<TString> folderPath;
            for (const auto& x : UserDataBlocks_) {
                const auto& alias = x.first.Alias();
                if (!alias.StartsWith(folderName)) {
                    continue;
                }

                const auto& block = x.second;
                MKQL_ENSURE(block.Type == EUserDataType::PATH, "FolderPath not supported for non-file data block, name: "
                    << alias << ", block type: " << block.Type);
                // Strip from the file path as many trailing characters as the
                // alias has beyond the folder prefix; what remains is the
                // directory candidate for this entry.
                const auto suffixLength = alias.size() - folderName.size();
                auto newFolderPath = block.Data.substr(0, block.Data.size() - suffixLength);
                if (!folderPath) {
                    folderPath = newFolderPath;
                } else {
                    // Every entry of the folder must resolve to the same directory.
                    MKQL_ENSURE(*folderPath == newFolderPath, "File " << block.Data << " is out of directory " << *folderPath);
                }
            }

            // No block matched the folder prefix: fail with a clear message
            // rather than dereferencing an empty TMaybe below.
            MKQL_ENSURE(folderPath.Defined(), "Folder not found: " << folder);
            return TProgramBuilder(env, *FunctionRegistry_).NewDataLiteral<NUdf::EDataSlot::String>(*folderPath);
        };
    }

    if (name == "FileContent") {
        return [this](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
            MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arguments");
            const TString fileName(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef());
            auto block = TUserDataStorage::FindUserDataBlock(UserDataBlocks_, fileName);
            MKQL_ENSURE(block, "File not found: " << fileName);
            const TProgramBuilder pgmBuilder(env, *FunctionRegistry_);
            if (block->Type == EUserDataType::PATH) {
                // Data holds a filesystem path; the literal is the file's content.
                auto content = TFileInput(block->Data).ReadAll();
                return pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(content);
            }
            if (block->Type == EUserDataType::RAW_INLINE_DATA) {
                // Data already is the content.
                return pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(block->Data);
            }

            // The only remaining supported case is a URL block with a frozen file.
            MKQL_ENSURE(block->FrozenFile && block->Type == EUserDataType::URL, "Unsupported block type");
            auto content = TFileInput(block->FrozenFile->GetPath().GetPath()).ReadAll();
            return pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(content);
        };
    }

    return TCallableVisitFunc();
}

} // NYql