diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-10-18 20:31:38 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-18 20:31:38 +0300 |
commit | 2a74bac2d2d3bccb4e10120f1ead805640ec9dd0 (patch) | |
tree | 047e4818ced5aaf73f58517629e5260b5291f9f0 /yt/cpp/mapreduce | |
parent | 2d9656823e9521d8c29ea4c9a1d0eab78391abfc (diff) | |
parent | 3d834a1923bbf9403cd4a448e7f32b670aa4124f (diff) | |
download | ydb-2a74bac2d2d3bccb4e10120f1ead805640ec9dd0.tar.gz |
Merge pull request #10502 from ydb-platform/mergelibs-241016-1210
Library import 241016-1210
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/client.h | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/fwd.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/operation.h | 64 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 49 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h | 4 |
9 files changed, 159 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 9e2885bbe0..9dab176bde 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -1243,6 +1243,14 @@ IFileReaderPtr TClient::GetJobStderr( return NRawClient::GetJobStderr(Context_, operationId, jobId, options); } +std::vector<TJobTraceEvent> TClient::GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options) +{ + CheckShutdown(); + return NRawClient::GetJobTrace(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); +} + TNode::TListType TClient::SkyShareTable( const std::vector<TYPath>& tablePaths, const TSkyShareTableOptions& options) diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 5de00285ef..32c458316d 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -449,6 +449,10 @@ public: const TJobId& jobId, const TGetJobStderrOptions& options = TGetJobStderrOptions()) override; + std::vector<TJobTraceEvent> GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options = TGetJobTraceOptions()) override; + TNode::TListType SkyShareTable( const std::vector<TYPath>& tablePaths, const TSkyShareTableOptions& options = TSkyShareTableOptions()) override; diff --git a/yt/cpp/mapreduce/interface/client.h b/yt/cpp/mapreduce/interface/client.h index 56efa3c23c..3032025140 100644 --- a/yt/cpp/mapreduce/interface/client.h +++ b/yt/cpp/mapreduce/interface/client.h @@ -493,6 +493,18 @@ public: const TGetJobStderrOptions& options = TGetJobStderrOptions()) = 0; /// + /// @brief Get trace of a job. + /// + /// @ref NYT::TErrorResponse exception is thrown if it is missing. + /// + /// @note YT doesn't store all job traces. + /// + /// @see [YT doc](https://ytsaurus.tech/docs/en/api/commands.html#get_job_trace) + virtual std::vector<TJobTraceEvent> GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options = TGetJobTraceOptions()) = 0; + + /// /// @brief Create one or several rbtorrents for files in a blob table. /// /// If specified, one torrent is created for each value of `KeyColumns` option. diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h index 0434c03d8b..485b45129a 100644 --- a/yt/cpp/mapreduce/interface/fwd.h +++ b/yt/cpp/mapreduce/interface/fwd.h @@ -157,6 +157,7 @@ namespace NYT { using TTabletCellId = TGUID; using TReplicaId = TGUID; using TJobId = TGUID; + using TJobTraceId = TGUID; using TYPath = TString; using TLocalFilePath = TString; @@ -370,6 +371,8 @@ namespace NYT { struct TListJobsOptions; + struct TGetJobTraceOptions; + struct IOperationClient; enum class EFinishedJobState : int; diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h index 9a85049886..f2de3ea3bd 100644 --- a/yt/cpp/mapreduce/interface/operation.h +++ b/yt/cpp/mapreduce/interface/operation.h @@ -3048,6 +3048,70 @@ struct TGetFailedJobInfoOptions //////////////////////////////////////////////////////////////////////////////// /// +/// @brief Options for @ref NYT::IClient::GetJobTrace. +struct TGetJobTraceOptions +{ + /// @cond Doxygen_Suppress + using TSelf = TGetJobTraceOptions; + /// @endcond + + /// + /// @brief Id of the job. + FLUENT_FIELD_OPTION(TJobId, JobId); + + /// + /// @brief Id of the trace. + FLUENT_FIELD_OPTION(TJobTraceId, TraceId); + + /// + /// @brief Search for traces with time >= `FromTime`. + FLUENT_FIELD_OPTION(i64, FromTime); + + /// + /// @brief Search for traces with time <= `ToTime`. + FLUENT_FIELD_OPTION(i64, ToTime); + + /// + /// @brief Search for traces with event index >= `FromEventIndex`. + FLUENT_FIELD_OPTION(i64, FromEventIndex); + + /// + /// @brief Search for traces with event index >= `ToEventIndex`. + FLUENT_FIELD_OPTION(i64, ToEventIndex); +}; + +/// +/// @brief Response for @ref NYT::IOperation::GetJobTrace. +struct TJobTraceEvent +{ + /// + /// @brief Id of the operation. + TOperationId OperationId; + + /// + /// @brief Id of the job. + TJobId JobId; + + /// + /// @brief Id of the trace. + TJobTraceId TraceId; + + /// + /// @brief Index of the trace event. + i64 EventIndex; + + /// + /// @brief Raw evenr in json format. + TString Event; + + /// + /// @brief Time of the event. + TInstant EventTime; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/// /// @brief Interface representing an operation. struct IOperation : public TThrRefBase diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 2f9610a1ca..59868c599e 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -783,6 +783,55 @@ IFileReaderPtr GetJobStderr( return new TResponseReader(context, std::move(header)); } +TJobTraceEvent ParseJobTraceEvent(const TNode& node) +{ + const auto& mapNode = node.AsMap(); + TJobTraceEvent result; + + if (auto idNode = mapNode.FindPtr("operation_id")) { + result.OperationId = GetGuid(idNode->AsString()); + } + if (auto idNode = mapNode.FindPtr("job_id")) { + result.JobId = GetGuid(idNode->AsString()); + } + if (auto idNode = mapNode.FindPtr("trace_id")) { + result.TraceId = GetGuid(idNode->AsString()); + } + if (auto eventIndexNode = mapNode.FindPtr("event_index")) { + result.EventIndex = eventIndexNode->AsInt64(); + } + if (auto eventNode = mapNode.FindPtr("event")) { + result.Event = eventNode->AsString(); + } + if (auto eventTimeNode = mapNode.FindPtr("event_time")) { + result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());; + } + + return result; +} + +std::vector<TJobTraceEvent> GetJobTrace( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TGetJobTraceOptions& options) +{ + THttpHeader header("GET", "get_job_trace"); + header.MergeParameters(SerializeParamsForGetJobTrace(operationId, options)); + auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + std::vector<TJobTraceEvent> result; + + const auto& traceEventNodesList = resultNode.AsList(); + result.reserve(traceEventNodesList.size()); + for (const auto& traceEventNode : traceEventNodesList) { + result.push_back(ParseJobTraceEvent(traceEventNode)); + } + + return result; +} + TMaybe<TYPath> GetFileFromCache( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index c2d1a53b51..0a183d617c 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -257,6 +257,12 @@ TString GetJobStderrWithRetries( const TJobId& jobId, const TGetJobStderrOptions& options = TGetJobStderrOptions()); +std::vector<TJobTraceEvent> GetJobTrace( + const IRequestRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const TOperationId& operationId, + const TGetJobTraceOptions& options = TGetJobTraceOptions()); + // // File cache // diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index a638ac581d..bebc59ec91 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -525,6 +525,15 @@ TNode SerializeParamsForGetJob( return result; } +TNode SerializeParamsForGetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& /* options */) +{ + TNode result; + SetOperationIdParam(&result, operationId); + return result; +} + TNode SerializeParamsForListJobs( const TOperationId& operationId, const TListJobsOptions& options) diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index a7ab35d91d..69a4888267 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -126,6 +126,10 @@ TNode SerializeParamsForListJobs( const TOperationId& operationId, const TListJobsOptions& options); +TNode SerializeParamsForGetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options); + TNode SerializeParametersForInsertRows( const TString& pathPrefix, const TYPath& path, |