aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-12-25 19:03:15 +0000
committerAlexander Smirnov <alex@ydb.tech>2024-12-25 19:03:15 +0000
commitc895fcc06300c50afa32558268c113bb8a5ee256 (patch)
treee26dba6120dc7d0fe4360f2ce3697ff7c63fee32 /yt/cpp
parente3d3da91e05265f579c3dc5a18faa83a926c3d39 (diff)
parent117eb6fe09e57cf15c95ea6236718f2f70a7105a (diff)
downloadydb-c895fcc06300c50afa32558268c113bb8a5ee256.tar.gz
Merge pull request #12958 from ydb-platform/merge-libs-241224-2313
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/client/batch_request_impl.cpp2
-rw-r--r--yt/cpp/mapreduce/client/batch_request_impl.h2
-rw-r--r--yt/cpp/mapreduce/client/client.cpp4
-rw-r--r--yt/cpp/mapreduce/client/client.h4
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp2
-rw-r--r--yt/cpp/mapreduce/client/client_reader.h2
-rw-r--r--yt/cpp/mapreduce/client/file_reader.cpp2
-rw-r--r--yt/cpp/mapreduce/client/file_reader.h2
-rw-r--r--yt/cpp/mapreduce/client/init.cpp6
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp120
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp39
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.h9
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.cpp6
-rw-r--r--yt/cpp/mapreduce/client/retry_heavy_write_request.h4
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer.cpp2
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer_v2.cpp18
-rw-r--r--yt/cpp/mapreduce/client/retryful_writer_v2.h8
-rw-r--r--yt/cpp/mapreduce/client/retryless_writer.h4
-rw-r--r--yt/cpp/mapreduce/client/transaction_pinger.cpp4
-rw-r--r--yt/cpp/mapreduce/http/http.cpp18
-rw-r--r--yt/cpp/mapreduce/http/http.h10
-rw-r--r--yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp12
-rw-r--r--yt/cpp/mapreduce/http/ut/http_ut.cpp4
-rw-r--r--yt/cpp/mapreduce/http/ut/simple_server.cpp6
-rw-r--r--yt/cpp/mapreduce/http/ut/simple_server.h4
-rw-r--r--yt/cpp/mapreduce/interface/io-inl.h10
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp8
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.h4
-rw-r--r--yt/cpp/mapreduce/io/node_table_writer.cpp6
-rw-r--r--yt/cpp/mapreduce/io/node_table_writer.h4
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.h2
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.cpp4
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.h4
-rw-r--r--yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp8
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_writer.cpp2
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_writer.h2
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp10
-rw-r--r--yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h4
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp12
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp16
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h13
43 files changed, 226 insertions, 189 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp
index 08ffec2bfc..d8084e9c45 100644
--- a/yt/cpp/mapreduce/client/batch_request_impl.cpp
+++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp
@@ -50,7 +50,7 @@ TBatchRequest::~TBatchRequest() = default;
IBatchRequestBase& TBatchRequest::WithTransaction(const TTransactionId& transactionId)
{
if (!TmpWithTransaction_) {
- TmpWithTransaction_.Reset(new TBatchRequest(Impl_.Get(), Client_));
+ TmpWithTransaction_.reset(new TBatchRequest(Impl_.Get(), Client_));
}
TmpWithTransaction_->DefaultTransaction_ = transactionId;
return *TmpWithTransaction_;
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.h b/yt/cpp/mapreduce/client/batch_request_impl.h
index 0a176417b3..a4d776668f 100644
--- a/yt/cpp/mapreduce/client/batch_request_impl.h
+++ b/yt/cpp/mapreduce/client/batch_request_impl.h
@@ -124,7 +124,7 @@ private:
private:
TTransactionId DefaultTransaction_;
::TIntrusivePtr<NDetail::NRawClient::TRawBatchRequest> Impl_;
- THolder<TBatchRequest> TmpWithTransaction_;
+ std::unique_ptr<TBatchRequest> TmpWithTransaction_;
::TIntrusivePtr<TClient> Client_;
private:
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 9fcb82f5b7..7d73756759 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -940,7 +940,7 @@ TTransaction::TTransaction(
: TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
- MakeHolder<TPingableTransaction>(
+ std::make_unique<TPingableTransaction>(
rawClient,
parentClient->GetRetryPolicy(),
context,
@@ -1434,7 +1434,7 @@ TYtPoller& TClient::GetYtPoller()
// We don't use current client and create new client because YtPoller_ might use
// this client during current client shutdown.
// That might lead to incrementing of current client refcount and double delete of current client object.
- YtPoller_ = MakeHolder<TYtPoller>(Context_, ClientRetryPolicy_);
+ YtPoller_ = std::make_unique<TYtPoller>(Context_, ClientRetryPolicy_);
}
return *YtPoller_;
}
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h
index 32a0f9a97e..4ceb7f69b3 100644
--- a/yt/cpp/mapreduce/client/client.h
+++ b/yt/cpp/mapreduce/client/client.h
@@ -333,7 +333,7 @@ protected:
private:
ITransactionPingerPtr TransactionPinger_;
- THolder<TPingableTransaction> PingableTx_;
+ std::unique_ptr<TPingableTransaction> PingableTx_;
TClientPtr ParentClient_;
};
@@ -502,7 +502,7 @@ private:
std::atomic<bool> Shutdown_ = false;
TMutex Lock_;
- THolder<TYtPoller> YtPoller_;
+ std::unique_ptr<TYtPoller> YtPoller_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index e7538a22da..b144b70c02 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -58,7 +58,7 @@ TClientReader::TClientReader(
{
if (options.CreateTransaction_) {
Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null");
- ReadTransaction_ = MakeHolder<TPingableTransaction>(
+ ReadTransaction_ = std::make_unique<TPingableTransaction>(
RawClient_,
ClientRetryPolicy_,
Context_,
diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h
index 3f73080046..cc78d2f3d3 100644
--- a/yt/cpp/mapreduce/client/client_reader.h
+++ b/yt/cpp/mapreduce/client/client_reader.h
@@ -51,7 +51,7 @@ private:
TMaybe<TFormat> Format_;
TTableReaderOptions Options_;
- THolder<TPingableTransaction> ReadTransaction_;
+ std::unique_ptr<TPingableTransaction> ReadTransaction_;
std::unique_ptr<IInputStream> Input_;
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index f88b40e38b..1682212f1d 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -47,7 +47,7 @@ TStreamReaderBase::TStreamReaderBase(
const TTransactionId& transactionId)
: RawClient_(rawClient)
, ClientRetryPolicy_(std::move(clientRetryPolicy))
- , ReadTransaction_(MakeHolder<TPingableTransaction>(
+ , ReadTransaction_(std::make_unique<TPingableTransaction>(
RawClient_,
ClientRetryPolicy_,
context,
diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h
index 8aafdc860d..d3efe90f26 100644
--- a/yt/cpp/mapreduce/client/file_reader.h
+++ b/yt/cpp/mapreduce/client/file_reader.h
@@ -45,7 +45,7 @@ private:
std::unique_ptr<IInputStream> Input_;
- THolder<TPingableTransaction> ReadTransaction_;
+ std::unique_ptr<TPingableTransaction> ReadTransaction_;
ui64 CurrentOffset_ = 0;
};
diff --git a/yt/cpp/mapreduce/client/init.cpp b/yt/cpp/mapreduce/client/init.cpp
index 6121952f86..78c92ee883 100644
--- a/yt/cpp/mapreduce/client/init.cpp
+++ b/yt/cpp/mapreduce/client/init.cpp
@@ -219,11 +219,11 @@ void ExecJob(int argc, const char** argv, const TInitializeOptions& options)
NDetail::OutputTableCount = static_cast<i64>(outputTableCount);
- THolder<IInputStream> jobStateStream;
+ std::unique_ptr<IInputStream> jobStateStream;
if (hasState) {
- jobStateStream = MakeHolder<TIFStream>("jobstate");
+ jobStateStream = std::make_unique<TIFStream>("jobstate");
} else {
- jobStateStream = MakeHolder<TBufferStream>(0);
+ jobStateStream = std::make_unique<TBufferStream>(0);
}
int ret = 1;
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 98ed246e0e..9441c7efe2 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -969,9 +969,9 @@ void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec)
TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options)
{
- MergeNodes(dst["spec"], spec);
+ MergeNodes(dst, spec);
if (options.Spec_) {
- MergeNodes(dst["spec"], *options.Spec_);
+ MergeNodes(dst, *options.Spec_);
}
return dst;
}
@@ -1129,7 +1129,7 @@ void DoExecuteMap(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("mapper").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
map,
@@ -1148,18 +1148,18 @@ void DoExecuteMap(
.DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("ordered").Value(spec.Ordered_.GetRef());
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
- specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
+ specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
+ specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
if (!preparer->GetContext().Config->TableWriter.Empty()) {
- specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
+ specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
}
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1168,7 +1168,7 @@ void DoExecuteMap(
operationIo,
mapper
] () {
- auto operationId = preparer->StartOperation(operation, "map", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec);
LogJob(operationId, mapper.Get(), "mapper");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1247,7 +1247,7 @@ void DoExecuteReduce(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("reducer").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
reduce,
@@ -1280,11 +1280,11 @@ void DoExecuteReduce(
.DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1293,7 +1293,7 @@ void DoExecuteReduce(
operationIo,
reducer
] () {
- auto operationId = preparer->StartOperation(operation, "reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec);
LogJob(operationId, reducer.Get(), "reducer");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1373,7 +1373,7 @@ void DoExecuteJoinReduce(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("reducer").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
reduce,
@@ -1394,11 +1394,11 @@ void DoExecuteJoinReduce(
fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
})
.EndMap()
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1407,7 +1407,7 @@ void DoExecuteJoinReduce(
reducer,
operationIo
] () {
- auto operationId = preparer->StartOperation(operation, "join_reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec);
LogJob(operationId, reducer.Get(), "reducer");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1505,7 +1505,7 @@ void DoExecuteMapReduce(
TString title;
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.DoIf(hasMapper, [&] (TFluentMap fluent) {
TJobPreparer map(
*preparer,
@@ -1584,18 +1584,18 @@ void DoExecuteMapReduce(
.Do([&] (TFluentMap) {
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName()));
})
- .EndMap().EndMap();
+ .EndMap();
if (spec.Ordered_) {
- specNode["spec"]["ordered"] = *spec.Ordered_;
+ specNode["ordered"] = *spec.Ordered_;
}
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildMapJobCountOperationPart(spec, &specNode["spec"]);
- BuildPartitionCountOperationPart(spec, &specNode["spec"]);
- BuildIntermediateDataPart(spec, &specNode["spec"]);
- BuildDataSizePerSortJobPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildMapJobCountOperationPart(spec, &specNode);
+ BuildPartitionCountOperationPart(spec, &specNode);
+ BuildIntermediateDataPart(spec, &specNode);
+ BuildDataSizePerSortJobPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1607,7 +1607,7 @@ void DoExecuteMapReduce(
inputs=operationIo.Inputs,
allOutputs
] () {
- auto operationId = preparer->StartOperation(operation, "map_reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec);
LogJob(operationId, mapper.Get(), "mapper");
LogJob(operationId, reduceCombiner.Get(), "reduce_combiner");
@@ -1962,19 +1962,19 @@ void ExecuteSort(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
.Item("sort_by").Value(spec.SortBy_)
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildPartitionCountOperationPart(spec, &specNode["spec"]);
- BuildPartitionJobCountOperationPart(spec, &specNode["spec"]);
- BuildIntermediateDataPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildPartitionCountOperationPart(spec, &specNode);
+ BuildPartitionJobCountOperationPart(spec, &specNode);
+ BuildIntermediateDataPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1983,7 +1983,7 @@ void ExecuteSort(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "sort", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2011,7 +2011,7 @@ void ExecuteMerge(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
.Item("mode").Value(ToString(spec.Mode_))
@@ -2021,10 +2021,10 @@ void ExecuteMerge(
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -2033,7 +2033,7 @@ void ExecuteMerge(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "merge", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2055,22 +2055,22 @@ void ExecuteErase(
auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("table_path").Value(tablePath)
.Item("combine_chunks").Value(spec.CombineChunks_)
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
preparer,
tablePath
] () {
- auto operationId = preparer->StartOperation(operation, "erase", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec);
LogYPath(operationId, tablePath, "table_path");
@@ -2098,7 +2098,7 @@ void ExecuteRemoteCopy(
Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required");
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("cluster_name").Value(spec.ClusterName_)
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
@@ -2115,9 +2115,9 @@ void ExecuteRemoteCopy(
"doesn't make sense without CopyAttributes == true");
fluent.Item("attribute_keys").List(spec.AttributeKeys_);
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options),
@@ -2125,7 +2125,7 @@ void ExecuteRemoteCopy(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "remote_copy", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2222,15 +2222,15 @@ void ExecuteVanilla(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("tasks").DoMapFor(spec.Tasks_, addTask)
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () {
- auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec);
return operationId;
};
@@ -2784,7 +2784,7 @@ void TOperation::TOperationImpl::AsyncFinishOperation(TOperationAttributes opera
void* TOperation::TOperationImpl::SyncFinishOperationProc(void* pArgs)
{
- THolder<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs));
+ std::unique_ptr<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs));
args->OperationImpl->SyncFinishOperationImpl(args->OperationAttributes);
return nullptr;
}
@@ -3013,7 +3013,7 @@ struct TAsyncPrepareAndStartOperationArgs
void* SyncPrepareAndStartOperation(void* pArgs)
{
- THolder<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs));
+ std::unique_ptr<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs));
args->PrepareAndStart();
return nullptr;
}
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index e8e4ee26d2..70bd5f8b65 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -37,7 +37,7 @@ class TWaitOperationStartPollerItem
: public IYtPollerItem
{
public:
- TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction)
+ TWaitOperationStartPollerItem(TOperationId operationId, std::unique_ptr<TPingableTransaction> transaction)
: OperationId_(operationId)
, Transaction_(std::move(transaction))
{ }
@@ -78,7 +78,7 @@ public:
private:
TOperationId OperationId_;
- THolder<TPingableTransaction> Transaction_;
+ std::unique_ptr<TPingableTransaction> Transaction_;
::NThreading::TFuture<TOperationAttributes> Future_;
};
@@ -139,7 +139,7 @@ private:
TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId)
: Client_(std::move(client))
, TransactionId_(transactionId)
- , FileTransaction_(MakeHolder<TPingableTransaction>(
+ , FileTransaction_(std::make_unique<TPingableTransaction>(
Client_->GetRawClient(),
Client_->GetRetryPolicy(),
Client_->GetContext(),
@@ -177,35 +177,26 @@ const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
TOperationId TOperationPreparer::StartOperation(
TOperation* operation,
- const TString& operationType,
- const TNode& spec,
- bool useStartOperationRequest)
+ EOperationType type,
+ const TNode& spec)
{
CheckValidity();
- THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType));
- if (useStartOperationRequest) {
- header.AddParameter("operation_type", operationType);
- }
- header.AddTransactionId(TransactionId_);
- header.AddMutationId();
-
- auto ysonSpec = NodeToYsonString(spec);
- auto responseInfo = RetryRequestWithPolicy(
+ auto operationId = RequestWithRetry<TOperationId>(
::MakeIntrusive<TOperationForwardingRequestRetryPolicy>(
ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
TOperationPtr(operation)),
- GetContext(),
- header,
- ysonSpec);
- TOperationId operationId = ParseGuidFromResponse(responseInfo.Response);
+ [this, &type, &spec] (TMutationId& mutationId) {
+ return Client_->GetRawClient()->StartOperation(mutationId, TransactionId_, type, spec);
+ });
+
YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
operationId,
GetPreparationId());
YT_LOG_INFO("Operation %v started (%v): %v",
operationId,
- operationType,
+ type,
GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));
TOperationExecutionTimeTracker::Get()->Start(operationId);
@@ -305,9 +296,9 @@ public:
return result;
}
- THolder<IInputStream> CreateInputStream() const override
+ std::unique_ptr<IInputStream> CreateInputStream() const override
{
- return MakeHolder<TFileInput>(FileName_);
+ return std::make_unique<TFileInput>(FileName_);
}
TString GetDescription() const override
@@ -343,9 +334,9 @@ public:
return result;
}
- THolder<IInputStream> CreateInputStream() const override
+ std::unique_ptr<IInputStream> CreateInputStream() const override
{
- return MakeHolder<TMemoryInput>(Data_.data(), Data_.size());
+ return std::make_unique<TMemoryInput>(Data_.data(), Data_.size());
}
TString GetDescription() const override
diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h
index 90fd91378c..ef3a790c91 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.h
+++ b/yt/cpp/mapreduce/client/operation_preparer.h
@@ -28,16 +28,15 @@ public:
TOperationId StartOperation(
TOperation* operation,
- const TString& operationType,
- const TNode& spec,
- bool useStartOperationRequest = false);
+ EOperationType type,
+ const TNode& spec);
const IClientRetryPolicyPtr& GetClientRetryPolicy() const;
private:
TClientPtr Client_;
TTransactionId TransactionId_;
- THolder<TPingableTransaction> FileTransaction_;
+ std::unique_ptr<TPingableTransaction> FileTransaction_;
IClientRetryPolicyPtr ClientRetryPolicy_;
const TString PreparationId_;
@@ -54,7 +53,7 @@ struct IItemToUpload
virtual ~IItemToUpload() = default;
virtual TString CalculateMD5() const = 0;
- virtual THolder<IInputStream> CreateInputStream() const = 0;
+ virtual std::unique_ptr<IInputStream> CreateInputStream() const = 0;
virtual TString GetDescription() const = 0;
virtual i64 GetDataSize() const = 0;
};
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
index 44c7db3a97..80f54d2340 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp
@@ -31,7 +31,7 @@ void RetryHeavyWriteRequest(
const TClientContext& context,
const TTransactionId& parentId,
THttpHeader& header,
- std::function<THolder<IInputStream>()> streamMaker)
+ std::function<std::unique_ptr<IInputStream>()> streamMaker)
{
int retryCount = context.Config->RetryCount;
if (context.ServiceTicketAuth) {
@@ -63,7 +63,7 @@ void RetryHeavyWriteRequest(
GetFullUrlForProxy(hostName, context, header),
requestId,
header);
- TransferData(input.Get(), request->GetStream());
+ TransferData(input.get(), request->GetStream());
request->Finish()->GetResponse();
} catch (TErrorResponse& e) {
YT_LOG_ERROR("RSP %v - attempt %v failed",
@@ -100,7 +100,7 @@ THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters)
: Parameters_(std::move(parameters))
, RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest())
, StreamFactory_([] {
- return MakeHolder<TNullInput>();
+ return std::make_unique<TNullInput>();
})
{
Retry([] { });
diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.h b/yt/cpp/mapreduce/client/retry_heavy_write_request.h
index 6933170e96..b65f457239 100644
--- a/yt/cpp/mapreduce/client/retry_heavy_write_request.h
+++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.h
@@ -26,7 +26,7 @@ public:
THttpHeader Header;
};
- using TStreamFactory = std::function<THolder<IInputStream>()>;
+ using TStreamFactory = std::function<std::unique_ptr<IInputStream>()>;
public:
explicit THeavyRequestRetrier(TParameters parameters);
@@ -65,7 +65,7 @@ void RetryHeavyWriteRequest(
const TClientContext& context,
const TTransactionId& parentId,
THttpHeader& header,
- std::function<THolder<IInputStream>()> streamMaker);
+ std::function<std::unique_ptr<IInputStream>()> streamMaker);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp
index 41ad1298cd..666229d7a6 100644
--- a/yt/cpp/mapreduce/client/retryful_writer.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer.cpp
@@ -104,7 +104,7 @@ void TRetryfulWriter::Send(const TBuffer& buffer)
header.MergeParameters(Parameters_);
auto streamMaker = [&buffer] () {
- return MakeHolder<TBufferInput>(buffer);
+ return std::make_unique<TBufferInput>(buffer);
};
auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_);
diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
index 40198baaef..c11f20ce3f 100644
--- a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
+++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp
@@ -176,7 +176,7 @@ private:
void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters)
{
- THolder<THeavyRequestRetrier> retrier;
+ std::unique_ptr<THeavyRequestRetrier> retrier;
auto firstRequestParameters = parameters;
auto restRequestParameters = parameters;
@@ -218,14 +218,14 @@ private:
try {
if (!retrier) {
- retrier = MakeHolder<THeavyRequestRetrier>(*currentParameters);
+ retrier = std::make_unique<THeavyRequestRetrier>(*currentParameters);
}
retrier->Update([task=task] {
- return MakeHolder<TMemoryInput>(task.Data->data(), task.Size);
+ return std::make_unique<TMemoryInput>(task.Data->data(), task.Size);
});
if (task.BufferComplete) {
retrier->Finish();
- retrier.Reset();
+ retrier.reset();
}
} catch (const std::exception& ex) {
task.SendingComplete.SetException(std::current_exception());
@@ -235,7 +235,7 @@ private:
}
if (task.BufferComplete) {
- retrier.Reset();
+ retrier.reset();
task.SendingComplete.SetValue();
currentParameters = &restRequestParameters;
@@ -303,15 +303,15 @@ TRetryfulWriterV2::TRetryfulWriterV2(
ssize_t bufferSize,
bool createTransaction)
: BufferSize_(bufferSize)
- , Current_(MakeHolder<TSendTask>())
- , Previous_(MakeHolder<TSendTask>())
+ , Current_(std::make_unique<TSendTask>())
+ , Previous_(std::make_unique<TSendTask>())
{
THttpHeader httpHeader("PUT", command);
httpHeader.SetInputFormat(format);
httpHeader.MergeParameters(serializedWriterOptions);
if (createTransaction) {
- WriteTransaction_ = MakeHolder<TPingableTransaction>(
+ WriteTransaction_ = std::make_unique<TPingableTransaction>(
rawClient,
clientRetryPolicy,
context,
@@ -337,7 +337,7 @@ TRetryfulWriterV2::TRetryfulWriterV2(
.Header = std::move(httpHeader),
};
- Sender_ = MakeHolder<TSender>(path, parameters);
+ Sender_ = std::make_unique<TSender>(path, parameters);
DoStartBatch();
}
diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.h b/yt/cpp/mapreduce/client/retryful_writer_v2.h
index 661ef5d0b5..6d314307ec 100644
--- a/yt/cpp/mapreduce/client/retryful_writer_v2.h
+++ b/yt/cpp/mapreduce/client/retryful_writer_v2.h
@@ -49,11 +49,11 @@ private:
const ssize_t BufferSize_;
const ssize_t SendStep_ = 64_KB;
ssize_t NextSizeToSend_;
- THolder<TSender> Sender_;
- THolder<TPingableTransaction> WriteTransaction_;
+ std::unique_ptr<TSender> Sender_;
+ std::unique_ptr<TPingableTransaction> WriteTransaction_;
- THolder<TSendTask> Current_;
- THolder<TSendTask> Previous_;
+ std::unique_ptr<TSendTask> Current_;
+ std::unique_ptr<TSendTask> Previous_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/retryless_writer.h b/yt/cpp/mapreduce/client/retryless_writer.h
index 6916cddbf6..689c1c5f2f 100644
--- a/yt/cpp/mapreduce/client/retryless_writer.h
+++ b/yt/cpp/mapreduce/client/retryless_writer.h
@@ -56,7 +56,7 @@ public:
auto hostName = GetProxyForHeavyRequest(context);
UpdateHeaderForProxyIfNeed(hostName, context, header);
Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header);
- BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_));
+ BufferedOutput_ = std::make_unique<TBufferedOutput>(Request_->GetStream(), BufferSize_);
}
~TRetrylessWriter() override;
@@ -75,7 +75,7 @@ private:
bool Running_ = true;
NHttpClient::IHttpRequestPtr Request_;
- THolder<TBufferedOutput> BufferedOutput_;
+ std::unique_ptr<TBufferedOutput> BufferedOutput_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp
index ea42867715..074249efce 100644
--- a/yt/cpp/mapreduce/client/transaction_pinger.cpp
+++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp
@@ -228,7 +228,7 @@ public:
PingableTx_ = &pingableTx;
Running_ = true;
- PingerThread_ = MakeHolder<TThread>(
+ PingerThread_ = std::make_unique<TThread>(
TThread::TParams{Pinger, this}.SetName("pingable_tx"));
PingerThread_->Start();
}
@@ -284,7 +284,7 @@ private:
const TPingableTransaction* PingableTx_ = nullptr;
std::atomic<bool> Running_ = false;
- THolder<TThread> PingerThread_;
+ std::unique_ptr<TThread> PingerThread_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index 51253feec2..765a96f042 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -545,7 +545,7 @@ TConnectionPtr TConnectionPool::Connect(
TSocketHolder socket(DoConnect(networkAddress));
SetNonBlock(socket, false);
- connection->Socket.Reset(new TSocket(socket.Release()));
+ connection->Socket = std::make_unique<TSocket>(socket.Release());
connection->DeadLine = TInstant::Now() + socketTimeout;
connection->Socket->SetSocketTimeout(socketTimeout.Seconds());
@@ -754,7 +754,7 @@ private:
THttpResponse::THttpResponse(
TRequestContext context,
IInputStream* socketStream)
- : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream))
+ : HttpInput_(std::make_unique<THttpInputWrapped>(context, socketStream))
, Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing"))
, Context_(std::move(context))
{
@@ -935,7 +935,7 @@ bool THttpResponse::RefreshFrameIfNecessary()
case EFrameType::KeepAlive:
break;
case EFrameType::Data:
- RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get());
+ RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.get());
break;
default:
ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte);
@@ -1027,10 +1027,10 @@ IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters)
LogResponse_ = true;
}
- RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get());
+ RequestStream_ = std::make_unique<TRequestStream>(this, *Connection_->Socket.get());
RequestStream_->Write(strHeader.data(), strHeader.size());
- return RequestStream_.Get();
+ return RequestStream_.get();
}
IOutputStream* THttpRequest::StartRequest()
@@ -1064,16 +1064,16 @@ void THttpRequest::SmallRequest(TMaybe<TStringBuf> body)
THttpResponse* THttpRequest::GetResponseStream()
{
if (!Input_) {
- SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get()));
+ SocketInput_ = std::make_unique<TSocketInput>(*Connection_->Socket.get());
if (TConfig::Get()->UseAbortableResponse) {
Y_ABORT_UNLESS(!Url_.empty());
- Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_));
+ Input_ = std::make_unique<TAbortableHttpResponse>(Context_, SocketInput_.get(), Url_);
} else {
- Input_.Reset(new THttpResponse(Context_, SocketInput_.Get()));
+ Input_ = std::make_unique<THttpResponse>(Context_, SocketInput_.get());
}
Input_->CheckErrorResponse();
}
- return Input_.Get();
+ return Input_.get();
}
TString THttpRequest::GetResponse()
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index 0f5e9034ee..45cba99861 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -138,7 +138,7 @@ private:
struct TConnection
{
- THolder<TSocket> Socket;
+ std::unique_ptr<TSocket> Socket;
TAtomic Busy = 1;
TInstant DeadLine;
ui32 Id;
@@ -206,7 +206,7 @@ private:
class THttpInputWrapped;
private:
- THolder<THttpInputWrapped> HttpInput_;
+ std::unique_ptr<THttpInputWrapped> HttpInput_;
const bool Unframe_;
@@ -257,10 +257,10 @@ private:
TConnectionPtr Connection_;
- THolder<TRequestStream> RequestStream_;
+ std::unique_ptr<TRequestStream> RequestStream_;
- THolder<TSocketInput> SocketInput_;
- THolder<THttpResponse> Input_;
+ std::unique_ptr<TSocketInput> SocketInput_;
+ std::unique_ptr<THttpResponse> Input_;
bool LogResponse_ = false;
};
diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
index fa072675fb..a8f6856aee 100644
--- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
+++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
@@ -32,10 +32,10 @@ namespace {
}
} // namespace
-THolder<TSimpleServer> CreateSimpleHttpServer()
+std::unique_ptr<TSimpleServer> CreateSimpleHttpServer()
{
auto port = NTesting::GetFreePort();
- return MakeHolder<TSimpleServer>(
+ return std::make_unique<TSimpleServer>(
port,
[] (IInputStream* input, IOutputStream* output) {
try {
@@ -57,10 +57,10 @@ THolder<TSimpleServer> CreateSimpleHttpServer()
});
}
-THolder<TSimpleServer> CreateProxyHttpServer()
+std::unique_ptr<TSimpleServer> CreateProxyHttpServer()
{
auto port = NTesting::GetFreePort();
- return MakeHolder<TSimpleServer>(
+ return std::make_unique<TSimpleServer>(
port,
[] (IInputStream* input, IOutputStream* output) {
try {
@@ -182,9 +182,9 @@ TEST(TConnectionPool, TestConcurrency)
}
};
- TVector<THolder<TFuncThread>> threads;
+ TVector<std::unique_ptr<TFuncThread>> threads;
for (int i = 0; i != 10; ++i) {
- threads.push_back(MakeHolder<TFuncThread>(func));
+ threads.push_back(std::make_unique<TFuncThread>(func));
};
for (auto& t : threads) {
diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp
index e41e83c5a0..2d382f6aa0 100644
--- a/yt/cpp/mapreduce/http/ut/http_ut.cpp
+++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp
@@ -21,10 +21,10 @@ void WriteDataFrame(TStringBuf string, IOutputStream* stream)
stream->Write(string);
}
-THolder<TSimpleServer> CreateFramingEchoServer()
+std::unique_ptr<TSimpleServer> CreateFramingEchoServer()
{
auto port = NTesting::GetFreePort();
- return MakeHolder<TSimpleServer>(
+ return std::make_unique<TSimpleServer>(
port,
[] (IInputStream* input, IOutputStream* output) {
try {
diff --git a/yt/cpp/mapreduce/http/ut/simple_server.cpp b/yt/cpp/mapreduce/http/ut/simple_server.cpp
index fbc369ec20..111010a83d 100644
--- a/yt/cpp/mapreduce/http/ut/simple_server.cpp
+++ b/yt/cpp/mapreduce/http/ut/simple_server.cpp
@@ -23,9 +23,9 @@ TSimpleServer::TSimpleServer(int port, TRequestHandler requestHandler)
ret = listenSocket->Listen(10);
Y_ENSURE_EX(ret == 0, TSystemError() << "Can not listen socket");
- SendFinishSocket_ = MakeHolder<TInetStreamSocket>(socketPair[1]);
+ SendFinishSocket_ = std::make_unique<TInetStreamSocket>(socketPair[1]);
- ThreadPool_ = MakeHolder<TAdaptiveThreadPool>();
+ ThreadPool_ = std::make_unique<TAdaptiveThreadPool>();
ThreadPool_->Start(1);
auto receiveFinish = MakeAtomicShared<TInetStreamSocket>(socketPair[0]);
@@ -76,7 +76,7 @@ void TSimpleServer::Stop()
SendFinishSocket_->Send("X", 1);
ListenerThread_->Join();
ThreadPool_->Stop();
- ThreadPool_.Destroy();
+ ThreadPool_.reset();
}
int TSimpleServer::GetPort() const
diff --git a/yt/cpp/mapreduce/http/ut/simple_server.h b/yt/cpp/mapreduce/http/ut/simple_server.h
index f468ca55a6..f9f7287126 100644
--- a/yt/cpp/mapreduce/http/ut/simple_server.h
+++ b/yt/cpp/mapreduce/http/ut/simple_server.h
@@ -29,7 +29,7 @@ public:
private:
const int Port_;
- THolder<IThreadPool> ThreadPool_;
+ std::unique_ptr<IThreadPool> ThreadPool_;
THolder<IThreadFactory::IThread> ListenerThread_;
- THolder<TInetStreamSocket> SendFinishSocket_;
+ std::unique_ptr<TInetStreamSocket> SendFinishSocket_;
};
diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h
index ffaf71f0a2..056910f785 100644
--- a/yt/cpp/mapreduce/interface/io-inl.h
+++ b/yt/cpp/mapreduce/interface/io-inl.h
@@ -350,11 +350,11 @@ public:
return TBase::DoGetRowCached(
/* cacher */ [&] {
- CachedRow_.Reset(new U);
- Reader_->ReadRow(CachedRow_.Get());
+ CachedRow_.reset(new U);
+ Reader_->ReadRow(CachedRow_.get());
},
/* cacheGetter */ [&] {
- auto result = dynamic_cast<const U*>(CachedRow_.Get());
+ auto result = dynamic_cast<const U*>(CachedRow_.get());
Y_ABORT_UNLESS(result);
return result;
});
@@ -371,7 +371,7 @@ public:
Reader_->ReadRow(result);
},
/* cacheMover */ [&] (U* result) {
- auto cast = dynamic_cast<U*>(CachedRow_.Get());
+ auto cast = dynamic_cast<U*>(CachedRow_.get());
Y_ABORT_UNLESS(cast);
result->Swap(cast);
});
@@ -394,7 +394,7 @@ public:
private:
using TBase::Reader_;
- mutable THolder<Message> CachedRow_;
+ mutable std::unique_ptr<Message> CachedRow_;
};
template<class... TProtoRowTypes>
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 4994826863..ff2dafeb6d 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -139,6 +139,12 @@ public:
// Operations
+ virtual TOperationId StartOperation(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec) = 0;
+
virtual TOperationAttributes GetOperation(
const TOperationId& operationId,
const TGetOperationOptions& options = {}) = 0;
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
index bc3da75ee6..558c42b30e 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/node_table_reader.cpp
@@ -35,7 +35,7 @@ public:
void Finalize();
private:
- THolder<TNodeBuilder> Builder_;
+ std::unique_ptr<TNodeBuilder> Builder_;
TRowElement Row_;
int Depth_ = 0;
bool Started_ = false;
@@ -143,7 +143,7 @@ void TRowBuilder::SaveResultRow()
*ResultRow_ = std::move(Row_);
}
Row_.Reset();
- Builder_.Reset(new TNodeBuilder(&Row_.Node));
+ Builder_ = std::make_unique<TNodeBuilder>(&Row_.Node);
}
void TRowBuilder::Finalize()
@@ -346,8 +346,8 @@ bool TNodeTableReader::IsRawReaderExhausted() const
void TNodeTableReader::PrepareParsing()
{
NextRow_.Clear();
- Builder_.Reset(new TRowBuilder(&NextRow_));
- Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_));
+ Builder_ = std::make_unique<TRowBuilder>(&NextRow_);
+ Parser_ = std::make_unique<::NYson::TYsonListParser>(Builder_.get(), &Input_);
}
void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error)
diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h
index 38cb440632..c8e319ce4f 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.h
+++ b/yt/cpp/mapreduce/io/node_table_reader.h
@@ -79,8 +79,8 @@ private:
TMaybe<TRowElement> Row_;
TMaybe<TRowElement> NextRow_;
- THolder<TRowBuilder> Builder_;
- THolder<::NYson::TYsonListParser> Parser_;
+ std::unique_ptr<TRowBuilder> Builder_;
+ std::unique_ptr<::NYson::TYsonListParser> Parser_;
std::exception_ptr Exception_;
bool NeedParseFirst_ = true;
diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp
index 916dec7ae4..c516c7b4ee 100644
--- a/yt/cpp/mapreduce/io/node_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/node_table_writer.cpp
@@ -13,11 +13,11 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format)
- : Output_(std::move(output))
+ : Output_(output.Release())
{
for (size_t i = 0; i < Output_->GetStreamCount(); ++i) {
Writers_.push_back(
- MakeHolder<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment));
+ std::make_unique<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment));
}
}
@@ -54,7 +54,7 @@ void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex)
}
}
- auto* writer = Writers_[tableIndex].Get();
+ auto* writer = Writers_[tableIndex].get();
writer->OnListItem();
TNodeVisitor visitor(writer);
diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h
index 155bec076d..0bc7a58557 100644
--- a/yt/cpp/mapreduce/io/node_table_writer.h
+++ b/yt/cpp/mapreduce/io/node_table_writer.h
@@ -25,8 +25,8 @@ public:
void Abort() override;
private:
- THolder<IProxyOutput> Output_;
- TVector<THolder<::NYson::TYsonWriter>> Writers_;
+ std::unique_ptr<IProxyOutput> Output_;
+ TVector<std::unique_ptr<::NYson::TYsonWriter>> Writers_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h
index 8452545005..bfe4ac5647 100644
--- a/yt/cpp/mapreduce/io/proto_table_reader.h
+++ b/yt/cpp/mapreduce/io/proto_table_reader.h
@@ -33,7 +33,7 @@ public:
bool IsRawReaderExhausted() const override;
private:
- THolder<TNodeTableReader> NodeReader_;
+ std::unique_ptr<TNodeTableReader> NodeReader_;
TVector<const ::google::protobuf::Descriptor*> Descriptors_;
};
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp
index f56784eb60..ff0dbddc71 100644
--- a/yt/cpp/mapreduce/io/proto_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp
@@ -102,7 +102,7 @@ TNode MakeNodeFromMessage(const Message& row)
TProtoTableWriter::TProtoTableWriter(
THolder<IProxyOutput> output,
TVector<const Descriptor*>&& descriptors)
- : NodeWriter_(new TNodeTableWriter(std::move(output)))
+ : NodeWriter_(std::make_unique<TNodeTableWriter>(std::move(output)))
, Descriptors_(std::move(descriptors))
{ }
@@ -145,7 +145,7 @@ void TProtoTableWriter::Abort()
TLenvalProtoTableWriter::TLenvalProtoTableWriter(
THolder<IProxyOutput> output,
TVector<const Descriptor*>&& descriptors)
- : Output_(std::move(output))
+ : Output_(output.Release())
, Descriptors_(std::move(descriptors))
{ }
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h
index 336230f55f..8dae9aaf79 100644
--- a/yt/cpp/mapreduce/io/proto_table_writer.h
+++ b/yt/cpp/mapreduce/io/proto_table_writer.h
@@ -27,7 +27,7 @@ public:
void Abort() override;
private:
- THolder<TNodeTableWriter> NodeWriter_;
+ std::unique_ptr<TNodeTableWriter> NodeWriter_;
TVector<const ::google::protobuf::Descriptor*> Descriptors_;
};
@@ -51,7 +51,7 @@ public:
void Abort() override;
protected:
- THolder<IProxyOutput> Output_;
+ std::unique_ptr<IProxyOutput> Output_;
TVector<const ::google::protobuf::Descriptor*> Descriptors_;
};
diff --git a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp
index fc20be017f..b45f47c64c 100644
--- a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp
+++ b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp
@@ -56,13 +56,13 @@ public:
TTestRawTableReader(const TRowCollection& rowCollection)
: RowCollection_(rowCollection)
, DataToRead_(RowCollection_.GetStreamDataStartFromRow(0))
- , Input_(MakeHolder<TStringStream>(DataToRead_))
+ , Input_(std::make_unique<TStringStream>(DataToRead_))
{ }
TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint)
: RowCollection_(rowCollection)
, DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint))
- , Input_(MakeHolder<TStringStream>(DataToRead_))
+ , Input_(std::make_unique<TStringStream>(DataToRead_))
, Broken_(true)
{ }
@@ -86,7 +86,7 @@ public:
}
ui64 actualRowIndex = rowIndex ? *rowIndex : 0;
DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex);
- Input_ = MakeHolder<TStringInput>(DataToRead_);
+ Input_ = std::make_unique<TStringInput>(DataToRead_);
Broken_ = false;
return true;
}
@@ -102,7 +102,7 @@ public:
private:
TRowCollection RowCollection_;
TString DataToRead_;
- THolder<IInputStream> Input_;
+ std::unique_ptr<IInputStream> Input_;
bool Broken_ = false;
i32 Retries = 1;
};
diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp
index fe31eb5543..dcfacb7541 100644
--- a/yt/cpp/mapreduce/io/yamr_table_writer.cpp
+++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp
@@ -7,7 +7,7 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output)
- : Output_(std::move(output))
+ : Output_(output.Release())
{ }
TYaMRTableWriter::~TYaMRTableWriter()
diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h
index 7f72c8005a..f976a35832 100644
--- a/yt/cpp/mapreduce/io/yamr_table_writer.h
+++ b/yt/cpp/mapreduce/io/yamr_table_writer.h
@@ -24,7 +24,7 @@ public:
void Abort() override;
private:
- THolder<IProxyOutput> Output_;
+ std::unique_ptr<IProxyOutput> Output_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
index 56ab88ee9d..7cc5a84e6a 100644
--- a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
+++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp
@@ -42,8 +42,8 @@ void TUserJobStatsProxy::Init(IOutputStream * usingStream) {
if (usingStream == nullptr) {
TFileHandle fixedDesrc(JobStatisticsHandle);
- FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
- UsingStream = FetchedOut.Get();
+ FetchedOut = std::make_unique<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
+ UsingStream = FetchedOut.get();
fixedDesrc.Release();
} else {
UsingStream = usingStream;
@@ -55,8 +55,8 @@ void TUserJobStatsProxy::InitChecked(IOutputStream* def) {
if (usingStream == nullptr && !GetEnv("YT_JOB_ID").empty()) {
TFileHandle fixedDesrc(JobStatisticsHandle);
- FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
- UsingStream = FetchedOut.Get();
+ FetchedOut = std::make_unique<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
+ UsingStream = FetchedOut.get();
fixedDesrc.Release();
} else {
UsingStream = def;
@@ -89,7 +89,7 @@ void TUserJobStatsProxy::CommitStats() {
TTimeStatHolder TUserJobStatsProxy::TimerStart(TString name, bool commitOnFinish) {
- return THolder(new TTimeStat(this, name, commitOnFinish));
+ return std::unique_ptr<TTimeStat>(new TTimeStat(this, name, commitOnFinish));
}
void TUserJobStatsProxy::WriteStat(TString name, i64 val) {
diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h
index 6939d20417..928c79864b 100644
--- a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h
+++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h
@@ -6,13 +6,13 @@
namespace NYtTools {
class TTimeStat;
- using TTimeStatHolder = THolder<TTimeStat>;
+ using TTimeStatHolder = std::unique_ptr<TTimeStat>;
class TUserJobStatsProxy {
public:
static const FHANDLE JobStatisticsHandle;
private:
- THolder<IOutputStream> FetchedOut;
+ std::unique_ptr<IOutputStream> FetchedOut;
IOutputStream* UsingStream = &Cerr;
public:
// TODO: add inheritance
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 65bfa01cea..2868e10f6e 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -289,6 +289,18 @@ void THttpRawClient::CommitTransaction(
RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
+TOperationId THttpRawClient::StartOperation(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec)
+{
+ THttpHeader header("POST", "start_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
TOperationAttributes THttpRawClient::GetOperation(
const TOperationId& operationId,
const TGetOperationOptions& options)
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index e540d1b331..1b3e274507 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -136,6 +136,12 @@ public:
// Operations
+ TOperationId StartOperation(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec) override;
+
TOperationAttributes GetOperation(
const TOperationId& operationId,
const TGetOperationOptions& options = {}) override;
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
index 2869ddcc0f..98ef5ed099 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -393,7 +393,21 @@ TNode SerializeParamsForListOperations(
return result;
}
-TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options)
+TNode SerializeParamsForStartOperation(
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["operation_type"] = ToString(type);
+ result["spec"] = spec;
+ return result;
+}
+
+TNode SerializeParamsForGetOperation(
+ const std::variant<TString, TOperationId>& aliasOrOperationId,
+ const TGetOperationOptions& options)
{
auto includeRuntime = options.IncludeRuntime_;
TNode result;
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
index acbf003b5c..308dcfea64 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
@@ -1,8 +1,10 @@
#pragma once
#include <yt/cpp/mapreduce/common/helpers.h>
-#include <yt/cpp/mapreduce/interface/fwd.h>
+
#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
namespace NYT::NDetail::NRawClient {
@@ -95,7 +97,14 @@ TNode SerializeParamsForConcatenate(
TNode SerializeParamsForPingTx(
const TTransactionId& transactionId);
-TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options);
+TNode SerializeParamsForStartOperation(
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec);
+
+TNode SerializeParamsForGetOperation(
+ const std::variant<TString, TOperationId>& aliasOrOperationId,
+ const TGetOperationOptions& options);
TNode SerializeParamsForAbortOperation(
const TOperationId& operationId);