1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
#include "infer_schema_rpc.h"
#include "yt/cpp/mapreduce/common/helpers.h"
#include <yt/yt/library/auth/auth.h>
#include <yt/yt/client/api/client.h>
#include <yt/yt/client/api/rpc_proxy/client_impl.h>
#include <yt/yt/client/api/rpc_proxy/config.h>
#include <yt/yt/client/api/rpc_proxy/connection.h>
#include <yt/yt/client/api/rpc_proxy/row_stream.h>
#include <yql/essentials/utils/yql_panic.h>
#include <yql/essentials/utils/log/log.h>
#include <yt/yql/providers/yt/lib/yt_rpc_helpers/yt_convert_helpers.h>
#include <library/cpp/yson/parser.h>
namespace NYql {
// Parses one attachment block of a table read stream and feeds every row it
// contains into the schema inferer of stream #i.
//
// block    - raw block as received from the stream; it is unwrapped with
//            DeserializeRowStreamBlockEnvelope.
// i        - index of the stream; selects inferers[i] and promises[i].
// inferers - per-stream schema inferers; inferers[i] receives the rows.
// promises - per-stream completion promises; promises[i] is completed with an
//            error if unwrapping or parsing the block throws.
//
// Blocks whose rowset format is not RF_FORMAT and blocks with an empty payload
// are ignored. The function itself never throws std::exception-derived errors:
// they are reported through promises[i] instead.
void OnPayload(const NYT::TSharedRef& block, size_t i, std::vector<TStreamSchemaInferer>& inferers, std::vector<NYT::TPromise<void>>& promises) {
    // The stream has already been completed (e.g. an earlier block failed to
    // parse); its result is decided, so further rows are of no use.
    if (promises[i].IsSet()) {
        return;
    }
    try {
        NYT::NApi::NRpcProxy::NProto::TRowsetDescriptor descriptor;
        NYT::NApi::NRpcProxy::NProto::TRowsetStatistics statistics;
        // Kept inside the try block: an exception escaping from here would
        // otherwise never reach promises[i] and the waiter would not be woken.
        auto currentPayload = NYT::NApi::NRpcProxy::DeserializeRowStreamBlockEnvelope(block, &descriptor, &statistics);
        if (descriptor.rowset_format() != NYT::NApi::NRpcProxy::NProto::RF_FORMAT) {
            return;
        }
        if (currentPayload.empty()) {
            return;
        }

        NYT::TNode res;
        NYT::TNodeBuilder builder(&res);
        TMemoryInput mem(currentPayload.begin(), currentPayload.Size());
        NYson::TYsonListParser parser(&builder, &mem);
        // Each successful Parse() call leaves freshly parsed data in `res`;
        // hand it over to the inferer and reset the node for the next round.
        while (parser.Parse()) {
            for (auto& row : res.AsList()) {
                inferers[i].AddRow(row);
            }
            res.Clear();
        }
    } catch (const std::exception& e) {
        // TrySet instead of Set: the promise may be completed only once, and
        // the caller may hand us more blocks after a failure.
        promises[i].TrySet(e);
    }
}
// Reads a prefix of every requested table through the YT RPC proxy and infers
// a schema for each table from the rows that were read.
//
// cluster  - value assigned to TConnectionConfig::ClusterUrl.
// token    - client token; when empty, the client options are left untouched.
// tx       - transaction handed to NDqs::ConfigureTransaction for each request.
// requests - tables to sample; TableName, TableId and Rows are used. Rows
//            [0, Rows) of each table are requested.
// queue    - received blocks are handed to OnPayload via queue->Async.
//
// Returns one element per request, in request order, produced by
// TStreamSchemaInferer::GetSchema(). Throws if any of the streams failed; the
// error is raised only after every stream has finished, because the in-flight
// callbacks reference local state of this function.
TVector<TMaybe<NYT::TNode>> InferSchemaFromTablesContents(const TString& cluster, const TString& token, const NYT::TTransactionId& tx, const std::vector<TTableInferSchemaRequest>& requests, TAsyncQueue::TPtr queue) {
    const auto streamTimeout = TDuration::MilliSeconds(300'000);

    auto connectionConfig = NYT::New<NYT::NApi::NRpcProxy::TConnectionConfig>();
    connectionConfig->ClusterUrl = cluster;
    connectionConfig->DefaultTotalStreamingTimeout = streamTimeout;
    connectionConfig->EnableRetries = true;
    connectionConfig->DefaultPingPeriod = TDuration::MilliSeconds(5000);
    auto connection = CreateConnection(connectionConfig);

    auto clientOptions = NYT::NApi::TClientOptions();
    if (!token.empty()) {
        clientOptions.Token = token;
    }
    auto client = DynamicPointerCast<NYT::NApi::NRpcProxy::TClient>(connection->CreateClient(clientOptions));
    Y_ABORT_UNLESS(client);
    auto apiServiceProxy = client->CreateApiServiceProxy();

    TVector<NYT::NConcurrency::IAsyncZeroCopyInputStreamPtr> inputs(requests.size());
    std::vector<NYT::TFuture<void>> futures;
    std::vector<NYT::TPromise<void>> promises;
    std::vector<TStreamSchemaInferer> inferers;
    futures.reserve(requests.size());
    promises.reserve(requests.size());
    inferers.reserve(requests.size());

    // Populate all per-stream state before the first request is issued, so the
    // vectors are never modified while callbacks are already indexing them.
    for (const auto& req : requests) {
        inferers.emplace_back(req.TableName);
        promises.push_back(NYT::NewPromise<void>());
        futures.push_back(promises.back().ToFuture());
    }

    // Reads the next block of stream #index. A non-empty block is parsed on
    // `queue` and the next read is scheduled afterwards; an empty block (end of
    // stream) or an error completes promises[index].
    std::function<void(size_t)> runRead = [&](size_t index) {
        YT_UNUSED_FUTURE(inputs[index]->Read().ApplyUnique(BIND([queue, &inferers, &promises, &runRead, index](NYT::TErrorOr<NYT::TSharedRef>&& blockOrError) {
            if (!blockOrError.IsOK()) {
                promises[index].TrySet(blockOrError);
                return;
            }
            if (!blockOrError.Value()) {
                // EOS
                promises[index].TrySet();
                return;
            }
            Y_UNUSED(queue->Async([&inferers, &promises, &runRead, index, block = std::move(blockOrError.Value())] {
                OnPayload(block, index, inferers, promises);
                if (promises[index].IsSet()) {
                    // OnPayload reported a failure: stop reading this stream
                    // instead of completing the promise a second time later.
                    return;
                }
                runRead(index);
            }));
        })));
    };

    YQL_CLOG(TRACE, ProviderYt) << "Infering started";
    size_t index = 0;
    for (const auto& req : requests) {
        auto request = apiServiceProxy.ReadTable();
        client->InitStreamingRequest(*request);
        request->ClientAttachmentsStreamingParameters().ReadTimeout = streamTimeout;
        request->ClientAttachmentsStreamingParameters().WriteTimeout = streamTimeout;

        // Restrict the read to rows [0, req.Rows).
        NYT::NYPath::TRichYPath tableYPath(req.TableId);
        NYT::NChunkClient::TReadRange range;
        range.LowerLimit().SetRowIndex(0);
        range.UpperLimit().SetRowIndex(req.Rows);
        tableYPath.SetRanges({range});
        TString ppath;
        NYT::NYPath::ToProto(&ppath, tableYPath);
        request->set_path(ppath);

        request->set_desired_rowset_format(NYT::NApi::NRpcProxy::NProto::ERowsetFormat::RF_FORMAT);
        request->set_unordered(true);
        NDqs::ConfigureTransaction(request, tx.dw);
        // Rows are requested as binary YSON; OnPayload parses them with a YSON list parser.
        request->set_format("<format=binary>yson");

        YT_UNUSED_FUTURE(CreateRpcClientInputStream(std::move(request)).ApplyUnique(BIND([&runRead, &inputs, &promises, index](NYT::TErrorOr<NYT::NConcurrency::IAsyncZeroCopyInputStreamPtr>&& streamOrError) {
            if (!streamOrError.IsOK()) {
                // Without this the promise would never be completed and the
                // wait below would never finish.
                promises[index].TrySet(streamOrError);
                return;
            }
            inputs[index] = std::move(streamOrError.Value());
            // first packet contains meta, skip it
            YT_UNUSED_FUTURE(inputs[index]->Read().ApplyUnique(BIND([&runRead, &promises, index](NYT::TErrorOr<NYT::TSharedRef>&& metaOrError) {
                if (!metaOrError.IsOK()) {
                    promises[index].TrySet(metaOrError);
                    return;
                }
                runRead(index);
            })));
        })));
        ++index;
    }
    YQL_CLOG(TRACE, ProviderYt) << "Futures prepared";

    // Wait until *every* stream is finished, successfully or not. Returning on
    // the first failure would unwind this frame while callbacks of the other
    // streams still hold references to inferers/promises/inputs/runRead.
    auto outcomes = NYT::NConcurrency::WaitFor(AllSet(std::move(futures)));
    YQL_CLOG(TRACE, ProviderYt) << "Infered";
    for (const auto& error : outcomes.ValueOrThrow()) {
        error.ThrowOnError();
    }

    TVector<TMaybe<NYT::TNode>> result;
    result.reserve(requests.size());
    for (auto& inferer : inferers) {
        result.push_back(inferer.GetSchema());
    }
    return result;
}
}
|