aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/py_helpers.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/client/py_helpers.cpp
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears in the open-source cmake build. The diff in the open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/client/py_helpers.cpp')
-rw-r--r--yt/cpp/mapreduce/client/py_helpers.cpp112
1 files changed, 112 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/py_helpers.cpp b/yt/cpp/mapreduce/client/py_helpers.cpp
new file mode 100644
index 0000000000..3072449866
--- /dev/null
+++ b/yt/cpp/mapreduce/client/py_helpers.cpp
@@ -0,0 +1,112 @@
+#include "py_helpers.h"
+
+#include "client.h"
+#include "operation.h"
+#include "transaction.h"
+
+#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/fluent.h>
+
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <util/generic/hash_set.h>
+
+namespace NYT {
+
+using namespace NDetail;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Builds a structured job object for |jobName|.
+//
+// |state| is the serialized job state; when it is non-empty it is parsed with
+// NodeFromYsonString, otherwise a default-constructed TNode is used.
+// The resulting node is handed to the constructing function that TJobFactory
+// returns for |jobName|.
+IStructuredJobPtr ConstructJob(const TString& jobName, const TString& state)
+{
+    // Parse the state first, then look the job up (same order as before).
+    TNode stateNode = state.empty() ? TNode() : NodeFromYsonString(state);
+    auto constructingFunction = TJobFactory::Get()->GetConstructingFunction(jobName.data());
+    return constructingFunction(stateNode);
+}
+
+// Serializes |job| via IStructuredJob::Save into an in-memory string and
+// returns that string.
+TString GetJobStateString(const IStructuredJob& job)
+{
+    TString serializedState;
+    {
+        // The stream writes straight into |serializedState|; it is finished and
+        // destroyed before the string is returned.
+        TStringOutput stateStream(serializedState);
+        job.Save(stateStream);
+        stateStream.Finish();
+    }
+    return serializedState;
+}
+
+// Converts a list node of table descriptions into a TStructuredJobTableList.
+//
+// Each element of |node| (which must be a list) is either:
+//   - null, which is counted as an intermediate table, or
+//   - a string, which is treated as a table path.
+//
+// All string paths are canonized in a single NRawClient::CanonizeYPaths call
+// using the context of |preparer|.
+//
+// NB: the result contains all intermediate tables first, followed by the
+// canonized paths; the relative position of null entries among the paths in
+// |node| is not preserved.
+TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOperationPreparer& preparer)
+{
+    int nullEntryCount = 0;
+    TVector<TRichYPath> tablePaths;
+    for (const auto& entry : node.AsList()) {
+        if (!entry.IsNull()) {
+            tablePaths.emplace_back(entry.AsString());
+            continue;
+        }
+        ++nullEntryCount;
+    }
+
+    tablePaths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), tablePaths);
+
+    // Start with one intermediate table per null entry, then append the explicit paths.
+    TStructuredJobTableList tables(
+        nullEntryCount,
+        TStructuredJobTable::Intermediate(TUnspecifiedTableStructure()));
+    for (const auto& canonizedPath : tablePaths) {
+        tables.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), canonizedPath});
+    }
+    return tables;
+}
+
+// Computes the I/O description of |job| and returns it as a YSON string.
+//
+// Parameters:
+//   job            - the structured job to describe.
+//   options        - options used to create the client for |cluster|.
+//   cluster        - cluster name passed to NDetail::CreateClientImpl.
+//   transactionId  - textual GUID of the transaction, parsed with GetGuid.
+//   inputPaths     - YSON list of input tables (see NodeToStructuredTablePaths).
+//   outputPaths    - YSON list of output tables (see NodeToStructuredTablePaths).
+//   neededColumns  - YSON list of strings with the names of the columns used.
+//
+// The returned YSON map has the keys "input_format", "output_format",
+// "input_table_paths", "output_table_paths" and "small_files"; the latter is a
+// list of maps with "file_name" and "data" for every entry of JobFiles.
+//
+// The helper is invoked with default TOperationOptions and TUserJobFormatHints
+// and with ENodeReaderFormat::Yson.
+TString GetIOInfo(
+    const IStructuredJob& job,
+    const TCreateClientOptions& options,
+    const TString& cluster,
+    const TString& transactionId,
+    const TString& inputPaths,
+    const TString& outputPaths,
+    const TString& neededColumns)
+{
+    auto clientImpl = NDetail::CreateClientImpl(cluster, options);
+    TOperationPreparer preparer(clientImpl, GetGuid(transactionId));
+
+    // Inputs are resolved before outputs; both go through path canonization.
+    auto inputTables = NodeToStructuredTablePaths(NodeFromYsonString(inputPaths), preparer);
+    auto outputTables = NodeToStructuredTablePaths(NodeFromYsonString(outputPaths), preparer);
+
+    // Keep the parsed node alive in a named variable while iterating over its list.
+    auto columnListNode = NodeFromYsonString(neededColumns);
+    THashSet<TString> usedColumns;
+    for (const auto& column : columnListNode.AsList()) {
+        usedColumns.insert(column.AsString());
+    }
+
+    auto ioHelper = CreateSimpleOperationIoHelper(
+        job,
+        preparer,
+        TOperationOptions(),
+        std::move(inputTables),
+        std::move(outputTables),
+        TUserJobFormatHints(),
+        ENodeReaderFormat::Yson,
+        usedColumns);
+
+    return BuildYsonStringFluently()
+        .BeginMap()
+            .Item("input_format").Value(ioHelper.InputFormat.Config)
+            .Item("output_format").Value(ioHelper.OutputFormat.Config)
+            .Item("input_table_paths").List(ioHelper.Inputs)
+            .Item("output_table_paths").List(ioHelper.Outputs)
+            .Item("small_files").DoListFor(
+                ioHelper.JobFiles.begin(),
+                ioHelper.JobFiles.end(),
+                [] (TFluentList list, auto smallFile) {
+                    list.Item()
+                        .BeginMap()
+                            .Item("file_name").Value(smallFile->FileName)
+                            .Item("data").Value(smallFile->Data)
+                        .EndMap();
+                })
+        .EndMap();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT