diff options
author | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-07-29 00:02:16 +0300 |
commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/py_helpers.cpp | |
parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
download | ydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears in the open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/py_helpers.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/py_helpers.cpp | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/py_helpers.cpp b/yt/cpp/mapreduce/client/py_helpers.cpp new file mode 100644 index 0000000000..3072449866 --- /dev/null +++ b/yt/cpp/mapreduce/client/py_helpers.cpp @@ -0,0 +1,112 @@ +#include "py_helpers.h" + +#include "client.h" +#include "operation.h" +#include "transaction.h" + +#include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/fluent.h> + +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/helpers.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <util/generic/hash_set.h> + +namespace NYT { + +using namespace NDetail; + +//////////////////////////////////////////////////////////////////////////////// + +IStructuredJobPtr ConstructJob(const TString& jobName, const TString& state) +{ + auto node = TNode(); + if (!state.empty()) { + node = NodeFromYsonString(state); + } + return TJobFactory::Get()->GetConstructingFunction(jobName.data())(node); +} + +TString GetJobStateString(const IStructuredJob& job) +{ + TString result; + { + TStringOutput output(result); + job.Save(output); + output.Finish(); + } + return result; +} + +TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOperationPreparer& preparer) +{ + int intermediateTableCount = 0; + TVector<TRichYPath> paths; + for (const auto& inputNode : node.AsList()) { + if (inputNode.IsNull()) { + ++intermediateTableCount; + } else { + paths.emplace_back(inputNode.AsString()); + } + } + paths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), paths); + TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure())); + for (const auto& path : paths) { + result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path}); + } + return result; +} + +TString GetIOInfo( + const IStructuredJob& job, + const TCreateClientOptions& options, + const TString& cluster, + const TString& transactionId, 
+ const TString& inputPaths, + const TString& outputPaths, + const TString& neededColumns) +{ + auto client = NDetail::CreateClientImpl(cluster, options); + TOperationPreparer preparer(client, GetGuid(transactionId)); + + auto structuredInputs = NodeToStructuredTablePaths(NodeFromYsonString(inputPaths), preparer); + auto structuredOutputs = NodeToStructuredTablePaths(NodeFromYsonString(outputPaths), preparer); + + auto neededColumnsNode = NodeFromYsonString(neededColumns); + THashSet<TString> columnsUsedInOperations; + for (const auto& columnNode : neededColumnsNode.AsList()) { + columnsUsedInOperations.insert(columnNode.AsString()); + } + + auto operationIo = CreateSimpleOperationIoHelper( + job, + preparer, + TOperationOptions(), + std::move(structuredInputs), + std::move(structuredOutputs), + TUserJobFormatHints(), + ENodeReaderFormat::Yson, + columnsUsedInOperations); + + return BuildYsonStringFluently().BeginMap() + .Item("input_format").Value(operationIo.InputFormat.Config) + .Item("output_format").Value(operationIo.OutputFormat.Config) + .Item("input_table_paths").List(operationIo.Inputs) + .Item("output_table_paths").List(operationIo.Outputs) + .Item("small_files").DoListFor( + operationIo.JobFiles.begin(), + operationIo.JobFiles.end(), + [] (TFluentList fluent, auto fileIt) { + fluent.Item().BeginMap() + .Item("file_name").Value(fileIt->FileName) + .Item("data").Value(fileIt->Data) + .EndMap(); + }) + .EndMap(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |