1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
#include <yt/yql/plugin/bridge/interface.h>
#include <yt/yql/plugin/native/plugin.h>
#include <type_traits>
using namespace NYT::NYqlPlugin;
using namespace NYT::NYson;
extern "C" {
////////////////////////////////////////////////////////////////////////////////
// Builds a native YQL plugin from the C-level bridge options and returns it as an
// opaque handle. The plugin is released from its smart pointer before returning, so
// the caller owns it and should dispose of it with BridgeFreeYqlPlugin.
TBridgeYqlPlugin* BridgeCreateYqlPlugin(const TBridgeYqlPluginOptions* bridgeOptions)
{
    // Cluster name -> proxy mapping; a repeated cluster name overwrites the earlier entry.
    THashMap<TString, TString> proxyByCluster;
    for (auto index = 0; index < bridgeOptions->ClusterCount; ++index) {
        const auto& entry = bridgeOptions->Clusters[index];
        proxyByCluster[entry.Cluster] = entry.Proxy;
    }

    // Operation attributes arrive as pointer + length; a null pointer leaves the
    // default-constructed TYsonString in place.
    TYsonString attributes;
    if (bridgeOptions->OperationAttributes) {
        attributes = TYsonString(TString(bridgeOptions->OperationAttributes, bridgeOptions->OperationAttributesLength));
    }

    // The log backend is handed over as an untyped pointer to a THolder<TLogBackend>;
    // its contents are moved out of the caller's holder below.
    // NOTE(review): a null LogBackend pointer would be dereferenced here — confirm callers always set it.
    auto* logBackendHolder = reinterpret_cast<THolder<TLogBackend>*>(bridgeOptions->LogBackend);

    // NOTE(review): DefaultCluster always yields an engaged optional, even when the
    // incoming pointer is null — confirm this is intended.
    TYqlPluginOptions options{
        .MRJobBinary = TString(bridgeOptions->MRJobBinary),
        .UdfDirectory = TString(bridgeOptions->UdfDirectory),
        .Clusters = std::move(proxyByCluster),
        .DefaultCluster = std::optional<TString>(bridgeOptions->DefaultCluster),
        .OperationAttributes = attributes,
        .YTTokenPath = TString(bridgeOptions->YTTokenPath),
        .LogBackend = std::move(*logBackendHolder),
    };

    return CreateYqlPlugin(options).release();
}
// Destroys a plugin previously handed out as an opaque bridge handle.
void BridgeFreeYqlPlugin(TBridgeYqlPlugin* plugin)
{
    // The handle is reinterpreted as the native interface pointer and deleted through it.
    delete reinterpret_cast<IYqlPlugin*>(plugin);
}
// Releases a query result returned by BridgeRun together with all of its string buffers.
//
// The string fields are allocated with `new char[...]` in BridgeRun, so they must be
// released with `delete[]`; releasing array allocations with scalar `delete` is
// undefined behavior. Fields that are null (absent values) are handled naturally,
// since `delete[]` on a null pointer is a no-op.
//
// Passing a null |result| is allowed and does nothing.
void BridgeFreeQueryResult(TBridgeQueryResult* result)
{
    if (!result) {
        return;
    }

    delete[] result->TaskInfo;
    delete[] result->Statistics;
    delete[] result->Plan;
    delete[] result->YsonResult;
    delete[] result->YsonError;
    delete result;
}
// Runs a query through the native plugin and converts the outcome into a
// heap-allocated bridge result.
//
// |settings| may be null, in which case an empty YSON map ("{}") is passed to the plugin.
// Every optional string of the native result is copied into its own `new char[]`
// buffer (NUL-terminated, with the length stored alongside); absent values become
// a null pointer with zero length.
TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* impersonationUser, const char* queryText, const char* settings)
{
    static const auto EmptyMap = TYsonString(TString("{}"));

    // Exports one optional string into a (pointer, length) pair of the bridge result.
    auto exportString = [] (const std::optional<TString>& source, const char*& target, ssize_t& targetLength) {
        if (source) {
            const auto length = source->size();
            auto* buffer = new char[length + 1];
            // Copy the payload plus one trailing byte, i.e. the terminator that follows the string data.
            memcpy(buffer, source->data(), length + 1);
            target = buffer;
            targetLength = length;
        } else {
            target = nullptr;
            targetLength = 0;
        }
    };

    auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
    auto* bridgeResult = new TBridgeQueryResult;

    auto result = nativePlugin->Run(
        TString(impersonationUser),
        TString(queryText),
        settings ? TYsonString(TString(settings)) : EmptyMap);

    exportString(result.YsonResult, bridgeResult->YsonResult, bridgeResult->YsonResultLength);
    exportString(result.Plan, bridgeResult->Plan, bridgeResult->PlanLength);
    exportString(result.Statistics, bridgeResult->Statistics, bridgeResult->StatisticsLength);
    exportString(result.TaskInfo, bridgeResult->TaskInfo, bridgeResult->TaskInfoLength);
    exportString(result.YsonError, bridgeResult->YsonError, bridgeResult->YsonErrorLength);

    return bridgeResult;
}
////////////////////////////////////////////////////////////////////////////////
// Validate that all functions from the bridge interface are implemented with the proper signatures.
// For every function name that FOR_EACH_BRIDGE_INTERFACE_FUNCTION passes to XX, the static_assert
// checks that a pointer to the function defined in this file has exactly the type TFunc<name>*.
// NOTE(review): FOR_EACH_BRIDGE_INTERFACE_FUNCTION and the TFunc<name> types are not defined here;
// presumably they come from bridge/interface.h — confirm.
#define XX(function) static_assert(std::is_same_v<decltype(&(function)), TFunc ## function*>);
FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX)
#undef XX
////////////////////////////////////////////////////////////////////////////////
} // extern "C"
|