#include "raw_client.h"

#include "raw_requests.h"
#include "rpc_parameters_serialization.h"

// NOTE(review): the targets of the #include directives below are missing in this copy of the file,
// and so are the template argument lists further down (e.g. TVector, THashMap, MakeIntrusive,
// std::unique_ptr). They are kept exactly as found - restore them from version control.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace NYT::NDetail {

////////////////////////////////////////////////////////////////////////////////

namespace {

// Inspects the "X-YT-Error" response header. If it is present and describes a non-OK error,
// the error is logged (at INFO level when TExpectedErrorGuard reports it as expected,
// at ERROR level otherwise) and thrown as TErrorResponse. Returns normally otherwise.
void CheckError(const TString& requestId, NHttp::IResponsePtr response)
{
    if (const auto* ytError = response->GetHeaders()->Find("X-YT-Error")) {
        TYtError error;
        error.ParseFrom(*ytError);

        TErrorResponse errorResponse(std::move(error), requestId);
        if (errorResponse.IsOk()) {
            return;
        }

        if (TExpectedErrorGuard::IsErrorExpected(errorResponse)) {
            YT_LOG_INFO("Received expected error, RSP %v - HTTP %v - %v",
                requestId,
                response->GetStatusCode(),
                errorResponse.AsStrBuf());
        } else {
            YT_LOG_ERROR("RSP %v - HTTP %v - %v",
                requestId,
                response->GetStatusCode(),
                errorResponse.AsStrBuf());
        }

        ythrow errorResponse;
    }
}

// Writes "mutation_id" and "retry" into |params|.
// An empty |mutationId| is replaced with a freshly generated one and "retry" is set to false;
// a non-empty |mutationId| is reused and "retry" is set to true.
void SetMutationId(TNode& params, TMutationId& mutationId)
{
    if (mutationId.IsEmpty()) {
        params["retry"] = false;
        mutationId = GenerateMutationId();
    } else {
        params["retry"] = true;
    }

    params["mutation_id"] = GetGuidAsString(mutationId);
}

} // anonymous namespace

////////////////////////////////////////////////////////////////////////////////

THttpRawClient::THttpRawClient(const TClientContext& context)
    : Context_(context)
{ }

// Runs the "get" command and returns the response parsed from YSON.
TNode THttpRawClient::Get(
    const TTransactionId& transactionId,
    const TYPath& path,
    const TGetOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get");
    header.MergeParameters(NRawClient::SerializeParamsForGet(transactionId, Context_.Config->Prefix, path, options));
    return NodeFromYsonString(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Same as Get(), but a TErrorResponse that is a resolve error yields a default-constructed node.
// Any other TErrorResponse is rethrown.
TNode THttpRawClient::TryGet(
    const TTransactionId& transactionId,
    const TYPath& path,
    const TGetOptions& options)
{
    try {
        return Get(transactionId, path, options);
    } catch (const TErrorResponse& error) {
        if (!error.IsResolveError()) {
            throw;
        }
        return {};
    }
}

// Runs the "set" command; |value| is sent as the YSON request body.
void THttpRawClient::Set(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& path,
    const TNode& value,
    const TSetOptions& options)
{
    THttpHeader header("PUT", "set");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options));
    auto body = NodeToYsonString(value);
    RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
}

// Runs the "exists" command and returns the boolean parsed from the response.
bool THttpRawClient::Exists(
    const TTransactionId& transactionId,
    const TYPath& path,
    const TExistsOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "exists");
    header.MergeParameters(NRawClient::SerializeParamsForExists(transactionId, Context_.Config->Prefix, path, options));
    return ParseBoolFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs "api/v4/multiset_attributes"; |value| is sent as the YSON request body.
void THttpRawClient::MultisetAttributes(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& path,
    const TNode::TMapType& value,
    const TMultisetAttributesOptions& options)
{
    THttpHeader header("PUT", "api/v4/multiset_attributes", false);
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForMultisetAttributes(transactionId, Context_.Config->Prefix, path, options));
    auto body = NodeToYsonString(value);
    RequestWithoutRetry(Context_, mutationId, header, body)->GetResponse();
}

// Runs the "create" command and returns the id parsed from the response.
TNodeId THttpRawClient::Create(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& path,
    const ENodeType& type,
    const TCreateOptions& options)
{
    THttpHeader header("POST", "create");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForCreate(transactionId, Context_.Config->Prefix, path, type, options));
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs the "copy" command with a local (initially empty) mutation id
// and returns the id parsed from the response.
TNodeId THttpRawClient::CopyWithoutRetries(
    const TTransactionId& transactionId,
    const TYPath& sourcePath,
    const TYPath& destinationPath,
    const TCopyOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("POST", "copy");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs the "copy" command with "enable_cross_cell_copying" forced to false
// and returns the id parsed from the response.
TNodeId THttpRawClient::CopyInsideMasterCell(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& sourcePath,
    const TYPath& destinationPath,
    const TCopyOptions& options)
{
    THttpHeader header("POST", "copy");
    header.AddMutationId();
    auto params = NRawClient::SerializeParamsForCopy(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);

    // Disable cross-cell copying.
    params["enable_cross_cell_copying"] = false;
    header.MergeParameters(params);
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs the "move" command with a local (initially empty) mutation id
// and returns the id parsed from the response.
TNodeId THttpRawClient::MoveWithoutRetries(
    const TTransactionId& transactionId,
    const TYPath& sourcePath,
    const TYPath& destinationPath,
    const TMoveOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("POST", "move");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options));
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs the "move" command with "enable_cross_cell_copying" forced to false
// and returns the id parsed from the response.
TNodeId THttpRawClient::MoveInsideMasterCell(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& sourcePath,
    const TYPath& destinationPath,
    const TMoveOptions& options)
{
    THttpHeader header("POST", "move");
    header.AddMutationId();
    auto params = NRawClient::SerializeParamsForMove(transactionId, Context_.Config->Prefix, sourcePath, destinationPath, options);

    // Disable cross-cell copying.
    params["enable_cross_cell_copying"] = false;
    header.MergeParameters(params);
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs the "remove" command.
void THttpRawClient::Remove(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& path,
    const TRemoveOptions& options)
{
    THttpHeader header("POST", "remove");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForRemove(transactionId, Context_.Config->Prefix, path, options));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Runs the "list" command and returns the response parsed as a YSON list.
TNode::TListType THttpRawClient::List(
    const TTransactionId& transactionId,
    const TYPath& path,
    const TListOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "list");

    TYPath updatedPath = AddPathPrefix(path, Context_.Config->Prefix);
    // When |path| is empty the listed path is the prefix itself; drop its trailing slash:
    // Translate "//" to "/"
    // Translate "//some/custom/prefix/from/config/" to "//some/custom/prefix/from/config"
    if (path.empty() && updatedPath.EndsWith('/')) {
        updatedPath.pop_back();
    }
    header.MergeParameters(NRawClient::SerializeParamsForList(transactionId, Context_.Config->Prefix, updatedPath, options));
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
    return NodeFromYsonString(responseInfo->GetResponse()).AsList();
}

// Runs the "link" command and returns the id parsed from the response.
TNodeId THttpRawClient::Link(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& targetPath,
    const TYPath& linkPath,
    const TLinkOptions& options)
{
    THttpHeader header("POST", "link");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForLink(transactionId, Context_.Config->Prefix, targetPath, linkPath, options));
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs the "lock" command and returns the id parsed from the response.
TLockId THttpRawClient::Lock(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& path,
    ELockMode mode,
    const TLockOptions& options)
{
    THttpHeader header("POST", "lock");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForLock(transactionId, Context_.Config->Prefix, path, mode, options));
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs the "unlock" command.
void THttpRawClient::Unlock(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    const TYPath& path,
    const TUnlockOptions& options)
{
    THttpHeader header("POST", "unlock");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForUnlock(transactionId, Context_.Config->Prefix, path, options));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Runs the "concatenate" command as a heavy request with an empty body.
void THttpRawClient::Concatenate(
    const TTransactionId& transactionId,
    const TVector& sourcePaths,
    const TRichYPath& destinationPath,
    const TConcatenateOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("POST", "concatenate");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForConcatenate(transactionId, Context_.Config->Prefix, sourcePaths, destinationPath, options));

    TRequestConfig config;
    config.IsHeavy = true;
    RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse();
}

// Runs the "start_tx" command (timeout taken from Config->TxTimeout)
// and returns the id parsed from the response.
TTransactionId THttpRawClient::StartTransaction(
    TMutationId& mutationId,
    const TTransactionId& parentTransactionId,
    const TStartTransactionOptions& options)
{
    THttpHeader header("POST", "start_tx");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForStartTransaction(parentTransactionId, Context_.Config->TxTimeout, options));
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Sends "ping_tx" for |transactionId| through PostAsync.
void THttpRawClient::PingTransaction(const TTransactionId& transactionId)
{
    TNode node;
    node["transaction_id"] = GetGuidAsString(transactionId);
    PostAsync("ping_tx", node);
}

// Sends "abort_tx" through PostAsync; the mutation id is carried inside the parameters.
void THttpRawClient::AbortTransaction(
    TMutationId& mutationId,
    const TTransactionId& transactionId)
{
    TNode params = NRawClient::SerializeParamsForAbortTransaction(transactionId);
    SetMutationId(params, mutationId);
    PostAsync("abort_tx", params);
}

// Runs the "commit_tx" command.
void THttpRawClient::CommitTransaction(
    TMutationId& mutationId,
    const TTransactionId& transactionId)
{
    THttpHeader header("POST", "commit_tx");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForCommitTransaction(transactionId));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Runs the "start_op" command and returns the id parsed from the response.
TOperationId THttpRawClient::StartOperation(
    TMutationId& mutationId,
    const TTransactionId& transactionId,
    EOperationType type,
    const TNode& spec)
{
    THttpHeader header("POST", "start_op");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
    return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
}

// Runs "get_operation" for an operation id and parses the returned attributes.
TOperationAttributes THttpRawClient::GetOperation(
    const TOperationId& operationId,
    const TGetOperationOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get_operation");
    header.MergeParameters(NRawClient::SerializeParamsForGetOperation(operationId, options));
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
    return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
}

// Runs "get_operation" for an operation alias and parses the returned attributes.
TOperationAttributes THttpRawClient::GetOperation(
    const TString& alias,
    const TGetOperationOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get_operation");
    header.MergeParameters(NRawClient::SerializeParamsForGetOperation(alias, options));
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
    return NRawClient::ParseOperationAttributes(NodeFromYsonString(responseInfo->GetResponse()));
}

// Runs the "abort_op" command.
void THttpRawClient::AbortOperation(
    TMutationId& mutationId,
    const TOperationId& operationId)
{
    THttpHeader header("POST", "abort_op");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForAbortOperation(operationId));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Runs the "complete_op" command.
void THttpRawClient::CompleteOperation(
    TMutationId& mutationId,
    const TOperationId& operationId)
{
    THttpHeader header("POST", "complete_op");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForCompleteOperation(operationId));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Runs the "suspend_op" command.
void THttpRawClient::SuspendOperation(
    TMutationId& mutationId,
    const TOperationId& operationId,
    const TSuspendOperationOptions& options)
{
    THttpHeader header("POST", "suspend_op");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForSuspendOperation(operationId, options));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Runs the "resume_op" command.
void THttpRawClient::ResumeOperation(
    TMutationId& mutationId,
    const TOperationId& operationId,
    const TResumeOperationOptions& options)
{
    THttpHeader header("POST", "resume_op");
    header.AddMutationId();
    header.MergeParameters(NRawClient::SerializeParamsForResumeOperation(operationId, options));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Converts a map node into a hash map: each key goes through FromString,
// each value is read as a 64-bit integer.
template
static THashMap GetCounts(const TNode& countsNode)
{
    THashMap counts;
    for (const auto& entry : countsNode.AsMap()) {
        counts.emplace(FromString(entry.first), entry.second.AsInt64());
    }
    return counts;
}

// Runs "list_operations" and converts the response into TListOperationsResult.
// The "*_counts" and "failed_jobs_count" fields are filled only when present in the response;
// "operations" and "incomplete" are read unconditionally.
TListOperationsResult THttpRawClient::ListOperations(const TListOperationsOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "list_operations");
    header.MergeParameters(NRawClient::SerializeParamsForListOperations(options));
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
    auto resultNode = NodeFromYsonString(responseInfo->GetResponse());

    const auto& operationNodesList = resultNode["operations"].AsList();

    TListOperationsResult result;
    result.Operations.reserve(operationNodesList.size());
    for (const auto& operationNode : operationNodesList) {
        result.Operations.push_back(NRawClient::ParseOperationAttributes(operationNode));
    }

    if (resultNode.HasKey("pool_counts")) {
        result.PoolCounts = GetCounts(resultNode["pool_counts"]);
    }
    if (resultNode.HasKey("user_counts")) {
        result.UserCounts = GetCounts(resultNode["user_counts"]);
    }
    if (resultNode.HasKey("type_counts")) {
        result.TypeCounts = GetCounts(resultNode["type_counts"]);
    }
    if (resultNode.HasKey("state_counts")) {
        result.StateCounts = GetCounts(resultNode["state_counts"]);
    }
    if (resultNode.HasKey("failed_jobs_count")) {
        result.WithFailedJobsCount = resultNode["failed_jobs_count"].AsInt64();
    }

    result.Incomplete = resultNode["incomplete"].AsBool();

    return result;
}

// Runs the "update_op_parameters" command.
void THttpRawClient::UpdateOperationParameters(
    const TOperationId& operationId,
    const TUpdateOperationParametersOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("POST", "update_op_parameters");
    header.MergeParameters(NRawClient::SerializeParamsForUpdateOperationParameters(operationId, options));
    RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}

// Runs "get_job" and returns the raw response wrapped into a YSON string.
NYson::TYsonString THttpRawClient::GetJob(
    const TOperationId& operationId,
    const TJobId& jobId,
    const TGetJobOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get_job");
    header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
    return NYson::TYsonString(responseInfo->GetResponse());
}

// Runs "list_jobs" and converts the response into TListJobsResult.
// Each "*_job_count" field is filled only when its key is present and not null.
TListJobsResult THttpRawClient::ListJobs(
    const TOperationId& operationId,
    const TListJobsOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "list_jobs");
    header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
    auto resultNode = NodeFromYsonString(responseInfo->GetResponse());

    const auto& jobNodesList = resultNode["jobs"].AsList();

    TListJobsResult result;
    result.Jobs.reserve(jobNodesList.size());
    for (const auto& jobNode : jobNodesList) {
        result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode));
    }

    if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
        result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
    }
    if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
        // Read the very key that was checked above; reading "scheduler_job_count" here
        // accessed a key whose presence had never been verified.
        result.ControllerAgentJobCount = resultNode["controller_agent_job_count"].AsInt64();
    }
    if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
        result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
    }

    return result;
}

// Runs "get_job_input" as a heavy request and returns a reader over the response.
IFileReaderPtr THttpRawClient::GetJobInput(
    const TJobId& jobId,
    const TGetJobInputOptions& /*options*/)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get_job_input");
    header.AddParameter("job_id", GetGuidAsString(jobId));

    TRequestConfig config;
    config.IsHeavy = true;
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
    return MakeIntrusive(std::move(responseInfo));
}

// Runs "get_job_fail_context" as a heavy request and returns a reader over the response.
IFileReaderPtr THttpRawClient::GetJobFailContext(
    const TOperationId& operationId,
    const TJobId& jobId,
    const TGetJobFailContextOptions& /*options*/)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get_job_fail_context");
    header.AddOperationId(operationId);
    header.AddParameter("job_id", GetGuidAsString(jobId));

    TRequestConfig config;
    config.IsHeavy = true;
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
    return MakeIntrusive(std::move(responseInfo));
}

// Runs "get_job_stderr" as a heavy request and returns a reader over the response.
IFileReaderPtr THttpRawClient::GetJobStderr(
    const TOperationId& operationId,
    const TJobId& jobId,
    const TGetJobStderrOptions& /*options*/)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get_job_stderr");
    header.AddOperationId(operationId);
    header.AddParameter("job_id", GetGuidAsString(jobId));

    TRequestConfig config;
    config.IsHeavy = true;
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
    return MakeIntrusive(std::move(responseInfo));
}

// Builds a TJobTraceEvent from a map node. Every field is optional:
// only the keys present in the map are copied into the result.
TJobTraceEvent ParseJobTraceEvent(const TNode& node)
{
    const auto& mapNode = node.AsMap();
    TJobTraceEvent result;

    if (auto idNode = mapNode.FindPtr("operation_id")) {
        result.OperationId = GetGuid(idNode->AsString());
    }
    if (auto idNode = mapNode.FindPtr("job_id")) {
        result.JobId = GetGuid(idNode->AsString());
    }
    if (auto idNode = mapNode.FindPtr("trace_id")) {
        result.TraceId = GetGuid(idNode->AsString());
    }
    if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
        result.EventIndex = eventIndexNode->AsInt64();
    }
    if (auto eventNode = mapNode.FindPtr("event")) {
        result.Event = eventNode->AsString();
    }
    if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
        result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());
    }

    return result;
}

// Runs "get_job_trace" as a heavy request and returns a reader over the response.
IFileReaderPtr THttpRawClient::GetJobTrace(
    const TOperationId& operationId,
    const TJobId& jobId,
    const TGetJobTraceOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", "get_job_trace");
    header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, jobId, options));

    TRequestConfig config;
    config.IsHeavy = true;
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
    return MakeIntrusive(std::move(responseInfo));
}

// Runs the read-file command selected by the configured API version as a heavy request
// and returns a stream over the response.
std::unique_ptr THttpRawClient::ReadFile(
    const TTransactionId& transactionId,
    const TRichYPath& path,
    const TFileReaderOptions& options)
{
    TMutationId mutationId;
    THttpHeader header("GET", GetReadFileCommand(Context_.Config->ApiVersion));
    header.SetOutputFormat(TMaybe()); // Binary format
    header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
    header.MergeParameters(NRawClient::SerializeParamsForReadFile(transactionId, options));
    header.MergeParameters(FormIORequestParameters(path, options));

    TRequestConfig config;
    config.IsHeavy = true;
    auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
    return std::make_unique(std::move(responseInfo));
}

TMaybe THttpRawClient::GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, const TYPath& cachePath, const TGetFileFromCacheOptions& options) { TMutationId mutationId; THttpHeader header("GET", "get_file_from_cache"); header.MergeParameters(NRawClient::SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); auto resultNode = NodeFromYsonString(responseInfo->GetResponse()).AsString(); return resultNode.empty() ? Nothing() : TMaybe(resultNode); } TYPath THttpRawClient::PutFileToCache( const TTransactionId& transactionId, const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, const TPutFileToCacheOptions& options) { TMutationId mutationId; THttpHeader header("POST", "put_file_to_cache"); header.MergeParameters(NRawClient::SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); return NodeFromYsonString(responseInfo->GetResponse()).AsString(); } void THttpRawClient::MountTable( TMutationId& mutationId, const TYPath& path, const TMountTableOptions& options) { THttpHeader header("POST", "mount_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); if (options.CellId_) { header.AddParameter("cell_id", GetGuidAsString(*options.CellId_)); } header.AddParameter("freeze", options.Freeze_); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::UnmountTable( TMutationId& mutationId, const TYPath& path, const TUnmountTableOptions& options) { THttpHeader header("POST", "unmount_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); header.AddParameter("force", options.Force_); RequestWithoutRetry(Context_, 
mutationId, header)->GetResponse(); } void THttpRawClient::RemountTable( TMutationId& mutationId, const TYPath& path, const TRemountTableOptions& options) { THttpHeader header("POST", "remount_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::ReshardTableByPivotKeys( TMutationId& mutationId, const TYPath& path, const TVector& keys, const TReshardTableOptions& options) { THttpHeader header("POST", "reshard_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); header.AddParameter("pivot_keys", BuildYsonNodeFluently().List(keys)); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::ReshardTableByTabletCount( TMutationId& mutationId, const TYPath& path, i64 tabletCount, const TReshardTableOptions& options) { THttpHeader header("POST", "reshard_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeTabletParams(Context_.Config->Prefix, path, options)); header.AddParameter("tablet_count", tabletCount); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::InsertRows( const TYPath& path, const TNode::TListType& rows, const TInsertRowsOptions& options) { TMutationId mutationId; THttpHeader header("PUT", "insert_rows"); header.SetInputFormat(TFormat::YsonBinary()); header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options)); auto body = NodeListToYsonString(rows); TRequestConfig config; config.IsHeavy = true; RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse(); } void THttpRawClient::TrimRows( const TYPath& path, i64 tabletIndex, i64 rowCount, const TTrimRowsOptions& options) { TMutationId mutationId; THttpHeader header("POST", "trim_rows"); 
header.AddParameter("trimmed_row_count", rowCount); header.AddParameter("tablet_index", tabletIndex); header.MergeParameters(NRawClient::SerializeParametersForTrimRows(Context_.Config->Prefix, path, options)); TRequestConfig config; config.IsHeavy = true; RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config)->GetResponse(); } TNode::TListType THttpRawClient::LookupRows( const TYPath& path, const TNode::TListType& keys, const TLookupRowsOptions& options) { TMutationId mutationId; THttpHeader header("PUT", "lookup_rows"); header.AddPath(AddPathPrefix(path, Context_.Config->ApiVersion)); header.SetInputFormat(TFormat::YsonBinary()); header.SetOutputFormat(TFormat::YsonBinary()); header.MergeParameters(BuildYsonNodeFluently().BeginMap() .DoIf(options.Timeout_.Defined(), [&] (TFluentMap fluent) { fluent.Item("timeout").Value(static_cast(options.Timeout_->MilliSeconds())); }) .Item("keep_missing_rows").Value(options.KeepMissingRows_) .Item("versioned").Value(options.Versioned_) .DoIf(options.Columns_.Defined(), [&] (TFluentMap fluent) { fluent.Item("column_names").Value(*options.Columns_); }) .EndMap()); auto body = NodeListToYsonString(keys); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, body, config); return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList(); } TNode::TListType THttpRawClient::SelectRows( const TString& query, const TSelectRowsOptions& options) { TMutationId mutationId; THttpHeader header("GET", "select_rows"); header.SetInputFormat(TFormat::YsonBinary()); header.SetOutputFormat(TFormat::YsonBinary()); header.MergeParameters(NRawClient::SerializeParamsForSelectRows(query, options)); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); return NodeFromYsonString(responseInfo->GetResponse(), ::NYson::EYsonType::ListFragment).AsList(); } 
std::unique_ptr THttpRawClient::WriteTable( const TTransactionId& transactionId, const TRichYPath& path, const TMaybe& format, const TTableWriterOptions& options) { return NRawClient::WriteTable(Context_, transactionId, path, format, options); } std::unique_ptr THttpRawClient::ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options) { TMutationId mutationId; THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion)); header.SetOutputFormat(format); header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); header.MergeParameters(NRawClient::SerializeParamsForReadTable(transactionId, options)); header.MergeParameters(FormIORequestParameters(path, options)); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); return std::make_unique(std::move(responseInfo)); } std::unique_ptr THttpRawClient::WriteFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileWriterOptions& options) { return NRawClient::WriteFile(Context_, transactionId, path, options); } std::unique_ptr THttpRawClient::ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options) { TMutationId mutationId; THttpHeader header("GET", "api/v4/read_table_partition", /*isApi*/ false); header.SetOutputFormat(format); header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); auto params = NRawClient::SerializeParamsForReadTablePartition(cookie, options); header.MergeParameters(params); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); return std::make_unique(std::move(responseInfo)); } std::unique_ptr THttpRawClient::ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, const TBlobTableReaderOptions& options) { 
TMutationId mutationId; THttpHeader header("GET", "read_blob_table"); header.SetOutputFormat(TMaybe()); // Binary format header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); header.MergeParameters(NRawClient::SerializeParamsForReadBlobTable(transactionId, path, key, options)); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); return std::make_unique(std::move(responseInfo)); } void THttpRawClient::AlterTable( TMutationId& mutationId, const TTransactionId& transactionId, const TYPath& path, const TAlterTableOptions& options) { THttpHeader header("POST", "alter_table"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAlterTable(transactionId, Context_.Config->Prefix, path, options)); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::AlterTableReplica( TMutationId& mutationId, const TReplicaId& replicaId, const TAlterTableReplicaOptions& options) { THttpHeader header("POST", "alter_table_replica"); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForAlterTableReplica(replicaId, options)); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::DeleteRows( const TYPath& path, const TNode::TListType& keys, const TDeleteRowsOptions& options) { TMutationId mutationId; THttpHeader header("PUT", "delete_rows"); header.SetInputFormat(TFormat::YsonBinary()); header.MergeParameters(NRawClient::SerializeParametersForDeleteRows(Context_.Config->Prefix, path, options)); auto body = NodeListToYsonString(keys); TRequestConfig config; config.IsHeavy = true; RequestWithoutRetry(Context_, mutationId, header, body, config)->GetResponse(); } void THttpRawClient::FreezeTable( const TYPath& path, const TFreezeTableOptions& options) { TMutationId mutationId; THttpHeader header("POST", "freeze_table"); 
header.MergeParameters(NRawClient::SerializeParamsForFreezeTable(Context_.Config->Prefix, path, options)); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::UnfreezeTable( const TYPath& path, const TUnfreezeTableOptions& options) { TMutationId mutationId; THttpHeader header("POST", "unfreeze_table"); header.MergeParameters(NRawClient::SerializeParamsForUnfreezeTable(Context_.Config->Prefix, path, options)); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } class THttpRequestStreamWithResponse : public IOutputStreamWithResponse { public: explicit THttpRequestStreamWithResponse(NHttpClient::IHttpRequestPtr request) : Request_(std::move(request)) , Underlying_(Request_->GetStream()) { } TString GetResponse() const override { if (!Finished_) { ythrow TApiUsageError() << "Stream must be finished before response can be received."; } return Response_; } private: NHttpClient::IHttpRequestPtr Request_; IOutputStream* Underlying_; TString Response_; bool Finished_ = false; void DoWrite(const void* buf, size_t len) override { Underlying_->Write(buf, len); } void DoFinish() override { Underlying_->Finish(); Response_ = Request_->Finish()->GetResponse(); Finished_ = true; } }; template T DeserializeStartWriteSessionResponse(TNode node) { using TSession = decltype(T::Session_); using TCookie = decltype(T::Cookies_)::value_type; const auto& cookiesNode = node["cookies"].AsList(); TVector cookies; cookies.reserve(cookiesNode.size()); for (const auto& cookieNode : cookiesNode) { cookies.push_back(TCookie(cookieNode)); } T result; result.Session(TSession(node["session"])); result.Cookies(std::move(cookies)); return result; } TDistributedWriteTableSessionWithCookies THttpRawClient::StartDistributedWriteTableSession( TMutationId& mutationId, const TTransactionId& transactionId, const TRichYPath& richPath, i64 cookieCount, const TStartDistributedWriteTableOptions& options) { // NB(achains): C++ client by default uses v3 api while 
v4 is not fully supported. // Explicit command path is needed until v4 is not default version. THttpHeader header("GET", "api/v4/start_distributed_write_session", /*isApi*/ false); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForStartDistributedTableSession(transactionId, richPath, cookieCount, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); return DeserializeStartWriteSessionResponse(NodeFromYsonString(responseInfo->GetResponse())); } void THttpRawClient::PingDistributedWriteTableSession( const TDistributedWriteTableSession& session, const TPingDistributedWriteTableOptions& /*options*/) { TMutationId mutationId; // NB(achains): C++ client by default uses v3 api while v4 is not fully supported. // Explicit command path is needed until v4 is not default version. THttpHeader header("GET", "api/v4/ping_distributed_write_session", /*isApi*/ false); header.AddParameter("session", session.Underlying()); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::FinishDistributedWriteTableSession( TMutationId& mutationId, const TDistributedWriteTableSession& session, const TVector& results, const TFinishDistributedWriteTableOptions& /*options*/) { // NB(achains): C++ client by default uses v3 api while v4 is not fully supported. // Explicit command path is needed until v4 is not default version. 
THttpHeader header("GET", "api/v4/finish_distributed_write_session", /*isApi*/ false); header.AddMutationId(); TNode::TListType resultNode; resultNode.reserve(results.size()); for (const auto& result : results) { resultNode.push_back(result.Underlying()); } TNode parameters; parameters["session"] = session.Underlying(); parameters["results"] = TNode::CreateList(std::move(resultNode)); header.MergeParameters(parameters); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } std::unique_ptr THttpRawClient::WriteTableFragment( const TDistributedWriteTableCookie& cookie, const TMaybe& format, const TTableFragmentWriterOptions& /*options*/) { // NB(achains): C++ client by default uses v3 api while v4 is not fully supported. // Explicit command path is needed until v4 is not default version. THttpHeader header("PUT", "api/v4/write_table_fragment", /*isApi=*/ false); header.SetInputFormat(format); header.SetRequestCompression(ToString(Context_.Config->ContentEncoding)); header.AddParameter("cookie", cookie.Underlying()); TRequestConfig config; config.IsHeavy = true; auto request = StartRequestWithoutRetry(Context_, header, config); return std::make_unique(std::move(request)); } TDistributedWriteFileSessionWithCookies THttpRawClient::StartDistributedWriteFileSession( TMutationId& mutationId, const TTransactionId& transactionId, const TRichYPath& richPath, i64 cookieCount, const TStartDistributedWriteFileOptions& options) { // NB(achains): C++ client by default uses v3 api while v4 is not fully supported. // Explicit command path is needed until v4 is not default version. 
THttpHeader header("GET", "api/v4/start_distributed_write_file_session", /*isApi*/ false); header.AddMutationId(); header.MergeParameters(NRawClient::SerializeParamsForStartDistributedFileSession(transactionId, richPath, cookieCount, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); return DeserializeStartWriteSessionResponse(NodeFromYsonString(responseInfo->GetResponse())); } void THttpRawClient::PingDistributedWriteFileSession( const TDistributedWriteFileSession& session, const TPingDistributedWriteFileOptions& /*options*/) { TMutationId mutationId; // NB(achains): C++ client by default uses v3 api while v4 is not fully supported. // Explicit command path is needed until v4 is not default version. THttpHeader header("GET", "api/v4/ping_distributed_write_file_session", /*isApi*/ false); header.AddParameter("session", session.Underlying()); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } void THttpRawClient::FinishDistributedWriteFileSession( TMutationId& mutationId, const TDistributedWriteFileSession& session, const TVector& results, const TFinishDistributedWriteFileOptions& /*options*/) { // NB(achains): C++ client by default uses v3 api while v4 is not fully supported. // Explicit command path is needed until v4 is not default version. 
THttpHeader header("GET", "api/v4/finish_distributed_write_file_session", /*isApi*/ false); header.AddMutationId(); TNode::TListType resultNode; resultNode.reserve(results.size()); for (const auto& result : results) { resultNode.push_back(result.Underlying()); } TNode parameters; parameters["session"] = session.Underlying(); parameters["results"] = TNode::CreateList(std::move(resultNode)); header.MergeParameters(parameters); RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } std::unique_ptr THttpRawClient::WriteFileFragment( const TDistributedWriteFileCookie& cookie, const TFileFragmentWriterOptions& /*options*/) { THttpHeader header("PUT", "api/v4/write_file_fragment", /*isApi*/ false); header.SetRequestCompression(ToString(Context_.Config->ContentEncoding)); header.AddParameter("cookie", cookie.Underlying()); TRequestConfig config; config.IsHeavy = true; auto request = StartRequestWithoutRetry(Context_, header, config); return std::make_unique(std::move(request)); } TCheckPermissionResponse THttpRawClient::CheckPermission( const TString& user, EPermission permission, const TYPath& path, const TCheckPermissionOptions& options) { TMutationId mutationId; THttpHeader header("GET", "check_permission"); header.MergeParameters(NRawClient::SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); return NRawClient::ParseCheckPermissionResponse(NodeFromYsonString(responseInfo->GetResponse())); } TVector THttpRawClient::GetTabletInfos( const TYPath& path, const TVector& tabletIndexes, const TGetTabletInfosOptions& options) { TMutationId mutationId; THttpHeader header("POST", "api/v4/get_tablet_infos", /*isApi*/ false); header.MergeParameters(NRawClient::SerializeParamsForGetTabletInfos(Context_.Config->Prefix, path, tabletIndexes, options)); auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); TVector result; Deserialize(result, 
*NodeFromYsonString(responseInfo->GetResponse()).AsMap().FindPtr("tablets")); return result; } TVector THttpRawClient::GetTableColumnarStatistics( const TTransactionId& transactionId, const TVector& paths, const TGetTableColumnarStatisticsOptions& options) { TMutationId mutationId; THttpHeader header("GET", "get_table_columnar_statistics"); header.MergeParameters(NRawClient::SerializeParamsForGetTableColumnarStatistics(transactionId, paths, options)); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); TVector result; Deserialize(result, NodeFromYsonString(responseInfo->GetResponse())); return result; } TMultiTablePartitions THttpRawClient::GetTablePartitions( const TTransactionId& transactionId, const TVector& paths, const TGetTablePartitionsOptions& options) { TMutationId mutationId; THttpHeader header("GET", "partition_tables"); header.MergeParameters(NRawClient::SerializeParamsForGetTablePartitions(transactionId, paths, options)); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); TMultiTablePartitions result; Deserialize(result, NodeFromYsonString(responseInfo->GetResponse())); return result; } ui64 THttpRawClient::GenerateTimestamp() { TMutationId mutationId; THttpHeader header("GET", "generate_timestamp"); TRequestConfig config; config.IsHeavy = true; auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config); return NodeFromYsonString(responseInfo->GetResponse()).AsUint64(); } IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest() { return MakeIntrusive(Context_, /*retryPolicy*/ nullptr); } IRawClientPtr THttpRawClient::Clone() { return ::MakeIntrusive(Context_); } IRawClientPtr THttpRawClient::Clone(const TClientContext& context) { return ::MakeIntrusive(context); } void THttpRawClient::InitAsyncClient() { auto httpPoller = 
NConcurrency::CreateThreadPoolPoller( Context_.Config->AsyncHttpClientThreads, "tx_http_client_poller"); if (Context_.UseTLS) { auto httpsClientConfig = NYT::New(); httpsClientConfig->MaxIdleConnections = 16; AsyncHttpClient_ = NHttps::CreateClient(std::move(httpsClientConfig), std::move(httpPoller)); } else { auto httpClientConfig = NYT::New(); httpClientConfig->MaxIdleConnections = 16; AsyncHttpClient_ = NHttp::CreateClient(std::move(httpClientConfig), std::move(httpPoller)); } } void THttpRawClient::PostAsync(const TString& command, TNode params) { auto traceContext = Context_.Config->EnableClientTracing ? NTracing::CreateTraceContextFromCurrent("ping_tx") : nullptr; NTracing::TCurrentTraceContextGuard traceContextGuard(traceContext); std::call_once(AsyncHttpClientInitOnceFlag_, [this] () { InitAsyncClient(); }); auto url = TString::Join(Context_.UseTLS ? "https://" : "http://", Context_.ServerName, "/api/", Context_.Config->ApiVersion, "/", command); auto headers = New(); auto requestId = CreateGuidAsString(); headers->Add("Host", url); headers->Add("User-Agent", TProcessState::Get()->ClientVersion); if (const auto& serviceTicketAuth = Context_.ServiceTicketAuth) { const auto serviceTicket = serviceTicketAuth->Ptr->IssueServiceTicket(); headers->Add("X-Ya-Service-Ticket", serviceTicket); } else if (const auto& token = Context_.Token; !token.empty()) { headers->Add("Authorization", "OAuth " + token); } headers->Add("Transfer-Encoding", "chunked"); headers->Add("X-YT-Correlation-Id", requestId); headers->Add("X-YT-Header-Format", "yson"); headers->Add("Content-Encoding", "identity"); headers->Add("Accept-Encoding", "identity"); if (traceContext) { auto traceparent = FormatTraceParentHeader(traceContext->GetTraceId(), traceContext->GetSpanId()); headers->Add("traceparent", traceparent); } auto strParams = NodeToYsonString(params); YT_LOG_DEBUG("REQ %v - sending request (HostName: %v; Method POST %v; X-YT-Parameters (sent in body): %v)", requestId, 
Context_.ServerName, url, strParams); auto response = NConcurrency::WaitFor(AsyncHttpClient_->Post(url, TSharedRef::FromString(strParams), headers)) .ValueOrThrow(); CheckError(requestId, response); YT_LOG_DEBUG("RSP %v - received response %v bytes. (%v)", requestId, response->ReadAll().size(), strParams); } //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail