aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql/plugin/dynamic/impl.cpp
blob: 5966a8a8ec17acd5eac9dd9b8af4b7094cc045e4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include <yt/yql/plugin/bridge/interface.h>
#include <yt/yql/plugin/native/plugin.h>

#include <type_traits>

using namespace NYT::NYqlPlugin;
using namespace NYT::NYson;

extern "C" {

////////////////////////////////////////////////////////////////////////////////

TBridgeYqlPlugin* BridgeCreateYqlPlugin(const TBridgeYqlPluginOptions* bridgeOptions)
{
    THashMap<TString, TString> clusters;
    for (auto clusterIndex = 0; clusterIndex < bridgeOptions->ClusterCount; ++clusterIndex) {
        const auto& Cluster = bridgeOptions->Clusters[clusterIndex];
        clusters[Cluster.Cluster] = Cluster.Proxy;
    }

    auto operationAttributes = bridgeOptions->OperationAttributes
        ? TYsonString(TString(bridgeOptions->OperationAttributes, bridgeOptions->OperationAttributesLength))
        : TYsonString();

    TYqlPluginOptions options{
        .MRJobBinary = TString(bridgeOptions->MRJobBinary),
        .UdfDirectory = TString(bridgeOptions->UdfDirectory),
        .Clusters = std::move(clusters),
        .DefaultCluster = std::optional<TString>(bridgeOptions->DefaultCluster),
        .OperationAttributes = operationAttributes,
        .MaxFilesSizeMb = static_cast<int>(bridgeOptions->MaxFilesSizeMb),
        .MaxFileCount = static_cast<int>(bridgeOptions->MaxFileCount),
        .DownloadFileRetryCount = static_cast<int>(bridgeOptions->DownloadFileRetryCount),
        .YTTokenPath = TString(bridgeOptions->YTTokenPath),
        .LogBackend = std::move(*reinterpret_cast<THolder<TLogBackend>*>(bridgeOptions->LogBackend)),
    };
    auto nativePlugin = CreateYqlPlugin(options);
    return nativePlugin.release();
}

void BridgeFreeYqlPlugin(TBridgeYqlPlugin* plugin)
{
    auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
    delete nativePlugin;
}

void BridgeFreeQueryResult(TBridgeQueryResult* result)
{
    delete result->TaskInfo;
    delete result->Statistics;
    delete result->Plan;
    delete result->YsonResult;
    delete result->YsonError;
    delete result;
}

void FillString(const char*& str, ssize_t& strLength, const std::optional<TString>& original)
{
    if (!original) {
        str = nullptr;
        strLength = 0;
        return;
    }
    char* copy = new char[original->size() + 1];
    memcpy(copy, original->data(), original->size() + 1);
    str = copy;
    strLength = original->size();
}

TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* bridgeFiles, int bridgeFileCount)
{
    static const auto EmptyMap = TYsonString(TString("{}"));

    auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
    auto* bridgeResult = new TBridgeQueryResult;

    std::vector<TQueryFile> files(bridgeFileCount);
    for (int index = 0; index < bridgeFileCount; index++) {
        const auto& file = bridgeFiles[index];
        files.push_back(TQueryFile {
            .Name = TStringBuf(file.Name, file.NameLength),
            .Content = TStringBuf(file.Content, file.ContentLength),
            .Type = file.Type,
        });
    }

    auto result = nativePlugin->Run(
        NYT::TGuid::FromString(queryId),
        TString(impersonationUser),
        TString(queryText),
        settings ? TYsonString(TString(settings)) : EmptyMap,
        files);
    FillString(bridgeResult->YsonResult, bridgeResult->YsonResultLength, result.YsonResult);
    FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
    FillString(bridgeResult->Statistics, bridgeResult->StatisticsLength, result.Statistics);
    FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress);
    FillString(bridgeResult->TaskInfo, bridgeResult->TaskInfoLength, result.TaskInfo);
    FillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError);

    return bridgeResult;
}

TBridgeQueryResult* BridgeGetProgress(TBridgeYqlPlugin* plugin, const char* queryId)
{
    auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
    auto* bridgeResult = new TBridgeQueryResult;

    auto result = nativePlugin->GetProgress(NYT::TGuid::FromString(queryId));
    FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
    FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress);

    return bridgeResult;
}

////////////////////////////////////////////////////////////////////////////////

// Validate that the all functions from the bridge interface are implemented with proper signatures.

#define XX(function) static_assert(std::is_same_v<decltype(&(function)), TFunc ## function*>);
FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX)
#undef XX

////////////////////////////////////////////////////////////////////////////////

} // extern "C"