diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/prepare_operation.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/prepare_operation.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/prepare_operation.cpp | 286 |
1 files changed, 286 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp new file mode 100644 index 0000000000..7f772dc99a --- /dev/null +++ b/yt/cpp/mapreduce/client/prepare_operation.cpp @@ -0,0 +1,286 @@ +#include "prepare_operation.h" + +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/interface/serialize.h> + +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> +#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h> + +#include <library/cpp/iterator/functools.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +TOperationPreparationContext::TOperationPreparationContext( + const TStructuredJobTableList& structuredInputs, + const TStructuredJobTableList& structuredOutputs, + const TClientContext& context, + const IClientRetryPolicyPtr& retryPolicy, + TTransactionId transactionId) + : Context_(context) + , RetryPolicy_(retryPolicy) + , TransactionId_(transactionId) + , InputSchemas_(structuredInputs.size()) + , InputSchemasLoaded_(structuredInputs.size(), false) +{ + Inputs_.reserve(structuredInputs.size()); + for (const auto& input : structuredInputs) { + Inputs_.push_back(input.RichYPath); + } + Outputs_.reserve(structuredOutputs.size()); + for (const auto& output : structuredOutputs) { + Outputs_.push_back(output.RichYPath); + } +} + +TOperationPreparationContext::TOperationPreparationContext( + TVector<TRichYPath> inputs, + TVector<TRichYPath> outputs, + const TClientContext& context, + const IClientRetryPolicyPtr& retryPolicy, + TTransactionId transactionId) + : Context_(context) + , RetryPolicy_(retryPolicy) + , TransactionId_(transactionId) + , InputSchemas_(inputs.size()) + , InputSchemasLoaded_(inputs.size(), false) +{ + Inputs_.reserve(inputs.size()); + for (auto& input : inputs) { + Inputs_.push_back(std::move(input)); + } + Outputs_.reserve(outputs.size()); + for (const auto& output : outputs) { + Outputs_.push_back(std::move(output)); + } +} + +int TOperationPreparationContext::GetInputCount() const +{ + return static_cast<int>(Inputs_.size()); +} + +int TOperationPreparationContext::GetOutputCount() const +{ + return static_cast<int>(Outputs_.size()); +} + +const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const +{ + TVector<::NThreading::TFuture<TNode>> schemaFutures; + NRawClient::TRawBatchRequest batch(Context_.Config); + for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) { + if (InputSchemasLoaded_[tableIndex]) { + schemaFutures.emplace_back(); + continue; + } + Y_VERIFY(Inputs_[tableIndex]); + schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{})); + } + + NRawClient::ExecuteBatch( + RetryPolicy_->CreatePolicyForGenericRequest(), + Context_, + batch); + + for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) { + if (schemaFutures[tableIndex].Initialized()) { + Deserialize(InputSchemas_[tableIndex], schemaFutures[tableIndex].ExtractValueSync()); + } + } + + return InputSchemas_; +} + +const TTableSchema& TOperationPreparationContext::GetInputSchema(int index) const +{ + auto& schema = InputSchemas_[index]; + if (!InputSchemasLoaded_[index]) { + Y_VERIFY(Inputs_[index]); + auto schemaNode = NRawClient::Get( + RetryPolicy_->CreatePolicyForGenericRequest(), + Context_, + TransactionId_, + Inputs_[index]->Path_ + "/@schema"); + Deserialize(schema, schemaNode); + } + return schema; +} + +TMaybe<TYPath> TOperationPreparationContext::GetInputPath(int index) const +{ + Y_VERIFY(index < static_cast<int>(Inputs_.size())); + if (Inputs_[index]) { + return Inputs_[index]->Path_; + } + return Nothing(); +} + +TMaybe<TYPath> TOperationPreparationContext::GetOutputPath(int index) const +{ + Y_VERIFY(index < static_cast<int>(Outputs_.size())); + if (Outputs_[index]) { + return Outputs_[index]->Path_; + } + return Nothing(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TSpeculativeOperationPreparationContext::TSpeculativeOperationPreparationContext( + const TVector<TTableSchema>& previousResult, + TStructuredJobTableList inputs, + TStructuredJobTableList outputs) + : InputSchemas_(previousResult) + , Inputs_(std::move(inputs)) + , Outputs_(std::move(outputs)) +{ + Y_VERIFY(Inputs_.size() == previousResult.size()); +} + +int TSpeculativeOperationPreparationContext::GetInputCount() const +{ + return static_cast<int>(Inputs_.size()); +} + +int TSpeculativeOperationPreparationContext::GetOutputCount() const +{ + return static_cast<int>(Outputs_.size()); +} + +const TVector<TTableSchema>& TSpeculativeOperationPreparationContext::GetInputSchemas() const +{ + return InputSchemas_; +} + +const TTableSchema& TSpeculativeOperationPreparationContext::GetInputSchema(int index) const +{ + Y_VERIFY(index < static_cast<int>(InputSchemas_.size())); + return InputSchemas_[index]; +} + +TMaybe<TYPath> TSpeculativeOperationPreparationContext::GetInputPath(int index) const +{ + Y_VERIFY(index < static_cast<int>(Inputs_.size())); + if (Inputs_[index].RichYPath) { + return Inputs_[index].RichYPath->Path_; + } + return Nothing(); +} + +TMaybe<TYPath> TSpeculativeOperationPreparationContext::GetOutputPath(int index) const +{ + Y_VERIFY(index < static_cast<int>(Outputs_.size())); + if (Outputs_[index].RichYPath) { + return Outputs_[index].RichYPath->Path_; + } + return Nothing(); +} + +//////////////////////////////////////////////////////////////////////////////// + +static void FixInputTable(TRichYPath& table, int index, const TJobOperationPreparer& preparer) +{ + const auto& columnRenamings = preparer.GetInputColumnRenamings(); + const auto& columnFilters = preparer.GetInputColumnFilters(); + + if (!columnRenamings[index].empty()) { + table.RenameColumns(columnRenamings[index]); + } + if (columnFilters[index]) { + table.Columns(*columnFilters[index]); + } +} + +static void FixInputTable(TStructuredJobTable& table, int index, const TJobOperationPreparer& preparer) +{ + const auto& inputDescriptions = preparer.GetInputDescriptions(); + + if (inputDescriptions[index] && std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) { + table.Description = *inputDescriptions[index]; + } + if (table.RichYPath) { + FixInputTable(*table.RichYPath, index, preparer); + } +} + +static void FixOutputTable(TRichYPath& /* table */, int /* index */, const TJobOperationPreparer& /* preparer */) +{ } + +static void FixOutputTable(TStructuredJobTable& table, int index, const TJobOperationPreparer& preparer) +{ + const auto& outputDescriptions = preparer.GetOutputDescriptions(); + + if (outputDescriptions[index] && std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) { + table.Description = *outputDescriptions[index]; + } + if (table.RichYPath) { + FixOutputTable(*table.RichYPath, index, preparer); + } +} + +template <typename TTables> +TVector<TTableSchema> PrepareOperation( + const IJob& job, + const IOperationPreparationContext& context, + TTables* inputsPtr, + TTables* outputsPtr, + TUserJobFormatHints& hints) +{ + TJobOperationPreparer preparer(context); + job.PrepareOperation(context, preparer); + preparer.Finish(); + + if (inputsPtr) { + auto& inputs = *inputsPtr; + for (int i = 0; i < static_cast<int>(inputs.size()); ++i) { + FixInputTable(inputs[i], i, preparer); + } + } + + if (outputsPtr) { + auto& outputs = *outputsPtr; + for (int i = 0; i < static_cast<int>(outputs.size()); ++i) { + FixOutputTable(outputs[i], i, preparer); + } + } + + auto applyPatch = [](TMaybe<TFormatHints>& origin, const TMaybe<TFormatHints>& patch) { + if (origin) { + if (patch) { + origin->Merge(*patch); + } + } else { + origin = patch; + } + }; + + auto preparerHints = preparer.GetFormatHints(); + applyPatch(preparerHints.InputFormatHints_, hints.InputFormatHints_); + applyPatch(preparerHints.OutputFormatHints_, hints.OutputFormatHints_); + hints = std::move(preparerHints); + + return preparer.GetOutputSchemas(); +} + +template +TVector<TTableSchema> PrepareOperation<TStructuredJobTableList>( + const IJob& job, + const IOperationPreparationContext& context, + TStructuredJobTableList* inputsPtr, + TStructuredJobTableList* outputsPtr, + TUserJobFormatHints& hints); + +template +TVector<TTableSchema> PrepareOperation<TVector<TRichYPath>>( + const IJob& job, + const IOperationPreparationContext& context, + TVector<TRichYPath>* inputsPtr, + TVector<TRichYPath>* outputsPtr, + TUserJobFormatHints& hints); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail |