diff options
| author | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
| commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
| tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/py_helpers.cpp | |
| parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/py_helpers.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/py_helpers.cpp | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/py_helpers.cpp b/yt/cpp/mapreduce/client/py_helpers.cpp new file mode 100644 index 00000000000..3072449866e --- /dev/null +++ b/yt/cpp/mapreduce/client/py_helpers.cpp @@ -0,0 +1,112 @@ +#include "py_helpers.h" + +#include "client.h" +#include "operation.h" +#include "transaction.h" + +#include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/fluent.h> + +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/helpers.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <util/generic/hash_set.h> + +namespace NYT { + +using namespace NDetail; + +//////////////////////////////////////////////////////////////////////////////// + +IStructuredJobPtr ConstructJob(const TString& jobName, const TString& state) +{ + auto node = TNode(); + if (!state.empty()) { + node = NodeFromYsonString(state); + } + return TJobFactory::Get()->GetConstructingFunction(jobName.data())(node); +} + +TString GetJobStateString(const IStructuredJob& job) +{ + TString result; + { + TStringOutput output(result); + job.Save(output); + output.Finish(); + } + return result; +} + +TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOperationPreparer& preparer) +{ + int intermediateTableCount = 0; + TVector<TRichYPath> paths; + for (const auto& inputNode : node.AsList()) { + if (inputNode.IsNull()) { + ++intermediateTableCount; + } else { + paths.emplace_back(inputNode.AsString()); + } + } + paths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), paths); + TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure())); + for (const auto& path : paths) { + result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path}); + } + return result; +} + +TString GetIOInfo( + const IStructuredJob& job, + const TCreateClientOptions& options, + const TString& cluster, + const TString& transactionId, + const TString& inputPaths, + const TString& outputPaths, + const TString& neededColumns) +{ + auto client = NDetail::CreateClientImpl(cluster, options); + TOperationPreparer preparer(client, GetGuid(transactionId)); + + auto structuredInputs = NodeToStructuredTablePaths(NodeFromYsonString(inputPaths), preparer); + auto structuredOutputs = NodeToStructuredTablePaths(NodeFromYsonString(outputPaths), preparer); + + auto neededColumnsNode = NodeFromYsonString(neededColumns); + THashSet<TString> columnsUsedInOperations; + for (const auto& columnNode : neededColumnsNode.AsList()) { + columnsUsedInOperations.insert(columnNode.AsString()); + } + + auto operationIo = CreateSimpleOperationIoHelper( + job, + preparer, + TOperationOptions(), + std::move(structuredInputs), + std::move(structuredOutputs), + TUserJobFormatHints(), + ENodeReaderFormat::Yson, + columnsUsedInOperations); + + return BuildYsonStringFluently().BeginMap() + .Item("input_format").Value(operationIo.InputFormat.Config) + .Item("output_format").Value(operationIo.OutputFormat.Config) + .Item("input_table_paths").List(operationIo.Inputs) + .Item("output_table_paths").List(operationIo.Outputs) + .Item("small_files").DoListFor( + operationIo.JobFiles.begin(), + operationIo.JobFiles.end(), + [] (TFluentList fluent, auto fileIt) { + fluent.Item().BeginMap() + .Item("file_name").Value(fileIt->FileName) + .Item("data").Value(fileIt->Data) + .EndMap(); + }) + .EndMap(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |
