summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/py_helpers.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 03:37:03 +0300
committermax42 <[email protected]>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/py_helpers.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/py_helpers.cpp')
-rw-r--r--yt/cpp/mapreduce/client/py_helpers.cpp112
1 files changed, 112 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/py_helpers.cpp b/yt/cpp/mapreduce/client/py_helpers.cpp
new file mode 100644
index 00000000000..3072449866e
--- /dev/null
+++ b/yt/cpp/mapreduce/client/py_helpers.cpp
@@ -0,0 +1,112 @@
+#include "py_helpers.h"
+
+#include "client.h"
+#include "operation.h"
+#include "transaction.h"
+
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/fluent.h>
+
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <util/generic/hash_set.h>
+
+namespace NYT {
+
+using namespace NDetail;
+
+////////////////////////////////////////////////////////////////////////////////
+
+IStructuredJobPtr ConstructJob(const TString& jobName, const TString& state)
+{
+ auto node = TNode();
+ if (!state.empty()) {
+ node = NodeFromYsonString(state);
+ }
+ return TJobFactory::Get()->GetConstructingFunction(jobName.data())(node);
+}
+
+TString GetJobStateString(const IStructuredJob& job)
+{
+ TString result;
+ {
+ TStringOutput output(result);
+ job.Save(output);
+ output.Finish();
+ }
+ return result;
+}
+
+TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOperationPreparer& preparer)
+{
+ int intermediateTableCount = 0;
+ TVector<TRichYPath> paths;
+ for (const auto& inputNode : node.AsList()) {
+ if (inputNode.IsNull()) {
+ ++intermediateTableCount;
+ } else {
+ paths.emplace_back(inputNode.AsString());
+ }
+ }
+ paths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), paths);
+ TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure()));
+ for (const auto& path : paths) {
+ result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path});
+ }
+ return result;
+}
+
+TString GetIOInfo(
+ const IStructuredJob& job,
+ const TCreateClientOptions& options,
+ const TString& cluster,
+ const TString& transactionId,
+ const TString& inputPaths,
+ const TString& outputPaths,
+ const TString& neededColumns)
+{
+ auto client = NDetail::CreateClientImpl(cluster, options);
+ TOperationPreparer preparer(client, GetGuid(transactionId));
+
+ auto structuredInputs = NodeToStructuredTablePaths(NodeFromYsonString(inputPaths), preparer);
+ auto structuredOutputs = NodeToStructuredTablePaths(NodeFromYsonString(outputPaths), preparer);
+
+ auto neededColumnsNode = NodeFromYsonString(neededColumns);
+ THashSet<TString> columnsUsedInOperations;
+ for (const auto& columnNode : neededColumnsNode.AsList()) {
+ columnsUsedInOperations.insert(columnNode.AsString());
+ }
+
+ auto operationIo = CreateSimpleOperationIoHelper(
+ job,
+ preparer,
+ TOperationOptions(),
+ std::move(structuredInputs),
+ std::move(structuredOutputs),
+ TUserJobFormatHints(),
+ ENodeReaderFormat::Yson,
+ columnsUsedInOperations);
+
+ return BuildYsonStringFluently().BeginMap()
+ .Item("input_format").Value(operationIo.InputFormat.Config)
+ .Item("output_format").Value(operationIo.OutputFormat.Config)
+ .Item("input_table_paths").List(operationIo.Inputs)
+ .Item("output_table_paths").List(operationIo.Outputs)
+ .Item("small_files").DoListFor(
+ operationIo.JobFiles.begin(),
+ operationIo.JobFiles.end(),
+ [] (TFluentList fluent, auto fileIt) {
+ fluent.Item().BeginMap()
+ .Item("file_name").Value(fileIt->FileName)
+ .Item("data").Value(fileIt->Data)
+ .EndMap();
+ })
+ .EndMap();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT