diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-12-25 19:03:15 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-12-25 19:03:15 +0000 |
commit | c895fcc06300c50afa32558268c113bb8a5ee256 (patch) | |
tree | e26dba6120dc7d0fe4360f2ce3697ff7c63fee32 /yt | |
parent | e3d3da91e05265f579c3dc5a18faa83a926c3d39 (diff) | |
parent | 117eb6fe09e57cf15c95ea6236718f2f70a7105a (diff) | |
download | ydb-c895fcc06300c50afa32558268c113bb8a5ee256.tar.gz |
Merge pull request #12958 from ydb-platform/merge-libs-241224-2313
Diffstat (limited to 'yt')
58 files changed, 358 insertions, 209 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp index 08ffec2bfc..d8084e9c45 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.cpp +++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp @@ -50,7 +50,7 @@ TBatchRequest::~TBatchRequest() = default; IBatchRequestBase& TBatchRequest::WithTransaction(const TTransactionId& transactionId) { if (!TmpWithTransaction_) { - TmpWithTransaction_.Reset(new TBatchRequest(Impl_.Get(), Client_)); + TmpWithTransaction_.reset(new TBatchRequest(Impl_.Get(), Client_)); } TmpWithTransaction_->DefaultTransaction_ = transactionId; return *TmpWithTransaction_; diff --git a/yt/cpp/mapreduce/client/batch_request_impl.h b/yt/cpp/mapreduce/client/batch_request_impl.h index 0a176417b3..a4d776668f 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.h +++ b/yt/cpp/mapreduce/client/batch_request_impl.h @@ -124,7 +124,7 @@ private: private: TTransactionId DefaultTransaction_; ::TIntrusivePtr<NDetail::NRawClient::TRawBatchRequest> Impl_; - THolder<TBatchRequest> TmpWithTransaction_; + std::unique_ptr<TBatchRequest> TmpWithTransaction_; ::TIntrusivePtr<TClient> Client_; private: diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 9fcb82f5b7..7d73756759 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -940,7 +940,7 @@ TTransaction::TTransaction( : TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( - MakeHolder<TPingableTransaction>( + std::make_unique<TPingableTransaction>( rawClient, parentClient->GetRetryPolicy(), context, @@ -1434,7 +1434,7 @@ TYtPoller& TClient::GetYtPoller() // We don't use current client and create new client because YtPoller_ might use // this client during current client shutdown. // That might lead to incrementing of current client refcount and double delete of current client object. - YtPoller_ = MakeHolder<TYtPoller>(Context_, ClientRetryPolicy_); + YtPoller_ = std::make_unique<TYtPoller>(Context_, ClientRetryPolicy_); } return *YtPoller_; } diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 32a0f9a97e..4ceb7f69b3 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -333,7 +333,7 @@ protected: private: ITransactionPingerPtr TransactionPinger_; - THolder<TPingableTransaction> PingableTx_; + std::unique_ptr<TPingableTransaction> PingableTx_; TClientPtr ParentClient_; }; @@ -502,7 +502,7 @@ private: std::atomic<bool> Shutdown_ = false; TMutex Lock_; - THolder<TYtPoller> YtPoller_; + std::unique_ptr<TYtPoller> YtPoller_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index e7538a22da..b144b70c02 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -58,7 +58,7 @@ TClientReader::TClientReader( { if (options.CreateTransaction_) { Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null"); - ReadTransaction_ = MakeHolder<TPingableTransaction>( + ReadTransaction_ = std::make_unique<TPingableTransaction>( RawClient_, ClientRetryPolicy_, Context_, diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 3f73080046..cc78d2f3d3 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -51,7 +51,7 @@ private: TMaybe<TFormat> Format_; TTableReaderOptions Options_; - THolder<TPingableTransaction> ReadTransaction_; + std::unique_ptr<TPingableTransaction> ReadTransaction_; std::unique_ptr<IInputStream> Input_; diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index f88b40e38b..1682212f1d 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -47,7 +47,7 @@ TStreamReaderBase::TStreamReaderBase( const TTransactionId& transactionId) : RawClient_(rawClient) , ClientRetryPolicy_(std::move(clientRetryPolicy)) - , ReadTransaction_(MakeHolder<TPingableTransaction>( + , ReadTransaction_(std::make_unique<TPingableTransaction>( RawClient_, ClientRetryPolicy_, context, diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h index 8aafdc860d..d3efe90f26 100644 --- a/yt/cpp/mapreduce/client/file_reader.h +++ b/yt/cpp/mapreduce/client/file_reader.h @@ -45,7 +45,7 @@ private: std::unique_ptr<IInputStream> Input_; - THolder<TPingableTransaction> ReadTransaction_; + std::unique_ptr<TPingableTransaction> ReadTransaction_; ui64 CurrentOffset_ = 0; }; diff --git a/yt/cpp/mapreduce/client/init.cpp b/yt/cpp/mapreduce/client/init.cpp index 6121952f86..78c92ee883 100644 --- a/yt/cpp/mapreduce/client/init.cpp +++ b/yt/cpp/mapreduce/client/init.cpp @@ -219,11 +219,11 @@ void ExecJob(int argc, const char** argv, const TInitializeOptions& options) NDetail::OutputTableCount = static_cast<i64>(outputTableCount); - THolder<IInputStream> jobStateStream; + std::unique_ptr<IInputStream> jobStateStream; if (hasState) { - jobStateStream = MakeHolder<TIFStream>("jobstate"); + jobStateStream = std::make_unique<TIFStream>("jobstate"); } else { - jobStateStream = MakeHolder<TBufferStream>(0); + jobStateStream = std::make_unique<TBufferStream>(0); } int ret = 1; diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 98ed246e0e..9441c7efe2 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -969,9 +969,9 @@ void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec) TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options) { - MergeNodes(dst["spec"], spec); + MergeNodes(dst, spec); if (options.Spec_) { - MergeNodes(dst["spec"], *options.Spec_); + MergeNodes(dst, *options.Spec_); } return dst; } @@ -1129,7 +1129,7 @@ void DoExecuteMap( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("mapper").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( map, @@ -1148,18 +1148,18 @@ void DoExecuteMap( .DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) { fluent.Item("ordered").Value(spec.Ordered_.GetRef()); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); - specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); - specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); if (!preparer->GetContext().Config->TableWriter.Empty()) { - specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; + specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; } - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1168,7 +1168,7 @@ void DoExecuteMap( operationIo, mapper ] () { - auto operationId = preparer->StartOperation(operation, "map", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec); LogJob(operationId, mapper.Get(), "mapper"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1247,7 +1247,7 @@ void DoExecuteReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1280,11 +1280,11 @@ void DoExecuteReduce( .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) { fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1293,7 +1293,7 @@ void DoExecuteReduce( operationIo, reducer ] () { - auto operationId = preparer->StartOperation(operation, "reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1373,7 +1373,7 @@ void DoExecuteJoinReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1394,11 +1394,11 @@ void DoExecuteJoinReduce( fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter); }) .EndMap() - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1407,7 +1407,7 @@ void DoExecuteJoinReduce( reducer, operationIo ] () { - auto operationId = preparer->StartOperation(operation, "join_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1505,7 +1505,7 @@ void DoExecuteMapReduce( TString title; TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .DoIf(hasMapper, [&] (TFluentMap fluent) { TJobPreparer map( *preparer, @@ -1584,18 +1584,18 @@ void DoExecuteMapReduce( .Do([&] (TFluentMap) { spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName())); }) - .EndMap().EndMap(); + .EndMap(); if (spec.Ordered_) { - specNode["spec"]["ordered"] = *spec.Ordered_; + specNode["ordered"] = *spec.Ordered_; } - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildMapJobCountOperationPart(spec, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); - BuildDataSizePerSortJobPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildMapJobCountOperationPart(spec, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); + BuildDataSizePerSortJobPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1607,7 +1607,7 @@ void DoExecuteMapReduce( inputs=operationIo.Inputs, allOutputs ] () { - auto operationId = preparer->StartOperation(operation, "map_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec); LogJob(operationId, mapper.Get(), "mapper"); LogJob(operationId, reduceCombiner.Get(), "reduce_combiner"); @@ -1962,19 +1962,19 @@ void ExecuteSort( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("sort_by").Value(spec.SortBy_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildPartitionJobCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildPartitionJobCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1983,7 +1983,7 @@ void ExecuteSort( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "sort", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2011,7 +2011,7 @@ void ExecuteMerge( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("mode").Value(ToString(spec.Mode_)) @@ -2021,10 +2021,10 @@ void ExecuteMerge( .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -2033,7 +2033,7 @@ void ExecuteMerge( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "merge", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2055,22 +2055,22 @@ void ExecuteErase( auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("table_path").Value(tablePath) .Item("combine_chunks").Value(spec.CombineChunks_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer, tablePath ] () { - auto operationId = preparer->StartOperation(operation, "erase", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec); LogYPath(operationId, tablePath, "table_path"); @@ -2098,7 +2098,7 @@ void ExecuteRemoteCopy( Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required"); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("cluster_name").Value(spec.ClusterName_) .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) @@ -2115,9 +2115,9 @@ void ExecuteRemoteCopy( "doesn't make sense without CopyAttributes == true"); fluent.Item("attribute_keys").List(spec.AttributeKeys_); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options), @@ -2125,7 +2125,7 @@ void ExecuteRemoteCopy( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "remote_copy", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2222,15 +2222,15 @@ void ExecuteVanilla( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("tasks").DoMapFor(spec.Tasks_, addTask) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () { - auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true); + auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec); return operationId; }; @@ -2784,7 +2784,7 @@ void TOperation::TOperationImpl::AsyncFinishOperation(TOperationAttributes opera void* TOperation::TOperationImpl::SyncFinishOperationProc(void* pArgs) { - THolder<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs)); + std::unique_ptr<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs)); args->OperationImpl->SyncFinishOperationImpl(args->OperationAttributes); return nullptr; } @@ -3013,7 +3013,7 @@ struct TAsyncPrepareAndStartOperationArgs void* SyncPrepareAndStartOperation(void* pArgs) { - THolder<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs)); + std::unique_ptr<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs)); args->PrepareAndStart(); return nullptr; } diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index e8e4ee26d2..70bd5f8b65 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -37,7 +37,7 @@ class TWaitOperationStartPollerItem : public IYtPollerItem { public: - TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction) + TWaitOperationStartPollerItem(TOperationId operationId, std::unique_ptr<TPingableTransaction> transaction) : OperationId_(operationId) , Transaction_(std::move(transaction)) { } @@ -78,7 +78,7 @@ public: private: TOperationId OperationId_; - THolder<TPingableTransaction> Transaction_; + std::unique_ptr<TPingableTransaction> Transaction_; ::NThreading::TFuture<TOperationAttributes> Future_; }; @@ -139,7 +139,7 @@ private: TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId) : Client_(std::move(client)) , TransactionId_(transactionId) - , FileTransaction_(MakeHolder<TPingableTransaction>( + , FileTransaction_(std::make_unique<TPingableTransaction>( Client_->GetRawClient(), Client_->GetRetryPolicy(), Client_->GetContext(), @@ -177,35 +177,26 @@ const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const TOperationId TOperationPreparer::StartOperation( TOperation* operation, - const TString& operationType, - const TNode& spec, - bool useStartOperationRequest) + EOperationType type, + const TNode& spec) { CheckValidity(); - THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType)); - if (useStartOperationRequest) { - header.AddParameter("operation_type", operationType); - } - header.AddTransactionId(TransactionId_); - header.AddMutationId(); - - auto ysonSpec = NodeToYsonString(spec); - auto responseInfo = RetryRequestWithPolicy( + auto operationId = RequestWithRetry<TOperationId>( ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>( ClientRetryPolicy_->CreatePolicyForStartOperationRequest(), TOperationPtr(operation)), - GetContext(), - header, - ysonSpec); - TOperationId operationId = ParseGuidFromResponse(responseInfo.Response); + [this, &type, &spec] (TMutationId& mutationId) { + return Client_->GetRawClient()->StartOperation(mutationId, TransactionId_, type, spec); + }); + YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)", operationId, GetPreparationId()); YT_LOG_INFO("Operation %v started (%v): %v", operationId, - operationType, + type, GetOperationWebInterfaceUrl(GetContext().ServerName, operationId)); TOperationExecutionTimeTracker::Get()->Start(operationId); @@ -305,9 +296,9 @@ public: return result; } - THolder<IInputStream> CreateInputStream() const override + std::unique_ptr<IInputStream> CreateInputStream() const override { - return MakeHolder<TFileInput>(FileName_); + return std::make_unique<TFileInput>(FileName_); } TString GetDescription() const override @@ -343,9 +334,9 @@ public: return result; } - THolder<IInputStream> CreateInputStream() const override + std::unique_ptr<IInputStream> CreateInputStream() const override { - return MakeHolder<TMemoryInput>(Data_.data(), Data_.size()); + return std::make_unique<TMemoryInput>(Data_.data(), Data_.size()); } TString GetDescription() const override diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h index 90fd91378c..ef3a790c91 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.h +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -28,16 +28,15 @@ public: TOperationId StartOperation( TOperation* operation, - const TString& operationType, - const TNode& spec, - bool useStartOperationRequest = false); + EOperationType type, + const TNode& spec); const IClientRetryPolicyPtr& GetClientRetryPolicy() const; private: TClientPtr Client_; TTransactionId TransactionId_; - THolder<TPingableTransaction> FileTransaction_; + std::unique_ptr<TPingableTransaction> FileTransaction_; IClientRetryPolicyPtr ClientRetryPolicy_; const TString PreparationId_; @@ -54,7 +53,7 @@ struct IItemToUpload virtual ~IItemToUpload() = default; virtual TString CalculateMD5() const = 0; - virtual THolder<IInputStream> CreateInputStream() const = 0; + virtual std::unique_ptr<IInputStream> CreateInputStream() const = 0; virtual TString GetDescription() const = 0; virtual i64 GetDataSize() const = 0; }; diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp index 44c7db3a97..80f54d2340 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.cpp @@ -31,7 +31,7 @@ void RetryHeavyWriteRequest( const TClientContext& context, const TTransactionId& parentId, THttpHeader& header, - std::function<THolder<IInputStream>()> streamMaker) + std::function<std::unique_ptr<IInputStream>()> streamMaker) { int retryCount = context.Config->RetryCount; if (context.ServiceTicketAuth) { @@ -63,7 +63,7 @@ void RetryHeavyWriteRequest( GetFullUrlForProxy(hostName, context, header), requestId, header); - TransferData(input.Get(), request->GetStream()); + TransferData(input.get(), request->GetStream()); request->Finish()->GetResponse(); } catch (TErrorResponse& e) { YT_LOG_ERROR("RSP %v - attempt %v failed", @@ -100,7 +100,7 @@ THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters) : Parameters_(std::move(parameters)) , RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest()) , StreamFactory_([] { - return MakeHolder<TNullInput>(); + return std::make_unique<TNullInput>(); }) { Retry([] { }); diff --git a/yt/cpp/mapreduce/client/retry_heavy_write_request.h b/yt/cpp/mapreduce/client/retry_heavy_write_request.h index 6933170e96..b65f457239 100644 --- a/yt/cpp/mapreduce/client/retry_heavy_write_request.h +++ b/yt/cpp/mapreduce/client/retry_heavy_write_request.h @@ -26,7 +26,7 @@ public: THttpHeader Header; }; - using TStreamFactory = std::function<THolder<IInputStream>()>; + using TStreamFactory = std::function<std::unique_ptr<IInputStream>()>; public: explicit THeavyRequestRetrier(TParameters parameters); @@ -65,7 +65,7 @@ void RetryHeavyWriteRequest( const TClientContext& context, const TTransactionId& parentId, THttpHeader& header, - std::function<THolder<IInputStream>()> streamMaker); + std::function<std::unique_ptr<IInputStream>()> streamMaker); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retryful_writer.cpp b/yt/cpp/mapreduce/client/retryful_writer.cpp index 41ad1298cd..666229d7a6 100644 --- a/yt/cpp/mapreduce/client/retryful_writer.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer.cpp @@ -104,7 +104,7 @@ void TRetryfulWriter::Send(const TBuffer& buffer) header.MergeParameters(Parameters_); auto streamMaker = [&buffer] () { - return MakeHolder<TBufferInput>(buffer); + return std::make_unique<TBufferInput>(buffer); }; auto transactionId = (WriteTransaction_ ? WriteTransaction_->GetId() : ParentTransactionId_); diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp index 40198baaef..c11f20ce3f 100644 --- a/yt/cpp/mapreduce/client/retryful_writer_v2.cpp +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.cpp @@ -176,7 +176,7 @@ private: void ThreadMain(TRichYPath path, const THeavyRequestRetrier::TParameters& parameters) { - THolder<THeavyRequestRetrier> retrier; + std::unique_ptr<THeavyRequestRetrier> retrier; auto firstRequestParameters = parameters; auto restRequestParameters = parameters; @@ -218,14 +218,14 @@ private: try { if (!retrier) { - retrier = MakeHolder<THeavyRequestRetrier>(*currentParameters); + retrier = std::make_unique<THeavyRequestRetrier>(*currentParameters); } retrier->Update([task=task] { - return MakeHolder<TMemoryInput>(task.Data->data(), task.Size); + return std::make_unique<TMemoryInput>(task.Data->data(), task.Size); }); if (task.BufferComplete) { retrier->Finish(); - retrier.Reset(); + retrier.reset(); } } catch (const std::exception& ex) { task.SendingComplete.SetException(std::current_exception()); @@ -235,7 +235,7 @@ private: } if (task.BufferComplete) { - retrier.Reset(); + retrier.reset(); task.SendingComplete.SetValue(); currentParameters = &restRequestParameters; @@ -303,15 +303,15 @@ TRetryfulWriterV2::TRetryfulWriterV2( ssize_t bufferSize, bool createTransaction) : BufferSize_(bufferSize) - , Current_(MakeHolder<TSendTask>()) - , Previous_(MakeHolder<TSendTask>()) + , Current_(std::make_unique<TSendTask>()) + , Previous_(std::make_unique<TSendTask>()) { THttpHeader httpHeader("PUT", command); httpHeader.SetInputFormat(format); httpHeader.MergeParameters(serializedWriterOptions); if (createTransaction) { - WriteTransaction_ = MakeHolder<TPingableTransaction>( + WriteTransaction_ = std::make_unique<TPingableTransaction>( rawClient, clientRetryPolicy, context, @@ -337,7 +337,7 @@ TRetryfulWriterV2::TRetryfulWriterV2( .Header = std::move(httpHeader), }; - Sender_ = MakeHolder<TSender>(path, parameters); + Sender_ = std::make_unique<TSender>(path, parameters); DoStartBatch(); } diff --git a/yt/cpp/mapreduce/client/retryful_writer_v2.h b/yt/cpp/mapreduce/client/retryful_writer_v2.h index 661ef5d0b5..6d314307ec 100644 --- a/yt/cpp/mapreduce/client/retryful_writer_v2.h +++ b/yt/cpp/mapreduce/client/retryful_writer_v2.h @@ -49,11 +49,11 @@ private: const ssize_t BufferSize_; const ssize_t SendStep_ = 64_KB; ssize_t NextSizeToSend_; - THolder<TSender> Sender_; - THolder<TPingableTransaction> WriteTransaction_; + std::unique_ptr<TSender> Sender_; + std::unique_ptr<TPingableTransaction> WriteTransaction_; - THolder<TSendTask> Current_; - THolder<TSendTask> Previous_; + std::unique_ptr<TSendTask> Current_; + std::unique_ptr<TSendTask> Previous_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/retryless_writer.h b/yt/cpp/mapreduce/client/retryless_writer.h index 6916cddbf6..689c1c5f2f 100644 --- a/yt/cpp/mapreduce/client/retryless_writer.h +++ b/yt/cpp/mapreduce/client/retryless_writer.h @@ -56,7 +56,7 @@ public: auto hostName = GetProxyForHeavyRequest(context); UpdateHeaderForProxyIfNeed(hostName, context, header); Request_ = context.HttpClient->StartRequest(GetFullUrl(hostName, context, header), requestId, header); - BufferedOutput_.Reset(new TBufferedOutput(Request_->GetStream(), BufferSize_)); + BufferedOutput_ = std::make_unique<TBufferedOutput>(Request_->GetStream(), BufferSize_); } ~TRetrylessWriter() override; @@ -75,7 +75,7 @@ private: bool Running_ = true; NHttpClient::IHttpRequestPtr Request_; - THolder<TBufferedOutput> BufferedOutput_; + std::unique_ptr<TBufferedOutput> BufferedOutput_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/transaction_pinger.cpp b/yt/cpp/mapreduce/client/transaction_pinger.cpp index ea42867715..074249efce 100644 --- a/yt/cpp/mapreduce/client/transaction_pinger.cpp +++ b/yt/cpp/mapreduce/client/transaction_pinger.cpp @@ -228,7 +228,7 @@ public: PingableTx_ = &pingableTx; Running_ = true; - PingerThread_ = MakeHolder<TThread>( + PingerThread_ = std::make_unique<TThread>( TThread::TParams{Pinger, this}.SetName("pingable_tx")); PingerThread_->Start(); } @@ -284,7 +284,7 @@ private: const TPingableTransaction* PingableTx_ = nullptr; std::atomic<bool> Running_ = false; - THolder<TThread> PingerThread_; + std::unique_ptr<TThread> PingerThread_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index 51253feec2..765a96f042 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -545,7 +545,7 @@ TConnectionPtr TConnectionPool::Connect( TSocketHolder socket(DoConnect(networkAddress)); SetNonBlock(socket, false); - connection->Socket.Reset(new TSocket(socket.Release())); + connection->Socket = std::make_unique<TSocket>(socket.Release()); connection->DeadLine = TInstant::Now() + socketTimeout; connection->Socket->SetSocketTimeout(socketTimeout.Seconds()); @@ -754,7 +754,7 @@ private: THttpResponse::THttpResponse( TRequestContext context, IInputStream* socketStream) - : HttpInput_(MakeHolder<THttpInputWrapped>(context, socketStream)) + : HttpInput_(std::make_unique<THttpInputWrapped>(context, socketStream)) , Unframe_(HttpInput_->Headers().HasHeader("X-YT-Framing")) , Context_(std::move(context)) { @@ -935,7 +935,7 @@ bool THttpResponse::RefreshFrameIfNecessary() case EFrameType::KeepAlive: break; case EFrameType::Data: - RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.Get()); + RemainingFrameSize_ = ReadDataFrameSize(HttpInput_.get()); break; default: ythrow yexception() << "Bad frame type " << static_cast<int>(frameTypeByte); @@ -1027,10 +1027,10 @@ IOutputStream* THttpRequest::StartRequestImpl(bool includeParameters) LogResponse_ = true; } - RequestStream_ = MakeHolder<TRequestStream>(this, *Connection_->Socket.Get()); + RequestStream_ = std::make_unique<TRequestStream>(this, *Connection_->Socket.get()); RequestStream_->Write(strHeader.data(), strHeader.size()); - return RequestStream_.Get(); + return RequestStream_.get(); } IOutputStream* THttpRequest::StartRequest() @@ -1064,16 +1064,16 @@ void THttpRequest::SmallRequest(TMaybe<TStringBuf> body) THttpResponse* THttpRequest::GetResponseStream() { if (!Input_) { - SocketInput_.Reset(new TSocketInput(*Connection_->Socket.Get())); + SocketInput_ = std::make_unique<TSocketInput>(*Connection_->Socket.get()); if (TConfig::Get()->UseAbortableResponse) { Y_ABORT_UNLESS(!Url_.empty()); - Input_.Reset(new TAbortableHttpResponse(Context_, SocketInput_.Get(), Url_)); + Input_ = std::make_unique<TAbortableHttpResponse>(Context_, SocketInput_.get(), Url_); } else { - Input_.Reset(new THttpResponse(Context_, SocketInput_.Get())); + Input_ = std::make_unique<THttpResponse>(Context_, SocketInput_.get()); } Input_->CheckErrorResponse(); } - return Input_.Get(); + return Input_.get(); } TString THttpRequest::GetResponse() diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 0f5e9034ee..45cba99861 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -138,7 +138,7 @@ private: struct TConnection { - THolder<TSocket> Socket; + std::unique_ptr<TSocket> Socket; TAtomic Busy = 1; TInstant DeadLine; ui32 Id; @@ -206,7 +206,7 @@ private: class THttpInputWrapped; private: - THolder<THttpInputWrapped> HttpInput_; + std::unique_ptr<THttpInputWrapped> HttpInput_; const bool Unframe_; @@ -257,10 +257,10 @@ private: TConnectionPtr Connection_; - THolder<TRequestStream> RequestStream_; + std::unique_ptr<TRequestStream> RequestStream_; - THolder<TSocketInput> SocketInput_; - THolder<THttpResponse> Input_; + std::unique_ptr<TSocketInput> SocketInput_; + std::unique_ptr<THttpResponse> Input_; bool LogResponse_ = false; }; diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp index fa072675fb..a8f6856aee 100644 --- a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp @@ -32,10 +32,10 @@ namespace { } } // namespace -THolder<TSimpleServer> CreateSimpleHttpServer() +std::unique_ptr<TSimpleServer> CreateSimpleHttpServer() { auto port = NTesting::GetFreePort(); - return MakeHolder<TSimpleServer>( + return std::make_unique<TSimpleServer>( port, [] (IInputStream* input, IOutputStream* output) { try { @@ -57,10 +57,10 @@ THolder<TSimpleServer> CreateSimpleHttpServer() }); } -THolder<TSimpleServer> CreateProxyHttpServer() +std::unique_ptr<TSimpleServer> CreateProxyHttpServer() { auto port = NTesting::GetFreePort(); - return MakeHolder<TSimpleServer>( + return std::make_unique<TSimpleServer>( port, [] (IInputStream* input, IOutputStream* output) { try { @@ -182,9 +182,9 @@ TEST(TConnectionPool, TestConcurrency) } }; - TVector<THolder<TFuncThread>> threads; + TVector<std::unique_ptr<TFuncThread>> threads; for (int i = 0; i != 10; ++i) { - threads.push_back(MakeHolder<TFuncThread>(func)); + threads.push_back(std::make_unique<TFuncThread>(func)); }; for (auto& t : threads) { diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp index e41e83c5a0..2d382f6aa0 100644 --- a/yt/cpp/mapreduce/http/ut/http_ut.cpp +++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp @@ -21,10 +21,10 @@ void WriteDataFrame(TStringBuf string, IOutputStream* stream) stream->Write(string); } -THolder<TSimpleServer> CreateFramingEchoServer() +std::unique_ptr<TSimpleServer> CreateFramingEchoServer() { auto port = NTesting::GetFreePort(); - return MakeHolder<TSimpleServer>( + return std::make_unique<TSimpleServer>( port, [] (IInputStream* input, IOutputStream* output) { try { diff --git a/yt/cpp/mapreduce/http/ut/simple_server.cpp b/yt/cpp/mapreduce/http/ut/simple_server.cpp index fbc369ec20..111010a83d 100644 --- a/yt/cpp/mapreduce/http/ut/simple_server.cpp +++ b/yt/cpp/mapreduce/http/ut/simple_server.cpp @@ -23,9 +23,9 @@ TSimpleServer::TSimpleServer(int port, TRequestHandler requestHandler) ret = listenSocket->Listen(10); Y_ENSURE_EX(ret == 0, TSystemError() << "Can not listen socket"); - SendFinishSocket_ = MakeHolder<TInetStreamSocket>(socketPair[1]); + SendFinishSocket_ = std::make_unique<TInetStreamSocket>(socketPair[1]); - ThreadPool_ = MakeHolder<TAdaptiveThreadPool>(); + ThreadPool_ = std::make_unique<TAdaptiveThreadPool>(); ThreadPool_->Start(1); auto receiveFinish = MakeAtomicShared<TInetStreamSocket>(socketPair[0]); @@ -76,7 +76,7 @@ void TSimpleServer::Stop() SendFinishSocket_->Send("X", 1); ListenerThread_->Join(); ThreadPool_->Stop(); - ThreadPool_.Destroy(); + ThreadPool_.reset(); } int TSimpleServer::GetPort() const diff --git a/yt/cpp/mapreduce/http/ut/simple_server.h b/yt/cpp/mapreduce/http/ut/simple_server.h index f468ca55a6..f9f7287126 100644 --- a/yt/cpp/mapreduce/http/ut/simple_server.h +++ b/yt/cpp/mapreduce/http/ut/simple_server.h @@ -29,7 +29,7 @@ public: private: const int Port_; - THolder<IThreadPool> ThreadPool_; + std::unique_ptr<IThreadPool> ThreadPool_; THolder<IThreadFactory::IThread> ListenerThread_; - THolder<TInetStreamSocket> SendFinishSocket_; + std::unique_ptr<TInetStreamSocket> SendFinishSocket_; }; diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h index ffaf71f0a2..056910f785 100644 --- a/yt/cpp/mapreduce/interface/io-inl.h +++ b/yt/cpp/mapreduce/interface/io-inl.h @@ -350,11 +350,11 @@ public: return TBase::DoGetRowCached( /* cacher */ [&] { - CachedRow_.Reset(new U); - Reader_->ReadRow(CachedRow_.Get()); + CachedRow_.reset(new U); + Reader_->ReadRow(CachedRow_.get()); }, /* cacheGetter */ [&] { - auto result = dynamic_cast<const U*>(CachedRow_.Get()); + auto result = dynamic_cast<const U*>(CachedRow_.get()); Y_ABORT_UNLESS(result); return result; }); @@ -371,7 +371,7 @@ public: Reader_->ReadRow(result); }, /* cacheMover */ [&] (U* result) { - auto cast = dynamic_cast<U*>(CachedRow_.Get()); + auto cast = dynamic_cast<U*>(CachedRow_.get()); Y_ABORT_UNLESS(cast); result->Swap(cast); }); @@ -394,7 +394,7 @@ public: private: using TBase::Reader_; - mutable THolder<Message> CachedRow_; + mutable std::unique_ptr<Message> CachedRow_; }; template<class... TProtoRowTypes> diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 4994826863..ff2dafeb6d 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -139,6 +139,12 @@ public: // Operations + virtual TOperationId StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) = 0; + virtual TOperationAttributes GetOperation( const TOperationId& operationId, const TGetOperationOptions& options = {}) = 0; diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp index bc3da75ee6..558c42b30e 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.cpp +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -35,7 +35,7 @@ public: void Finalize(); private: - THolder<TNodeBuilder> Builder_; + std::unique_ptr<TNodeBuilder> Builder_; TRowElement Row_; int Depth_ = 0; bool Started_ = false; @@ -143,7 +143,7 @@ void TRowBuilder::SaveResultRow() *ResultRow_ = std::move(Row_); } Row_.Reset(); - Builder_.Reset(new TNodeBuilder(&Row_.Node)); + Builder_ = std::make_unique<TNodeBuilder>(&Row_.Node); } void TRowBuilder::Finalize() @@ -346,8 +346,8 @@ bool TNodeTableReader::IsRawReaderExhausted() const void TNodeTableReader::PrepareParsing() { NextRow_.Clear(); - Builder_.Reset(new TRowBuilder(&NextRow_)); - Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_)); + Builder_ = std::make_unique<TRowBuilder>(&NextRow_); + Parser_ = std::make_unique<::NYson::TYsonListParser>(Builder_.get(), &Input_); } void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error) diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h index 38cb440632..c8e319ce4f 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.h +++ b/yt/cpp/mapreduce/io/node_table_reader.h @@ -79,8 +79,8 @@ private: TMaybe<TRowElement> Row_; TMaybe<TRowElement> NextRow_; - THolder<TRowBuilder> Builder_; - THolder<::NYson::TYsonListParser> Parser_; + std::unique_ptr<TRowBuilder> Builder_; + std::unique_ptr<::NYson::TYsonListParser> Parser_; std::exception_ptr Exception_; bool NeedParseFirst_ = true; diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp index 916dec7ae4..c516c7b4ee 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.cpp +++ b/yt/cpp/mapreduce/io/node_table_writer.cpp @@ -13,11 +13,11 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format) - : Output_(std::move(output)) + : Output_(output.Release()) { for (size_t i = 0; i < Output_->GetStreamCount(); ++i) { Writers_.push_back( - MakeHolder<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment)); + std::make_unique<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment)); } } @@ -54,7 +54,7 @@ void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex) } } - auto* writer = Writers_[tableIndex].Get(); + auto* writer = Writers_[tableIndex].get(); writer->OnListItem(); TNodeVisitor visitor(writer); diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h index 155bec076d..0bc7a58557 100644 --- a/yt/cpp/mapreduce/io/node_table_writer.h +++ b/yt/cpp/mapreduce/io/node_table_writer.h @@ -25,8 +25,8 @@ public: void Abort() override; private: - THolder<IProxyOutput> Output_; - TVector<THolder<::NYson::TYsonWriter>> Writers_; + std::unique_ptr<IProxyOutput> Output_; + TVector<std::unique_ptr<::NYson::TYsonWriter>> Writers_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h index 8452545005..bfe4ac5647 100644 --- a/yt/cpp/mapreduce/io/proto_table_reader.h +++ b/yt/cpp/mapreduce/io/proto_table_reader.h @@ -33,7 +33,7 @@ public: bool IsRawReaderExhausted() const override; private: - THolder<TNodeTableReader> NodeReader_; + std::unique_ptr<TNodeTableReader> NodeReader_; TVector<const ::google::protobuf::Descriptor*> Descriptors_; }; diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp index f56784eb60..ff0dbddc71 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.cpp +++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp @@ -102,7 +102,7 @@ TNode MakeNodeFromMessage(const Message& row) TProtoTableWriter::TProtoTableWriter( THolder<IProxyOutput> output, TVector<const Descriptor*>&& descriptors) - : NodeWriter_(new TNodeTableWriter(std::move(output))) + : NodeWriter_(std::make_unique<TNodeTableWriter>(std::move(output))) , Descriptors_(std::move(descriptors)) { } @@ -145,7 +145,7 @@ void TProtoTableWriter::Abort() TLenvalProtoTableWriter::TLenvalProtoTableWriter( THolder<IProxyOutput> output, TVector<const Descriptor*>&& descriptors) - : Output_(std::move(output)) + : Output_(output.Release()) , Descriptors_(std::move(descriptors)) { } diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h index 336230f55f..8dae9aaf79 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.h +++ b/yt/cpp/mapreduce/io/proto_table_writer.h @@ -27,7 +27,7 @@ public: void Abort() override; private: - THolder<TNodeTableWriter> NodeWriter_; + std::unique_ptr<TNodeTableWriter> NodeWriter_; TVector<const ::google::protobuf::Descriptor*> Descriptors_; }; @@ -51,7 +51,7 @@ public: void Abort() override; protected: - THolder<IProxyOutput> Output_; + std::unique_ptr<IProxyOutput> Output_; TVector<const ::google::protobuf::Descriptor*> Descriptors_; }; diff --git a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp index fc20be017f..b45f47c64c 100644 --- a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp +++ b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp @@ -56,13 +56,13 @@ public: TTestRawTableReader(const TRowCollection& rowCollection) : RowCollection_(rowCollection) , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0)) - , Input_(MakeHolder<TStringStream>(DataToRead_)) + , Input_(std::make_unique<TStringStream>(DataToRead_)) { } TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint) : RowCollection_(rowCollection) , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint)) - , Input_(MakeHolder<TStringStream>(DataToRead_)) + , Input_(std::make_unique<TStringStream>(DataToRead_)) , Broken_(true) { } @@ -86,7 +86,7 @@ public: } ui64 actualRowIndex = rowIndex ? *rowIndex : 0; DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex); - Input_ = MakeHolder<TStringInput>(DataToRead_); + Input_ = std::make_unique<TStringInput>(DataToRead_); Broken_ = false; return true; } @@ -102,7 +102,7 @@ public: private: TRowCollection RowCollection_; TString DataToRead_; - THolder<IInputStream> Input_; + std::unique_ptr<IInputStream> Input_; bool Broken_ = false; i32 Retries = 1; }; diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp index fe31eb5543..dcfacb7541 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.cpp +++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp @@ -7,7 +7,7 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output) - : Output_(std::move(output)) + : Output_(output.Release()) { } TYaMRTableWriter::~TYaMRTableWriter() diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h index 7f72c8005a..f976a35832 100644 --- a/yt/cpp/mapreduce/io/yamr_table_writer.h +++ b/yt/cpp/mapreduce/io/yamr_table_writer.h @@ -24,7 +24,7 @@ public: void Abort() override; private: - THolder<IProxyOutput> Output_; + std::unique_ptr<IProxyOutput> Output_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp index 56ab88ee9d..7cc5a84e6a 100644 --- a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp +++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.cpp @@ -42,8 +42,8 @@ void TUserJobStatsProxy::Init(IOutputStream * usingStream) { if (usingStream == nullptr) { TFileHandle fixedDesrc(JobStatisticsHandle); - FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); - UsingStream = FetchedOut.Get(); + FetchedOut = std::make_unique<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); + UsingStream = FetchedOut.get(); fixedDesrc.Release(); } else { UsingStream = usingStream; @@ -55,8 +55,8 @@ void TUserJobStatsProxy::InitChecked(IOutputStream* def) { if (usingStream == nullptr && !GetEnv("YT_JOB_ID").empty()) { TFileHandle fixedDesrc(JobStatisticsHandle); - FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); - UsingStream = FetchedOut.Get(); + FetchedOut = std::make_unique<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate())); + UsingStream = FetchedOut.get(); fixedDesrc.Release(); } else { UsingStream = def; @@ -89,7 +89,7 @@ void TUserJobStatsProxy::CommitStats() { TTimeStatHolder TUserJobStatsProxy::TimerStart(TString name, bool commitOnFinish) { - return THolder(new TTimeStat(this, name, commitOnFinish)); + return std::unique_ptr<TTimeStat>(new TTimeStat(this, name, commitOnFinish)); } void TUserJobStatsProxy::WriteStat(TString name, i64 val) { diff --git a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h index 6939d20417..928c79864b 100644 --- a/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h +++ b/yt/cpp/mapreduce/library/user_job_statistics/user_job_statistics.h @@ -6,13 +6,13 @@ namespace NYtTools { class TTimeStat; - using TTimeStatHolder = THolder<TTimeStat>; + using TTimeStatHolder = std::unique_ptr<TTimeStat>; class TUserJobStatsProxy { public: static const FHANDLE JobStatisticsHandle; private: - THolder<IOutputStream> FetchedOut; + std::unique_ptr<IOutputStream> FetchedOut; IOutputStream* UsingStream = &Cerr; public: // TODO: add inheritance diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 65bfa01cea..2868e10f6e 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -289,6 +289,18 @@ void THttpRawClient::CommitTransaction( RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } +TOperationId THttpRawClient::StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) +{ + THttpHeader header("POST", "start_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec)); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); +} + TOperationAttributes THttpRawClient::GetOperation( const TOperationId& operationId, const TGetOperationOptions& options) diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index e540d1b331..1b3e274507 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -136,6 +136,12 @@ public: // Operations + TOperationId StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) override; + TOperationAttributes GetOperation( const TOperationId& operationId, const TGetOperationOptions& options = {}) override; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index 2869ddcc0f..98ef5ed099 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -393,7 +393,21 @@ TNode SerializeParamsForListOperations( return result; } -TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options) +TNode SerializeParamsForStartOperation( + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["operation_type"] = ToString(type); + result["spec"] = spec; + return result; +} + +TNode SerializeParamsForGetOperation( + const std::variant<TString, TOperationId>& aliasOrOperationId, + const TGetOperationOptions& options) { auto includeRuntime = options.IncludeRuntime_; TNode result; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index acbf003b5c..308dcfea64 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -1,8 +1,10 @@ #pragma once #include <yt/cpp/mapreduce/common/helpers.h> -#include <yt/cpp/mapreduce/interface/fwd.h> + #include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/operation.h> namespace NYT::NDetail::NRawClient { @@ -95,7 +97,14 @@ TNode SerializeParamsForConcatenate( TNode SerializeParamsForPingTx( const TTransactionId& transactionId); -TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options); +TNode SerializeParamsForStartOperation( + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec); + +TNode SerializeParamsForGetOperation( + const std::variant<TString, TOperationId>& aliasOrOperationId, + const TGetOperationOptions& options); TNode SerializeParamsForAbortOperation( const TOperationId& operationId); diff --git a/yt/yql/providers/yt/codec/ya.make b/yt/yql/providers/yt/codec/ya.make index 036561f455..0bbae8552b 100644 --- a/yt/yql/providers/yt/codec/ya.make +++ b/yt/yql/providers/yt/codec/ya.make @@ -18,6 +18,7 @@ PEERDIR( yt/cpp/mapreduce/interface yt/cpp/mapreduce/io contrib/libs/apache/arrow + yql/essentials/core yql/essentials/minikql yql/essentials/minikql/computation yql/essentials/public/udf diff --git a/yt/yql/providers/yt/codec/yt_codec.cpp b/yt/yql/providers/yt/codec/yt_codec.cpp index a4811f12b4..86381e1920 100644 --- a/yt/yql/providers/yt/codec/yt_codec.cpp +++ b/yt/yql/providers/yt/codec/yt_codec.cpp @@ -10,6 +10,7 @@ #include <yql/essentials/minikql/mkql_node_builder.h> #include <yql/essentials/minikql/mkql_string_util.h> #include <yql/essentials/utils/yql_panic.h> +#include <yql/essentials/core/yql_type_annotation.h> #include <library/cpp/yson/node/node_io.h> @@ -411,8 +412,9 @@ void TMkqlIOSpecs::InitInput(NCommon::TCodecContext& codecCtx, bool useCommonColumns = true; THashMap<TString, ui32> structColumns; if (columns.Defined()) { + TColumnOrder order(*columns); for (size_t i = 0; i < columns->size(); ++i) { - structColumns.insert({columns->at(i), (ui32)i}); + structColumns.insert({order.at(i).PhysicalName, (ui32)i}); } } else if (itemType && InputGroups.empty()) { diff --git a/yt/yql/providers/yt/common/yql_names.h b/yt/yql/providers/yt/common/yql_names.h index e8ca0c8222..4d35a04c15 100644 --- a/yt/yql/providers/yt/common/yql_names.h +++ b/yt/yql/providers/yt/common/yql_names.h @@ -22,6 +22,7 @@ const TStringBuf RowSpecAttrUseNativeYtTypes = "UseNativeYtTypes"; const TStringBuf RowSpecAttrNativeYtTypeFlags = "NativeYtTypeFlags"; const TStringBuf RowSpecAttrExplicitYson = "ExplicitYson"; const TStringBuf RowSpecAttrConstraints = "Constraints"; +const TStringBuf RowSpecAttrColumnOrder = "ColumnOrder"; const TStringBuf YqlReadUdfAttribute = "_yql_read_udf"; const TStringBuf YqlReadUdfTypeConfigAttribute = "_yql_read_udf_type_config"; diff --git a/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp b/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp index 494bce164e..6ad53ec6d9 100644 --- a/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp +++ b/yt/yql/providers/yt/gateway/lib/yt_helpers.cpp @@ -354,7 +354,7 @@ static bool IterateRows(NYT::ITransactionPtr tx, if (!YAMRED_DSV && exec.GetColumns()) { if (!specsCache.GetSpecs().Inputs[tableIndex]->OthersStructIndex) { - path.Columns(*exec.GetColumns()); + path.Columns(TColumnOrder(*exec.GetColumns()).GetPhysicalNames()); } } diff --git a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp index 10ee1b54ad..e23bc48c80 100644 --- a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp +++ b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.cpp @@ -161,6 +161,7 @@ bool TYqlRowSpecInfo::Parse(const NYT::TNode& rowSpecAttr, TExprContext& ctx, co if (!ParseType(rowSpecAttr, ctx, pos) || !ParseSort(rowSpecAttr, ctx, pos)) { return false; } + ParseColumnOrder(rowSpecAttr); ParseFlags(rowSpecAttr); ParseDefValues(rowSpecAttr); ParseConstraints(rowSpecAttr); @@ -279,6 +280,7 @@ bool TYqlRowSpecInfo::ParsePatched(const NYT::TNode& rowSpecAttr, const THashMap return false; } + ParseColumnOrder(rowSpecAttr); ParseFlags(rowSpecAttr); ParseDefValues(rowSpecAttr); ParseConstraints(rowSpecAttr); @@ -290,6 +292,7 @@ bool TYqlRowSpecInfo::ParseFull(const NYT::TNode& rowSpecAttr, const THashMap<TS if (!ParseType(rowSpecAttr, ctx, pos) || !ParseSort(rowSpecAttr, ctx, pos)) { return false; } + ParseColumnOrder(rowSpecAttr); ParseFlags(rowSpecAttr); ParseDefValues(rowSpecAttr); ParseConstraints(rowSpecAttr); @@ -400,15 +403,34 @@ bool TYqlRowSpecInfo::Parse(const THashMap<TString, TString>& attrs, TExprContex return Validate(ctx, pos); } +void TYqlRowSpecInfo::ParseColumnOrder(const NYT::TNode& rowSpecAttr) { + if (rowSpecAttr.HasKey(RowSpecAttrColumnOrder)) { + TColumnOrder columns; + auto columnOrderAttr = rowSpecAttr[RowSpecAttrColumnOrder]; + if (!columnOrderAttr.IsList()) { + YQL_LOG_CTX_THROW yexception() << "Row spec has invalid column order"; + } + for (const auto& name: columnOrderAttr.AsList()) { + if (!name.IsString()) { + YQL_LOG_CTX_THROW yexception() << "Row spec has invalid column order"; + } + columns.AddColumn(name.AsString()); + } + Columns = columns; + } +} + bool TYqlRowSpecInfo::ParseType(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos) { if (!rowSpecAttr.HasKey(RowSpecAttrType)) { YQL_LOG_CTX_THROW yexception() << "Row spec doesn't have mandatory Type attribute"; } TColumnOrder columns; + auto type = NCommon::ParseOrderAwareTypeFromYson(rowSpecAttr[RowSpecAttrType], columns, ctx, ctx.GetPosition(pos)); if (!type) { return false; } + if (type->GetKind() != ETypeAnnotationKind::Struct) { YQL_LOG_CTX_THROW yexception() << "Row spec defines not a struct type"; } @@ -694,6 +716,19 @@ bool TYqlRowSpecInfo::Validate(const TExprNode& node, TExprContext& ctx, const T return false; } type = rawType->Cast<TStructExprType>(); + } else if (name->Content() == RowSpecAttrColumnOrder) { + if (!EnsureTuple(*value, ctx)) { + return false; + } + TColumnOrder order; + for (const TExprNode::TPtr& item: value->Children()) { + if (!EnsureAtom(*item, ctx)) { + return false; + } + order.AddColumn(TString(item->Content())); + } + columnOrder = order; + } else if (name->Content() == RowSpecAttrSortedBy) { if (!EnsureTuple(*value, ctx)) { return false; @@ -783,6 +818,18 @@ bool TYqlRowSpecInfo::Validate(const TExprNode& node, TExprContext& ctx, const T << " option is mandatory for " << TYqlRowSpec::CallableName())); return false; } + if (columnOrder) { + if (columnOrder->Size() != type->GetSize()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Column order size " << columnOrder->Size() + << " != " << type->GetSize() << " (type size)")); + } + for (auto& [_, name]: *columnOrder) { + if (!type->FindItem(name)) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << "Column " << name.Quote() + << " from column order isn't present in type")); + } + } + } if (sortedBy.size() != sortDirectionsCount) { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << TString{RowSpecAttrSortDirections}.Quote() << " should have the same size as " << TString{RowSpecAttrSortedBy}.Quote())); @@ -911,6 +958,13 @@ void TYqlRowSpecInfo::Parse(NNodes::TExprBase node, bool withTypes) { } Type = node.Ref().GetTypeAnn()->Cast<TStructExprType>(); } + } else if (setting.Name().Value() == RowSpecAttrColumnOrder) { + auto& val = setting.Value().Cast().Ref(); + TColumnOrder order; + for (const TExprNode::TPtr& item: val.Children()) { + order.AddColumn(TString(item->Content())); + } + Columns = order; } else if (setting.Name().Value() == RowSpecAttrConstraints) { ConstraintsNode = NYT::NodeFromYsonString(setting.Value().Cast().Ref().Content()); } else if (setting.Name().Value() == RowSpecAttrSortedBy) { @@ -989,6 +1043,17 @@ void TYqlRowSpecInfo::SetColumnOrder(const TMaybe<TColumnOrder>& columns) { Columns = columns; } +void TYqlRowSpecInfo::FillColumnOrder(NYT::TNode& attrs) const { + if (!Columns || !Columns->HasDuplicates()) { + return; + } + NYT::TNode order = NYT::TNode::CreateList(); + for (const auto &name: Columns->GetLogicalNames()) { + order.Add(name); + } + attrs[RowSpecAttrColumnOrder] = order; +} + TString TYqlRowSpecInfo::ToYsonString() const { NYT::TNode attrs = NYT::TNode::CreateMap(); FillCodecNode(attrs[YqlRowSpecAttribute]); @@ -1023,14 +1088,14 @@ void TYqlRowSpecInfo::CopyTypeOrders(const NYT::TNode& typeNode) { if (!StrictSchema && name == YqlOthersColumnName) { continue; } - auto origType = Type->FindItemType(name); + auto origType = Type->FindItemType(gen_name); YQL_ENSURE(origType); auto origTypeNode = NCommon::TypeToYsonNode(origType); - auto it = fromMembers.find(name); + auto it = fromMembers.find(gen_name); if (it == fromMembers.end() || !NCommon::EqualsYsonTypesIgnoreStructOrder(origTypeNode, it->second)) { - members.Add(NYT::TNode::CreateList().Add(name).Add(origTypeNode)); + members.Add(NYT::TNode::CreateList().Add(gen_name).Add(origTypeNode)); } else { - members.Add(NYT::TNode::CreateList().Add(name).Add(it->second)); + members.Add(NYT::TNode::CreateList().Add(gen_name).Add(it->second)); } } @@ -1098,6 +1163,7 @@ void TYqlRowSpecInfo::FillSort(NYT::TNode& attrs, const NCommon::TStructMemberMa } } } + if (!curSortedBy->empty()) { attrs[RowSpecAttrUniqueKeys] = curUniqueKeys; } @@ -1191,6 +1257,7 @@ void TYqlRowSpecInfo::FillCodecNode(NYT::TNode& attrs, const NCommon::TStructMem FillDefValues(attrs, mapper); FillFlags(attrs); FillExplicitYson(attrs, mapper); + FillColumnOrder(attrs); } void TYqlRowSpecInfo::FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibility, bool useCompactForm) const { @@ -1230,6 +1297,7 @@ void TYqlRowSpecInfo::FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibili if (!useCompactForm || HasAuxColumns() || AnyOf(SortedBy, [&patchedFields](const auto& name) { return patchedFields.contains(name); } )) { FillSort(attrs); } + FillColumnOrder(attrs); FillDefValues(attrs); FillFlags(attrs); FillConstraints(attrs); @@ -1327,6 +1395,9 @@ NNodes::TExprBase TYqlRowSpecInfo::ToExprNode(TExprContext& ctx, const TPosition saveColumnList(RowSpecAttrSortMembers, SortMembers); saveColumnList(RowSpecAttrSortedBy, SortedBy); + if (Columns && Columns->HasDuplicates()) { + saveColumnList(RowSpecAttrColumnOrder, Columns->GetLogicalNames()); + } if (!SortedByTypes.empty()) { auto listBuilder = Build<TExprList>(ctx, pos); diff --git a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h index 049b67d2a6..3cbf5ba90a 100644 --- a/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h +++ b/yt/yql/providers/yt/lib/row_spec/yql_row_spec.h @@ -50,6 +50,7 @@ struct TYqlRowSpecInfo: public TThrRefBase { TString ToYsonString() const; void FillCodecNode(NYT::TNode& attrs, const NCommon::TStructMemberMapper& mapper = {}) const; void FillAttrNode(NYT::TNode& attrs, ui64 nativeTypeCompatibility, bool useCompactForm) const; + void FillColumnOrder(NYT::TNode& attr) const; NNodes::TExprBase ToExprNode(TExprContext& ctx, const TPositionHandle& pos) const; bool IsSorted() const { @@ -129,6 +130,7 @@ private: bool ParsePatched(const NYT::TNode& rowSpecAttr, const THashMap<TString, TString>& attrs, TExprContext& ctx, const TPositionHandle& pos); bool ParseFull(const NYT::TNode& rowSpecAttr, const THashMap<TString, TString>& attrs, TExprContext& ctx, const TPositionHandle& pos); bool ParseType(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos); + void ParseColumnOrder(const NYT::TNode& rowSpecAttr); bool ParseSort(const NYT::TNode& rowSpecAttr, TExprContext& ctx, const TPositionHandle& pos); void ParseFlags(const NYT::TNode& rowSpecAttr); void ParseConstraints(const NYT::TNode& rowSpecAttr); diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index b8cfc3c087..b8891b1208 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -825,7 +825,7 @@ public: YQL_CLOG(INFO, ProviderYt) << "DQ annotate: adding yt.write=" << param; } - bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) override { + bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams, const TMaybe<TColumnOrder>& order) override { const auto resOrPull = TResOrPullBase(&root); if (FromString<bool>(resOrPull.Discard().Value())) { @@ -853,8 +853,9 @@ public: } const auto type = GetSequenceItemType(input->Pos(), input->GetTypeAnn(), false, ctx); + YQL_ENSURE(type); - TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE); + TYtOutTableInfo outTableInfo(type->Cast<TStructExprType>(), State_->Configuration->UseNativeYtTypes.Get().GetOrElse(DEFAULT_USE_NATIVE_YT_TYPES) ? NTCF_ALL : NTCF_NONE, order); const auto res = State_->Gateway->PrepareFullResultTable( IYtGateway::TFullResultTableOptions(State_->SessionId) diff --git a/yt/yql/providers/yt/provider/yql_yt_table.cpp b/yt/yql/providers/yt/provider/yql_yt_table.cpp index d793aee0a1..8be885b84c 100644 --- a/yt/yql/providers/yt/provider/yql_yt_table.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_table.cpp @@ -900,9 +900,10 @@ bool TYtTableInfo::HasSubstAnonymousLabel(NNodes::TExprBase node) { ///////////////////////////////////////////////////////////////////////////////////////////////////////// -TYtOutTableInfo::TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags) { +TYtOutTableInfo::TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags, const TMaybe<TColumnOrder>& columnOrder) { RowSpec = MakeIntrusive<TYqlRowSpecInfo>(); RowSpec->SetType(type, nativeYtTypeFlags); + RowSpec->SetColumnOrder(columnOrder); Meta = MakeIntrusive<TYtTableMetaInfo>(); Meta->CanWrite = true; diff --git a/yt/yql/providers/yt/provider/yql_yt_table.h b/yt/yql/providers/yt/provider/yql_yt_table.h index 9675ce7c74..bae060b89d 100644 --- a/yt/yql/providers/yt/provider/yql_yt_table.h +++ b/yt/yql/providers/yt/provider/yql_yt_table.h @@ -157,7 +157,7 @@ struct TYtOutTableInfo: public TYtTableBaseInfo { TYtOutTableInfo() { IsTemp = true; } - TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags); + TYtOutTableInfo(const TStructExprType* type, ui64 nativeYtTypeFlags, const TMaybe<TColumnOrder>& columnOrder = {}); TYtOutTableInfo(NNodes::TExprBase node) { Parse(node); IsTemp = true; diff --git a/yt/yt/core/concurrency/invoker_alarm.h b/yt/yt/core/concurrency/invoker_alarm.h index fa51d4e89f..690ef0431b 100644 --- a/yt/yt/core/concurrency/invoker_alarm.h +++ b/yt/yt/core/concurrency/invoker_alarm.h @@ -23,7 +23,7 @@ namespace NYT::NConcurrency { * if so, the callback is invoked synchronously (and its scheduled invocation becomes a no-op). * * \note - * Thread-affininty: single-threaded (moreover, all methods must be called within the invoker). + * Thread-affinity: single-threaded (moreover, all methods must be called within the invoker). */ class TInvokerAlarm : public TRefCounted diff --git a/yt/yt/core/misc/concurrent_cache-inl.h b/yt/yt/core/misc/concurrent_cache-inl.h index 614b5ef329..9419262ce4 100644 --- a/yt/yt/core/misc/concurrent_cache-inl.h +++ b/yt/yt/core/misc/concurrent_cache-inl.h @@ -17,12 +17,14 @@ struct TConcurrentCache<T>::TLookupTable final static constexpr bool EnableHazard = true; const size_t Capacity; + const TMemoryUsageTrackerGuard MemoryUsageGuard; std::atomic<size_t> Size = 0; TAtomicPtr<TLookupTable> Next; - explicit TLookupTable(size_t capacity) + TLookupTable(size_t capacity, IMemoryUsageTrackerPtr memoryUsageTracker) : THashTable(capacity) , Capacity(capacity) + , MemoryUsageGuard(TMemoryUsageTrackerGuard::Acquire(std::move(memoryUsageTracker), THashTable::GetByteSize())) { } typename THashTable::TItemRef Insert(TValuePtr item) @@ -46,7 +48,7 @@ TConcurrentCache<T>::RenewTable(const TIntrusivePtr<TLookupTable>& head, size_t } // Rotate lookup table. - auto newHead = New<TLookupTable>(capacity); + auto newHead = New<TLookupTable>(capacity, MemoryUsageTracker_); newHead->Next = head; if (Head_.SwapIfCompare(head, newHead)) { @@ -63,9 +65,10 @@ TConcurrentCache<T>::RenewTable(const TIntrusivePtr<TLookupTable>& head, size_t } template <class T> -TConcurrentCache<T>::TConcurrentCache(size_t capacity) +TConcurrentCache<T>::TConcurrentCache(size_t capacity, IMemoryUsageTrackerPtr tracker) : Capacity_(capacity) - , Head_(New<TLookupTable>(capacity)) + , Head_(New<TLookupTable>(capacity, tracker)) + , MemoryUsageTracker_(tracker) { YT_VERIFY(capacity > 0); } diff --git a/yt/yt/core/misc/concurrent_cache.h b/yt/yt/core/misc/concurrent_cache.h index 7f0f52054e..ae5672b967 100644 --- a/yt/yt/core/misc/concurrent_cache.h +++ b/yt/yt/core/misc/concurrent_cache.h @@ -3,6 +3,7 @@ #include "public.h" #include "atomic_ptr.h" #include "lock_free_hash_table.h" +#include "memory_usage_tracker.h" namespace NYT { @@ -21,7 +22,7 @@ private: public: using TValuePtr = TIntrusivePtr<T>; - explicit TConcurrentCache(size_t maxElementCount); + explicit TConcurrentCache(size_t maxElementCount, IMemoryUsageTrackerPtr tracker = nullptr); ~TConcurrentCache(); @@ -96,6 +97,7 @@ public: private: std::atomic<size_t> Capacity_; TAtomicPtr<TLookupTable> Head_; + IMemoryUsageTrackerPtr MemoryUsageTracker_; }; diff --git a/yt/yt/core/misc/configurable_singleton_def.cpp b/yt/yt/core/misc/configurable_singleton_def.cpp index acad95481d..515a107d14 100644 --- a/yt/yt/core/misc/configurable_singleton_def.cpp +++ b/yt/yt/core/misc/configurable_singleton_def.cpp @@ -37,10 +37,10 @@ public: THROW_ERROR_EXCEPTION("Singletons have already been configured"); } - Config_ = config; + Config_ = CloneYsonStruct(config); for (const auto& [name, traits] : Singletons()) { - const auto& field = GetOrCrash(config->NameToConfig_, name); + const auto& field = GetOrCrash(Config_->NameToConfig_, name); traits.Configure(field); } } @@ -53,15 +53,29 @@ public: THROW_ERROR_EXCEPTION("Singletons are not configured yet"); } + DynamicConfig_ = CloneYsonStruct(dynamicConfig); + for (const auto& [name, traits] : Singletons()) { if (const auto& reconfigure = traits.Reconfigure) { const auto& singletonConfig = GetOrCrash(Config_->NameToConfig_, name); - const auto& singletonDynamicConfig = GetOrCrash(dynamicConfig->NameToConfig_, name); + const auto& singletonDynamicConfig = GetOrCrash(DynamicConfig_->NameToConfig_, name); reconfigure(singletonConfig, singletonDynamicConfig); } } } + TSingletonsConfigPtr GetConfig() + { + auto guard = Guard(ConfigureLock_); + return Config_; + } + + TSingletonsDynamicConfigPtr GetDynamicConfig() + { + auto guard = Guard(ConfigureLock_); + return DynamicConfig_; + } + using TSingletonMap = THashMap<std::string, TSingletonTraits>; const TSingletonMap& Singletons() const @@ -79,6 +93,7 @@ private: NThreading::TSpinLock ConfigureLock_; TSingletonsConfigPtr Config_; + TSingletonsDynamicConfigPtr DynamicConfig_; bool Configured_ = false; }; @@ -128,6 +143,16 @@ void TSingletonManager::Reconfigure(const TSingletonsDynamicConfigPtr& dynamicCo NDetail::TSingletonManagerImpl::Get()->Reconfigure(dynamicConfig); } +TSingletonsConfigPtr TSingletonManager::GetConfig() +{ + return NDetail::TSingletonManagerImpl::Get()->GetConfig(); +} + +TSingletonsDynamicConfigPtr TSingletonManager::GetDynamicConfig() +{ + return NDetail::TSingletonManagerImpl::Get()->GetDynamicConfig(); +} + //////////////////////////////////////////////////////////////////////////////// void TSingletonsConfig::Register(TRegistrar registrar) diff --git a/yt/yt/core/misc/configurable_singleton_def.h b/yt/yt/core/misc/configurable_singleton_def.h index 684d50e314..89ed29fcd8 100644 --- a/yt/yt/core/misc/configurable_singleton_def.h +++ b/yt/yt/core/misc/configurable_singleton_def.h @@ -82,6 +82,9 @@ class TSingletonManager public: static void Configure(const TSingletonsConfigPtr& config); static void Reconfigure(const TSingletonsDynamicConfigPtr& dynamicConfig); + + static TSingletonsConfigPtr GetConfig(); + static TSingletonsDynamicConfigPtr GetDynamicConfig(); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h index 694e99cd4e..e4e5dcf1d0 100644 --- a/yt/yt/core/tracing/trace_context.h +++ b/yt/yt/core/tracing/trace_context.h @@ -70,7 +70,7 @@ DEFINE_ENUM(ETraceContextState, * * By default, child objects inherit TraceId, RequestId and LoggingTag from the parent. * - * \note Thread affininty: any unless noted otherwise. + * \note Thread affinity: any unless noted otherwise. */ class TTraceContext : public TRefCounted |