diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-12-25 19:03:15 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-12-25 19:03:15 +0000 |
commit | c895fcc06300c50afa32558268c113bb8a5ee256 (patch) | |
tree | e26dba6120dc7d0fe4360f2ce3697ff7c63fee32 /yt/cpp | |
parent | e3d3da91e05265f579c3dc5a18faa83a926c3d39 (diff) | |
parent | 117eb6fe09e57cf15c95ea6236718f2f70a7105a (diff) | |
download | ydb-c895fcc06300c50afa32558268c113bb8a5ee256.tar.gz |
Merge pull request #12958 from ydb-platform/merge-libs-241224-2313
Diffstat (limited to 'yt/cpp')
43 files changed, 226 insertions, 189 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp index 08ffec2bfc..d8084e9c45 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.cpp +++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp @@ -50,7 +50,7 @@ TBatchRequest::~TBatchRequest() = default; IBatchRequestBase& TBatchRequest::WithTransaction(const TTransactionId& transactionId) { if (!TmpWithTransaction_) { - TmpWithTransaction_.Reset(new TBatchRequest(Impl_.Get(), Client_)); + TmpWithTransaction_.reset(new TBatchRequest(Impl_.Get(), Client_)); } TmpWithTransaction_->DefaultTransaction_ = transactionId; return *TmpWithTransaction_; diff --git a/yt/cpp/mapreduce/client/batch_request_impl.h b/yt/cpp/mapreduce/client/batch_request_impl.h index 0a176417b3..a4d776668f 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.h +++ b/yt/cpp/mapreduce/client/batch_request_impl.h @@ -124,7 +124,7 @@ private: private: TTransactionId DefaultTransaction_; ::TIntrusivePtr<NDetail::NRawClient::TRawBatchRequest> Impl_; - THolder<TBatchRequest> TmpWithTransaction_; + std::unique_ptr<TBatchRequest> TmpWithTransaction_; ::TIntrusivePtr<TClient> Client_; private: diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 9fcb82f5b7..7d73756759 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -940,7 +940,7 @@ TTransaction::TTransaction( : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( - MakeHolder<TPingableTransaction>( + std::make_unique<TPingableTransaction>( rawClient, parentClient->GetRetryPolicy(), context, @@ -1434,7 +1434,7 @@ TYtPoller& TClient::GetYtPoller() // We don't use current client and create new client because YtPoller_ might use // this client during current client shutdown. // That might lead to incrementing of current client refcount and double delete of current client object. - YtPoller_ = MakeHolder<TYtPoller>(Context_, ClientRetryPolicy_); + YtPoller_ = std::make_unique<TYtPoller>(Context_, ClientRetryPolicy_); } return *YtPoller_; } diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 32a0f9a97e..4ceb7f69b3 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -333,7 +333,7 @@ protected: private: ITransactionPingerPtr TransactionPinger_; - THolder<TPingableTransaction> PingableTx_; + std::unique_ptr<TPingableTransaction> PingableTx_; TClientPtr ParentClient_; }; @@ -502,7 +502,7 @@ private: std::atomic<bool> Shutdown_ = false; TMutex Lock_; - THolder<TYtPoller> YtPoller_; + std::unique_ptr<TYtPoller> YtPoller_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index e7538a22da..b144b70c02 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -58,7 +58,7 @@ TClientReader::TClientReader( { if (options.CreateTransaction_) { Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null"); - ReadTransaction_ = MakeHolder<TPingableTransaction>( + ReadTransaction_ = std::make_unique<TPingableTransaction>( RawClient_, ClientRetryPolicy_, Context_, diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 3f73080046..cc78d2f3d3 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -51,7 +51,7 @@ private: TMaybe<TFormat> Format_; TTableReaderOptions Options_; - THolder<TPingableTransaction> ReadTransaction_; + std::unique_ptr<TPingableTransaction> ReadTransaction_; std::unique_ptr<IInputStream> Input_; diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index f88b40e38b..1682212f1d 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -47,7 +47,7 @@ TStreamReaderBase::TStreamReaderBase( const TTransactionId& transactionId) : RawClient_(rawClient) , ClientRetryPolicy_(std::move(clientRetryPolicy)) - , ReadTransaction_(MakeHolder<TPingableTransaction>( + , ReadTransaction_(std::make_unique<TPingableTransaction>( RawClient_, ClientRetryPolicy_, context, diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h index 8aafdc860d..d3efe90f26 100644 --- a/yt/cpp/mapreduce/client/file_reader.h +++ b/yt/cpp/mapreduce/client/file_reader.h @@ -45,7 +45,7 @@ private: std::unique_ptr<IInputStream> Input_; - THolder<TPingableTransaction> ReadTransaction_; + std::unique_ptr<TPingableTransaction> ReadTransaction_; ui64 CurrentOffset_ = 0; }; diff --git a/yt/cpp/mapreduce/client/init.cpp b/yt/cpp/mapreduce/client/init.cpp index 6121952f86..78c92ee883 100644 --- a/yt/cpp/mapreduce/client/init.cpp +++ b/yt/cpp/mapreduce/client/init.cpp @@ -219,11 +219,11 @@ void ExecJob(int argc, const char** argv, const TInitializeOptions& options) NDetail::OutputTableCount = static_cast<i64>(outputTableCount); - THolder<IInputStream> jobStateStream; + std::unique_ptr<IInputStream> jobStateStream; if (hasState) { - jobStateStream = MakeHolder<TIFStream>("jobstate"); + jobStateStream = std::make_unique<TIFStream>("jobstate"); } else { - jobStateStream = MakeHolder<TBufferStream>(0); + jobStateStream = std::make_unique<TBufferStream>(0); } int ret = 1; diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 98ed246e0e..9441c7efe2 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -969,9 +969,9 @@ void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec) TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options) { - MergeNodes(dst["spec"], spec); + MergeNodes(dst, spec); if (options.Spec_) { - MergeNodes(dst["spec"], *options.Spec_); + MergeNodes(dst, *options.Spec_); } return dst; } @@ -1129,7 +1129,7 @@ void DoExecuteMap( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("mapper").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( map, @@ -1148,18 +1148,18 @@ void DoExecuteMap( .DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) { fluent.Item("ordered").Value(spec.Ordered_.GetRef()); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); - specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); - specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); if (!preparer->GetContext().Config->TableWriter.Empty()) { - specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; + specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; } - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1168,7 +1168,7 @@ void DoExecuteMap( operationIo, mapper ] () { - auto operationId = preparer->StartOperation(operation, "map", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec); LogJob(operationId, mapper.Get(), "mapper"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1247,7 +1247,7 @@ void DoExecuteReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1280,11 +1280,11 @@ void DoExecuteReduce( .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) { fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1293,7 +1293,7 @@ void DoExecuteReduce( operationIo, reducer ] () { - auto operationId = preparer->StartOperation(operation, "reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1373,7 +1373,7 @@ void DoExecuteJoinReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1394,11 +1394,11 @@ void DoExecuteJoinReduce( fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter); }) .EndMap() - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1407,7 +1407,7 @@ void DoExecuteJoinReduce( reducer, operationIo ] () { - auto operationId = preparer->StartOperation(operation, "join_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1505,7 +1505,7 @@ void DoExecuteMapReduce( TString title; TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .DoIf(hasMapper, [&] (TFluentMap fluent) { TJobPreparer map( *preparer, @@ -1584,18 +1584,18 @@ void DoExecuteMapReduce( .Do([&] (TFluentMap) { spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName())); }) - .EndMap().EndMap(); + .EndMap(); if (spec.Ordered_) { - specNode["spec"]["ordered"] = *spec.Ordered_; + specNode["ordered"] = *spec.Ordered_; } - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildMapJobCountOperationPart(spec, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); - BuildDataSizePerSortJobPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildMapJobCountOperationPart(spec, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); + BuildDataSizePerSortJobPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1607,7 +1607,7 @@ void DoExecuteMapReduce( inputs=operationIo.Inputs, allOutputs ] () { - auto operationId = preparer->StartOperation(operation, "map_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec); LogJob(operationId, mapper.Get(), "mapper"); LogJob(operationId, reduceCombiner.Get(), "reduce_combiner"); @@ -1962,19 +1962,19 @@ void ExecuteSort( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("sort_by").Value(spec.SortBy_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildPartitionJobCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildPartitionJobCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1983,7 +1983,7 @@ void ExecuteSort( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "sort", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2011,7 +2011,7 @@ void ExecuteMerge( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("mode").Value(ToString(spec.Mode_)) @@ -2021,10 +2021,10 @@ void ExecuteMerge( .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -2033,7 +2033,7 @@ void ExecuteMerge( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "merge", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2055,22 +2055,22 @@ void ExecuteErase( auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("table_path").Value(tablePath) .Item("combine_chunks").Value(spec.CombineChunks_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer, tablePath ] () { - auto operationId = preparer->StartOperation(operation, "erase", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec); LogYPath(operationId, tablePath, "table_path"); @@ -2098,7 +2098,7 @@ void ExecuteRemoteCopy( Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required"); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("cluster_name").Value(spec.ClusterName_) .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) @@ -2115,9 +2115,9 @@ void ExecuteRemoteCopy( "doesn't make sense without CopyAttributes == true"); fluent.Item("attribute_keys").List(spec.AttributeKeys_); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options), @@ -2125,7 +2125,7 @@ void ExecuteRemoteCopy( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "remote_copy", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2222,15 +2222,15 @@ void ExecuteVanilla( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("tasks").DoMapFor(spec.Tasks_, addTask) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () { - auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true); + auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec); return operationId; }; @@ -2784,7 +2784,7 @@ void TOperation::TOperationImpl::AsyncFinishOperation(TOperationAttributes opera void* TOperation::TOperationImpl::SyncFinishOperationProc(void* pArgs) { - THolder<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs)); + std::unique_ptr<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs)); args->OperationImpl->SyncFinishOperationImpl(args->OperationAttributes); return nullptr; } @@ -3013,7 +3013,7 @@ struct TAsyncPrepareAndStartOperationArgs void* SyncPrepareAndStartOperation(void* pArgs) { - THolder<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs)); + std::unique_ptr<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs)); args->PrepareAndStart(); return nullptr; } diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index e8e4ee26d2..70bd5f8b65 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -37,7 +37,7 @@ class TWaitOperationStartPollerItem : public IYtPollerItem { public: - TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction) + TWaitOperationStartPollerItem(TOperationId operationId, std::unique_ptr<TPingableTransaction> transaction) : OperationId_(operationId) , Transaction_(std::move(transaction)) { } @@ -78,7 +78,7 @@ public: private: TOperationId OperationId_; - THolder<TPingableTransaction> Transaction_; + std::unique_ptr<TPingableTransaction> Transaction_; ::NThreading::TFuture<TOperationAttributes> Future_; }; @@ -139,7 +139,7 @@ private: TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId) : Client_(std::move(client)) , TransactionId_(transactionId) - , FileTransaction_(MakeHolder<TPingableTransaction>( + , FileTransaction_(std::make_unique<TPingableTransaction>( Client_->GetRawClient(), Client_->GetRetryPolicy(), Client_->GetContext(), @@ -177,35 +177,26 @@ const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const TOperationId TOperationPreparer::StartOperation( TOperation* operation, - const TString& operationType, - const TNode& spec, - bool useStartOperationRequest) + EOperationType type, + const TNode& spec) { CheckValidity(); - THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType)); - if (useStartOperationRequest) { - header.AddParameter("operation_type", operationType); - } - header.AddTransactionId(TransactionId_); - header.AddMutationId(); - - auto ysonSpec = NodeToYsonString(spec); - auto responseInfo = RetryRequestWithPolicy( + auto operationId = RequestWithRetry<TOperationId>( ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>( ClientRetryPolicy_->CreatePolicyForStartOperationRequest(), TOperationPtr(operation)), - GetContext(), - header, - ysonSpec); - TOperationId operationId = ParseGuidFromResponse(responseInfo.Response); + [this, &type, &spec] (TMutationId& mutationId) { + return Client_->GetRawClient()->StartOperation(mutationId, TransactionId_, type, spec); + }); + YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)", operationId, GetPreparationId()); YT_LOG_INFO("Operation %v started (%v): %v", operationId, - operationType, + type, GetOperationWebInterfaceUrl(GetContext().ServerName, operationId)); TOperationExecutionTimeTracker::Get()->Start(operationId); @@ -305,9 +296,9 @@ public: return result; } - THolder<IInputStream> CreateInputStream() const override + std::unique_ptr<IInputStream> CreateInputStream() const override { - return MakeHolder<TFileInput>(FileName_); + return std::make_unique<TFileInput>(FileName_); } TString GetDescription() const override @@ -343,9 +334,9 @@ public: return result; } - THolder<IInputStream> CreateInputStream() const override + std::unique_ptr<IInputStream> CreateInputStream() const override { - return MakeHolder<TMemoryInput>(Data_.data(), Data_.size()); + return std::make_unique<TMemoryInput>(Data_.data(), Data_.size()); } TString GetDescription() const override diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h index 90fd91378c..ef3a790c91 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.h +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -28,16 +28,15 @@ public: TOperationId StartOperation( TOperation* operation, - const TString& operationType, - const TNode& spec, - bool useStartOperationRequest = false); + EOperationType type, + const TNode& spec); const IClientRetryPolicyPtr& GetClientRetryPolicy() const; private: TClientPtr Client_; TTransactionId TransactionId_; - THolder<TPingableTransaction> FileTransaction_; + std::unique_ptr<TPingableTransaction> FileTransaction_; IClientRetryPolicyPtr ClientRetryPolicy_; const TString PreparationId_; @@ -54,7 +53,7 @@ struct IItemToUpload virtual ~IItemToUpload() = default; virtual TString CalculateMD5() const = 0; - virtual THolder<IInputStream> CreateInputStream() const = 0; + virtual std::unique_ptr<IInputStream> CreateInputStream() const = 0; virtual TString GetDescription() const = 0; virtual i64 GetDataSize() const = 0; }; diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp index 44c7db3a97..80f54d2340 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp @@ -31,7 +31,7 @@ void RetryHeavyWriteRequest( const TClientContext& context, const TTransactionId& parentId, THttpHeader& header, - std::function<THolder<IInputStream>()> streamMaker) + std::function<std::unique_ptr<IInputStream>()> streamMaker) { int retryCount = context.Config->RetryCount; if (context.ServiceTicketAuth) { @@ -63,7 +63,7 @@ void RetryHeavyWriteRequest( GetFullUrlForProxy(hostName, context, header), requestId, header); - TransferData(input.Get(), request->GetStream()); + TransferData(input.get(), request->GetStream()); request->Finish()->GetResponse(); } catch (TErrorResponse& e) { YT_LOG_ERROR("RSP %v - attempt %v failed", @@ -100,7 +100,7 @@ THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters) : Parameters_(std::move(parameters)) , RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest()) , StreamFactory_([] { - return MakeHolder<TNullInput>(); + return std::make_unique<TNullInput>(); }) { Retry([] { }); diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.h b/yt/cpp/mapreduce/client/retry_heavy_write_request.h index 6933170e96..b65f457239 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.h +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.h @@ -26,7 +26,7 @@ public: THttpHeader Header; }; - using TStreamFactory = std::function<THolder<IInputStream>()>; + using TStreamFactory = std::function<std::unique_ptr<IInputStream>()>; public: explicit THeavyRequestRetrier(TParameters parameters); @@ -65,7 +65,7 @@ void RetryHeavyWriteRequest( const TClientContext& context, const TTransactionId& parentId, THttpHeader& header, - std::function<THolder<IInputStream>()> streamMaker); + std::function<std::unique_ptr<IInputStream>()> streamMaker); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index 41ad1298cd..666229d7a6 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -104,7 +104,7 @@ void TRetryfulWriter::Send(const TBuffer& buffer) header.MergeParameters(Parameters_); auto streamMaker = [&buffer] () { - return MakeHolder<TBufferInput>(buffer); + return std::make_unique<TBufferInput>(buffer); }; auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp index 40198baaef..c11f20ce3f 100644 --- a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp @@ -176,7 +176,7 @@ private: void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters) { - THolder<THeavyRequestRetrier> retrier; + std::unique_ptr<THeavyRequestRetrier> retrier; auto firstRequestParameters = parameters; auto restRequestParameters = parameters; @@ -218,14 +218,14 @@ private: try { if (!retrier) { - retrier = MakeHolder<THeavyRequestRetrier>(*currentParameters); + retrier = std::make_unique<THeavyRequestRetrier>(*currentParameters); } retrier->Update([task=task] { - return MakeHolder<TMemoryInput>(task.Data->data(), task.Size); + return std::make_unique<TMemoryInput>(task.Data->data(), task.Size); }); if (task.BufferComplete) { retrier->Finish(); - retrier.Reset(); + retrier.reset(); } } catch (const std::exception& ex) { task.SendingComplete.SetException(std::current_exception()); @@ -235,7 +235,7 @@ private: } if (task.BufferComplete) { - retrier.Reset(); + retrier.reset(); task.SendingComplete.SetValue(); currentParameters = &restRequestParameters; @@ -303,15 +303,15 @@ TRetryfulWriterV2::TRetryfulWriterV2( ssize_t bufferSize, bool createTransaction) : BufferSize_(bufferSize) - , Current_(MakeHolder<TSendTask>()) - , Previous_(MakeHolder<TSendTask>()) + , Current_(std::make_unique<TSendTask>()) + , Previous_(std::make_unique<TSendTask>()) { THttpHeader httpHeader("PUT", command); httpHeader.SetInputFormat(format); httpHeader.MergeParameters(serializedWriterOptions); if (createTransaction) { - WriteTransaction_ = MakeHolder<TPingableTransaction>( + WriteTransaction_ = std::make_unique<TPingableTransaction>( rawClient, clientRetryPolicy, context, @@ -337,7 +337,7 @@ TRetryfulWriterV2::TRetryfulWriterV2( .Header = std::move(httpHeader), }; - Sender_ = MakeHolder<TSender>(path, parameters); + Sender_ = std::make_unique<TSender>(path, parameters); DoStartBatch(); } diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.h b/yt/cpp/mapreduce/client/retryful_writer_v2.h index 661ef5d0b5..6d314307ec 100644 --- a/yt/cpp/mapreduce/client/retryful_writer_v2.h +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.h @@ -49,11 +49,11 @@ private: const ssize_t BufferSize_; const ssize_t SendStep_ = 64_KB; ssize_t NextSizeToSend_; - THolder<TSender> Sender_; - THolder<TPingableTransaction> WriteTransaction_; + std::unique_ptr<TSender> Sender_; + std::unique_ptr<TPingableTransaction> WriteTransaction_; - THolder<TSendTask> Current_; - THolder<TSendTask> Previous_; + std::unique_ptr<TSendTask> Current_; + std::unique_ptr<TSendTask> Previous_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retryless_writer.h b/yt/cpp/mapreduce/client/retryless_writer.h index 6916cddbf6..689c1c5f2f 100644 --- a/yt/cpp/mapreduce/client/retryless_writer.h +++ b/yt/cpp/mapreduce/client/retryless_writer.h @@ -56,7 +56,7 @@ public: auto hostName = GetProxyForHeavyRequest(context); UpdateHeaderForProxyIfNeed(hostName, context, header); Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header); - BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_)); + BufferedOutput_ = std::make_unique<TBufferedOutput>(Request_->GetStream(), BufferSize_); } ~TRetrylessWriter() override; @@ -75,7 +75,7 @@ private: bool Running_ = true; NHttpClient::IHttpRequestPtr Request_; - THolder<TBufferedOutput> BufferedOutput_; + std::unique_ptr<TBufferedOutput> BufferedOutput_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index ea42867715..074249efce 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -228,7 +228,7 @@ public: PingableTx_ = &pingableTx; Running_ = true; - PingerThread_ = MakeHolder<TThread>( + PingerThread_ = std::make_unique<TThread>( TThread::TParams{Pinger, this}.SetName("pingable_tx")); PingerThread_->Start(); } @@ -284,7 +284,7 @@ private: const TPingableTransaction* PingableTx_ = nullptr; std::atomic<bool> Running_ = false; - THolder<TThread> PingerThread_; + std::unique_ptr<TThread> PingerThread_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index 51253feec2..765a96f042 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -545,7 +545,7 @@ TConnectionPtr TConnectionPool::Connect( TSocketHolder socket(DoConnect(networkAddress)); SetNonBlock(socket, false); - connection->Socket.Reset(new TSocket(socket.Release())); + connection->Socket = std::make_unique<TSocket>(socket.Release()); connection->DeadLine = TInstant::Now() + socketTimeout; connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); @@ -754,7 +754,7 @@ private: THttpResponse::THttpResponse( TRequestContext context, IInputStream* socketStream) - : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream)) + : HttpInput_(std::make_unique<THttpInputWrapped>(context, socketStream)) , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing")) , Context_(std::move(context)) { @@ -935,7 +935,7 @@ bool THttpResponse::RefreshFrameIfNecessary() case EFrameType::KeepAlive: break; case EFrameType::Data: - RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get()); + RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.get()); break; default: ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); @@ -1027,10 +1027,10 @@ IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters) LogResponse_ = true; } - RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get()); + RequestStream_ = std::make_unique<TRequestStream>(this, *Connection_->Socket.get()); RequestStream_->Write(strHeader.data(), strHeader.size()); - return RequestStream_.Get(); + return RequestStream_.get(); } IOutputStream* THttpRequest::StartRequest() @@ -1064,16 +1064,16 @@ void THttpRequest::SmallRequest(TMaybe<TStringBuf> body) THttpResponse* THttpRequest::GetResponseStream() { if (!Input_) { - SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); + SocketInput_ = std::make_unique<TSocketInput>(*Connection_->Socket.get()); if (TConfig::Get()->UseAbortableResponse) { Y_ABORT_UNLESS(!Url_.empty()); - Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_)); + Input_ = std::make_unique<TAbortableHttpResponse>(Context_, SocketInput_.get(), Url_); } else { - Input_.Reset(new THttpResponse(Context_, SocketInput_.Get())); + Input_ = std::make_unique<THttpResponse>(Context_, SocketInput_.get()); } Input_->CheckErrorResponse(); } - return Input_.Get(); + return Input_.get(); } TString THttpRequest::GetResponse() diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 0f5e9034ee..45cba99861 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -138,7 +138,7 @@ private: struct TConnection { - THolder<TSocket> Socket; + std::unique_ptr<TSocket> Socket; TAtomic Busy = 1; TInstant DeadLine; ui32 Id; @@ -206,7 +206,7 @@ private: class THttpInputWrapped; private: - THolder<THttpInputWrapped> HttpInput_; + std::unique_ptr<THttpInputWrapped> HttpInput_; const bool Unframe_; @@ -257,10 +257,10 @@ private: TConnectionPtr Connection_; - THolder<TRequestStream> RequestStream_; + std::unique_ptr<TRequestStream> RequestStream_; - THolder<TSocketInput> SocketInput_; - THolder<THttpResponse> Input_; + std::unique_ptr<TSocketInput> SocketInput_; + std::unique_ptr<THttpResponse> Input_; bool LogResponse_ = false; }; diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp index fa072675fb..a8f6856aee 100644 --- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp @@ -32,10 +32,10 @@ namespace { } } // namespace -THolder<TSimpleServer> CreateSimpleHttpServer() +std::unique_ptr<TSimpleServer> CreateSimpleHttpServer() { auto port = NTesting::GetFreePort(); - return MakeHolder<TSimpleServer>( + return std::make_unique<TSimpleServer>( port, [] (IInputStream* input, IOutputStream* output) { try { @@ -57,10 +57,10 @@ THolder<TSimpleServer> CreateSimpleHttpServer() }); } -THolder<TSimpleServer> CreateProxyHttpServer() +std::unique_ptr<TSimpleServer> CreateProxyHttpServer() { auto port = NTesting::GetFreePort(); - return MakeHolder<TSimpleServer>( + return std::make_unique<TSimpleServer>( port, [] (IInputStream* input, IOutputStream* output) { try { @@ -182,9 +182,9 @@ TEST(TConnectionPool, TestConcurrency) } }; - TVector<THolder<TFuncThread>> threads; + TVector<std::unique_ptr<TFuncThread>> threads; for (int i = 0; i != 10; ++i) { - threads.push_back(MakeHolder<TFuncThread>(func)); + threads.push_back(std::make_unique<TFuncThread>(func)); }; for (auto& t : threads) { diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp index e41e83c5a0..2d382f6aa0 100644 --- a/yt/cpp/mapreduce/http/ut/http_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp @@ -21,10 +21,10 @@ void WriteDataFrame(TStringBuf string, IOutputStream* stream) stream->Write(string); } -THolder<TSimpleServer> CreateFramingEchoServer() +std::unique_ptr<TSimpleServer> CreateFramingEchoServer() { auto port = NTesting::GetFreePort(); - return MakeHolder<TSimpleServer>( + return std::make_unique<TSimpleServer>( port, [] (IInputStream* input, IOutputStream* output) { try { diff --git a/yt/cpp/mapreduce/http/ut/simple_server.cpp b/yt/cpp/mapreduce/http/ut/simple_server.cpp index fbc369ec20..111010a83d 100644 --- a/yt/cpp/mapreduce/http/ut/simple_server.cpp +++ b/yt/cpp/mapreduce/http/ut/simple_server.cpp @@ -23,9 +23,9 @@ TSimpleServer::TSimpleServer(int port, TRequestHandler requestHandler) ret = listenSocket->Listen(10); Y_ENSURE_EX(ret == 0, TSystemError() << "Can not listen socket"); - SendFinishSocket_ = MakeHolder<TInetStreamSocket>(socketPair[1]); + SendFinishSocket_ = std::make_unique<TInetStreamSocket>(socketPair[1]); - ThreadPool_ = MakeHolder<TAdaptiveThreadPool>(); + ThreadPool_ = std::make_unique<TAdaptiveThreadPool>(); ThreadPool_->Start(1); auto receiveFinish = MakeAtomicShared<TInetStreamSocket>(socketPair[0]); @@ -76,7 +76,7 @@ void TSimpleServer::Stop() SendFinishSocket_->Send("X", 1); ListenerThread_->Join(); ThreadPool_->Stop(); - ThreadPool_.Destroy(); + ThreadPool_.reset(); } int TSimpleServer::GetPort() const diff --git a/yt/cpp/mapreduce/http/ut/simple_server.h b/yt/cpp/mapreduce/http/ut/simple_server.h index f468ca55a6..f9f7287126 100644 --- a/yt/cpp/mapreduce/http/ut/simple_server.h +++ b/yt/cpp/mapreduce/http/ut/simple_server.h @@ -29,7 +29,7 @@ public: private: const int Port_; - THolder<IThreadPool> ThreadPool_; + std::unique_ptr<IThreadPool> ThreadPool_; THolder<IThreadFactory::IThread> ListenerThread_; - THolder<TInetStreamSocket> SendFinishSocket_; + std::unique_ptr<TInetStreamSocket> SendFinishSocket_; }; diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h index ffaf71f0a2..056910f785 100644 --- a/yt/cpp/mapreduce/interface/io-inl.h +++ b/yt/cpp/mapreduce/interface/io-inl.h @@ -350,11 +350,11 @@ public: return TBase::DoGetRowCached( /* cacher */ [&] { - CachedRow_.Reset(new U); - Reader_->ReadRow(CachedRow_.Get()); + CachedRow_.reset(new U); + Reader_->ReadRow(CachedRow_.get()); }, /* cacheGetter */ [&] { - auto result = dynamic_cast<const U*>(CachedRow_.Get()); + auto result = dynamic_cast<const U*>(CachedRow_.get()); Y_ABORT_UNLESS(result); return result; }); @@ -371,7 +371,7 @@ public: Reader_->ReadRow(result); }, /* cacheMover */ [&] (U* result) { - auto cast = dynamic_cast<U*>(CachedRow_.Get()); + auto cast = dynamic_cast<U*>(CachedRow_.get()); Y_ABORT_UNLESS(cast); result->Swap(cast); }); @@ -394,7 +394,7 @@ public: private: using TBase::Reader_; - mutable THolder<Message> CachedRow_; + mutable std::unique_ptr<Message> CachedRow_; }; template<class... TProtoRowTypes> diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 4994826863..ff2dafeb6d 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -139,6 +139,12 @@ public: // Operations + virtual TOperationId StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) = 0; + virtual TOperationAttributes GetOperation( const TOperationId& operationId, const TGetOperationOptions& options = {}) = 0; diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp index bc3da75ee6..558c42b30e 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.cpp +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -35,7 +35,7 @@ public: void Finalize(); private: - THolder<TNodeBuilder> Builder_; + std::unique_ptr<TNodeBuilder> Builder_; TRowElement Row_; int Depth_ = 0; bool Started_ = false; @@ -143,7 +143,7 @@ void TRowBuilder::SaveResultRow() *ResultRow_ = std::move(Row_); } Row_.Reset(); - Builder_.Reset(new TNodeBuilder(&Row_.Node)); + Builder_ = std::make_unique<TNodeBuilder>(&Row_.Node); } void TRowBuilder::Finalize() @@ -346,8 +346,8 @@ bool TNodeTableReader::IsRawReaderExhausted() const void TNodeTableReader::PrepareParsing() { NextRow_.Clear(); - Builder_.Reset(new TRowBuilder(&NextRow_)); - Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_)); + Builder_ = std::make_unique<TRowBuilder>(&NextRow_); + Parser_ = std::make_unique<::NYson::TYsonListParser>(Builder_.get(), &Input_); } void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error) diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h index 38cb440632..c8e319ce4f 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.h +++ b/yt/cpp/mapreduce/io/node_table_reader.h @@ -79,8 +79,8 @@ private: TMaybe<TRowElement> Row_; TMaybe<TRowElement> NextRow_; - THolder<TRowBuilder> Builder_; - THolder<::NYson::TYsonListParser> Parser_; + std::unique_ptr<TRowBuilder> Builder_; + std::unique_ptr<::NYson::TYsonListParser> Parser_; std::exception_ptr Exception_; bool NeedParseFirst_ = true; diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp index 916dec7ae4..c516c7b4ee 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.cpp +++ b/yt/cpp/mapreduce/io/node_table_writer.cpp @@ -13,11 +13,11 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format) - : Output_(std::move(output)) + : Output_(output.Release()) { for (size_t i = 0; i < Output_->GetStreamCount(); ++i) { Writers_.push_back( - MakeHolder<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment)); + std::make_unique<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment)); } } @@ -54,7 +54,7 @@ void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex) } } - auto* writer = Writers_[tableIndex].Get(); + auto* writer = Writers_[tableIndex].get(); writer->OnListItem(); TNodeVisitor visitor(writer); diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h index 155bec076d..0bc7a58557 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.h +++ b/yt/cpp/mapreduce/io/node_table_writer.h @@ -25,8 +25,8 @@ public: void Abort() override; private: - THolder<IProxyOutput> Output_; - TVector<THolder<::NYson::TYsonWriter>> Writers_; + std::unique_ptr<IProxyOutput> Output_; + TVector<std::unique_ptr<::NYson::TYsonWriter>> Writers_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h index 8452545005..bfe4ac5647 100644 --- a/yt/cpp/mapreduce/io/proto_table_reader.h +++ b/yt/cpp/mapreduce/io/proto_table_reader.h @@ -33,7 +33,7 @@ public: bool IsRawReaderExhausted() const override; private: - THolder<TNodeTableReader> NodeReader_; + std::unique_ptr<TNodeTableReader> NodeReader_; TVector<const ::google::protobuf::Descriptor*> Descriptors_; }; diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp index f56784eb60..ff0dbddc71 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.cpp +++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp @@ -102,7 +102,7 @@ TNode MakeNodeFromMessage(const Message& row) TProtoTableWriter::TProtoTableWriter( THolder<IProxyOutput> output, TVector<const Descriptor*>&& descriptors) - : NodeWriter_(new TNodeTableWriter(std::move(output))) + : NodeWriter_(std::make_unique<TNodeTableWriter>(std::move(output))) , Descriptors_(std::move(descriptors)) { } @@ -145,7 +145,7 @@ void TProtoTableWriter::Abort() TLenvalProtoTableWriter::TLenvalProtoTableWriter( THolder<IProxyOutput> output, TVector<const Descriptor*>&& descriptors) - : Output_(std::move(output)) + : Output_(output.Release()) , Descriptors_(std::move(descriptors)) { } diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h index 336230f55f..8dae9aaf79 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.h +++ b/yt/cpp/mapreduce/io/proto_table_writer.h @@ -27,7 +27,7 @@ public: void Abort() override; private: - THolder<TNodeTableWriter> NodeWriter_; + std::unique_ptr<TNodeTableWriter> NodeWriter_; TVector<const ::google::protobuf::Descriptor*> Descriptors_; }; @@ -51,7 +51,7 @@ public: void Abort() override; protected: - THolder<IProxyOutput> Output_; + std::unique_ptr<IProxyOutput> Output_; TVector<const ::google::protobuf::Descriptor*> Descriptors_; }; diff --git a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp index fc20be017f..b45f47c64c 100644 --- a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp +++ b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp @@ -56,13 +56,13 @@ public: TTestRawTableReader(const TRowCollection& rowCollection) : RowCollection_(rowCollection) , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0)) - , Input_(MakeHolder<TStringStream>(DataToRead_)) + , Input_(std::make_unique<TStringStream>(DataToRead_)) { } TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint) : RowCollection_(rowCollection) , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint)) - , Input_(MakeHolder<TStringStream>(DataToRead_)) + , Input_(std::make_unique<TStringStream>(DataToRead_)) , Broken_(true) { } @@ -86,7 +86,7 @@ public: } ui64 actualRowIndex = rowIndex ? *rowIndex : 0; DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex); - Input_ = MakeHolder<TStringInput>(DataToRead_); + Input_ = std::make_unique<TStringInput>(DataToRead_); Broken_ = false; return true; } @@ -102,7 +102,7 @@ public: private: TRowCollection RowCollection_; TString DataToRead_; - THolder<IInputStream> Input_; + std::unique_ptr<IInputStream> Input_; bool Broken_ = false; i32 Retries = 1; }; diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp index fe31eb5543..dcfacb7541 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.cpp +++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp @@ -7,7 +7,7 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output) - : Output_(std::move(output)) + : Output_(output.Release()) { } TYaMRTableWriter::~TYaMRTableWriter() diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h index 7f72c8005a..f976a35832 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.h +++ b/yt/cpp/mapreduce/io/yamr_table_writer.h @@ -24,7 +24,7 @@ public: void Abort() override; private: - THolder<IProxyOutput> Output_; + std::unique_ptr<IProxyOutput> Output_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp index 56ab88ee9d..7cc5a84e6a 100644 --- a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp +++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp @@ -42,8 +42,8 @@ void TUserJobStatsProxy::Init(IOutputStream * usingStream) { if (usingStream == nullptr) { TFileHandle fixedDesrc(JobStatisticsHandle); - FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); - UsingStream = FetchedOut.Get(); + FetchedOut = std::make_unique<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); + UsingStream = FetchedOut.get(); fixedDesrc.Release(); } else { UsingStream = usingStream; @@ -55,8 +55,8 @@ void TUserJobStatsProxy::InitChecked(IOutputStream* def) { if (usingStream == nullptr && !GetEnv("YT_JOB_ID").empty()) { TFileHandle fixedDesrc(JobStatisticsHandle); - FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); - UsingStream = FetchedOut.Get(); + FetchedOut = std::make_unique<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); + UsingStream = FetchedOut.get(); fixedDesrc.Release(); } else { UsingStream = def; @@ -89,7 +89,7 @@ void TUserJobStatsProxy::CommitStats() { TTimeStatHolder TUserJobStatsProxy::TimerStart(TString name, bool commitOnFinish) { - return THolder(new TTimeStat(this, name, commitOnFinish)); + return std::unique_ptr<TTimeStat>(new TTimeStat(this, name, commitOnFinish)); } void TUserJobStatsProxy::WriteStat(TString name, i64 val) { diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h index 6939d20417..928c79864b 100644 --- a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h +++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h @@ -6,13 +6,13 @@ namespace NYtTools { class TTimeStat; - using TTimeStatHolder = THolder<TTimeStat>; + using TTimeStatHolder = std::unique_ptr<TTimeStat>; class TUserJobStatsProxy { public: static const FHANDLE JobStatisticsHandle; private: - THolder<IOutputStream> FetchedOut; + std::unique_ptr<IOutputStream> FetchedOut; IOutputStream* UsingStream = &Cerr; public: // TODO: add inheritance diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 65bfa01cea..2868e10f6e 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -289,6 +289,18 @@ void THttpRawClient::CommitTransaction( RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } +TOperationId THttpRawClient::StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) +{ + THttpHeader header("POST", "start_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec)); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); +} + TOperationAttributes THttpRawClient::GetOperation( const TOperationId& operationId, const TGetOperationOptions& options) diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index e540d1b331..1b3e274507 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -136,6 +136,12 @@ public: // Operations + TOperationId StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) override; + TOperationAttributes GetOperation( const TOperationId& operationId, const TGetOperationOptions& options = {}) override; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index 2869ddcc0f..98ef5ed099 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -393,7 +393,21 @@ TNode SerializeParamsForListOperations( return result; } -TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options) +TNode SerializeParamsForStartOperation( + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["operation_type"] = ToString(type); + result["spec"] = spec; + return result; +} + +TNode SerializeParamsForGetOperation( + const std::variant<TString, TOperationId>& aliasOrOperationId, + const TGetOperationOptions& options) { auto includeRuntime = options.IncludeRuntime_; TNode result; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index acbf003b5c..308dcfea64 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -1,8 +1,10 @@ #pragma once #include <yt/cpp/mapreduce/common/helpers.h> -#include <yt/cpp/mapreduce/interface/fwd.h> + #include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/operation.h> namespace NYT::NDetail::NRawClient { @@ -95,7 +97,14 @@ TNode SerializeParamsForConcatenate( TNode SerializeParamsForPingTx( const TTransactionId& transactionId); -TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options); +TNode SerializeParamsForStartOperation( + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec); + +TNode SerializeParamsForGetOperation( + const std::variant<TString, TOperationId>& aliasOrOperationId, + const TGetOperationOptions& options); TNode SerializeParamsForAbortOperation( const TOperationId& operationId); |