aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce
diff options
context:
space:
mode:
authorMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-10-18 20:31:38 +0300
committerGitHub <noreply@github.com>2024-10-18 20:31:38 +0300
commit2a74bac2d2d3bccb4e10120f1ead805640ec9dd0 (patch)
tree047e4818ced5aaf73f58517629e5260b5291f9f0 /yt/cpp/mapreduce
parent2d9656823e9521d8c29ea4c9a1d0eab78391abfc (diff)
parent3d834a1923bbf9403cd4a448e7f32b670aa4124f (diff)
downloadydb-2a74bac2d2d3bccb4e10120f1ead805640ec9dd0.tar.gz
Merge pull request #10502 from ydb-platform/mergelibs-241016-1210
Library import 241016-1210
Diffstat (limited to 'yt/cpp/mapreduce')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp8
-rw-r--r--yt/cpp/mapreduce/client/client.h4
-rw-r--r--yt/cpp/mapreduce/interface/client.h12
-rw-r--r--yt/cpp/mapreduce/interface/fwd.h3
-rw-r--r--yt/cpp/mapreduce/interface/operation.h64
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp49
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp9
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h4
9 files changed, 159 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 9e2885bbe0..9dab176bde 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -1243,6 +1243,14 @@ IFileReaderPtr TClient::GetJobStderr(
return NRawClient::GetJobStderr(Context_, operationId, jobId, options);
}
+std::vector<TJobTraceEvent> TClient::GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options)
+{
+ CheckShutdown();
+ return NRawClient::GetJobTrace(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+}
+
TNode::TListType TClient::SkyShareTable(
const std::vector<TYPath>& tablePaths,
const TSkyShareTableOptions& options)
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h
index 5de00285ef..32c458316d 100644
--- a/yt/cpp/mapreduce/client/client.h
+++ b/yt/cpp/mapreduce/client/client.h
@@ -449,6 +449,10 @@ public:
const TJobId& jobId,
const TGetJobStderrOptions& options = TGetJobStderrOptions()) override;
+ std::vector<TJobTraceEvent> GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = TGetJobTraceOptions()) override;
+
TNode::TListType SkyShareTable(
const std::vector<TYPath>& tablePaths,
const TSkyShareTableOptions& options = TSkyShareTableOptions()) override;
diff --git a/yt/cpp/mapreduce/interface/client.h b/yt/cpp/mapreduce/interface/client.h
index 56efa3c23c..3032025140 100644
--- a/yt/cpp/mapreduce/interface/client.h
+++ b/yt/cpp/mapreduce/interface/client.h
@@ -493,6 +493,18 @@ public:
const TGetJobStderrOptions& options = TGetJobStderrOptions()) = 0;
///
+ /// @brief Get trace of a job.
+ ///
+ /// @ref NYT::TErrorResponse exception is thrown if it is missing.
+ ///
+ /// @note YT doesn't store all job traces.
+ ///
+ /// @see [YT doc](https://ytsaurus.tech/docs/en/api/commands.html#get_job_trace)
+ virtual std::vector<TJobTraceEvent> GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = TGetJobTraceOptions()) = 0;
+
+ ///
/// @brief Create one or several rbtorrents for files in a blob table.
///
/// If specified, one torrent is created for each value of `KeyColumns` option.
diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h
index 0434c03d8b..485b45129a 100644
--- a/yt/cpp/mapreduce/interface/fwd.h
+++ b/yt/cpp/mapreduce/interface/fwd.h
@@ -157,6 +157,7 @@ namespace NYT {
using TTabletCellId = TGUID;
using TReplicaId = TGUID;
using TJobId = TGUID;
+ using TJobTraceId = TGUID;
using TYPath = TString;
using TLocalFilePath = TString;
@@ -370,6 +371,8 @@ namespace NYT {
struct TListJobsOptions;
+ struct TGetJobTraceOptions;
+
struct IOperationClient;
enum class EFinishedJobState : int;
diff --git a/yt/cpp/mapreduce/interface/operation.h b/yt/cpp/mapreduce/interface/operation.h
index 9a85049886..f2de3ea3bd 100644
--- a/yt/cpp/mapreduce/interface/operation.h
+++ b/yt/cpp/mapreduce/interface/operation.h
@@ -3048,6 +3048,70 @@ struct TGetFailedJobInfoOptions
////////////////////////////////////////////////////////////////////////////////
///
+/// @brief Options for @ref NYT::IClient::GetJobTrace.
+struct TGetJobTraceOptions
+{
+ /// @cond Doxygen_Suppress
+ using TSelf = TGetJobTraceOptions;
+ /// @endcond
+
+ ///
+ /// @brief Id of the job.
+ FLUENT_FIELD_OPTION(TJobId, JobId);
+
+ ///
+ /// @brief Id of the trace.
+ FLUENT_FIELD_OPTION(TJobTraceId, TraceId);
+
+ ///
+ /// @brief Search for traces with time >= `FromTime`.
+ FLUENT_FIELD_OPTION(i64, FromTime);
+
+ ///
+ /// @brief Search for traces with time <= `ToTime`.
+ FLUENT_FIELD_OPTION(i64, ToTime);
+
+ ///
+ /// @brief Search for traces with event index >= `FromEventIndex`.
+ FLUENT_FIELD_OPTION(i64, FromEventIndex);
+
+ ///
+ /// @brief Search for traces with event index >= `ToEventIndex`.
+ FLUENT_FIELD_OPTION(i64, ToEventIndex);
+};
+
+///
+/// @brief Response for @ref NYT::IOperation::GetJobTrace.
+struct TJobTraceEvent
+{
+ ///
+ /// @brief Id of the operation.
+ TOperationId OperationId;
+
+ ///
+ /// @brief Id of the job.
+ TJobId JobId;
+
+ ///
+ /// @brief Id of the trace.
+ TJobTraceId TraceId;
+
+ ///
+ /// @brief Index of the trace event.
+ i64 EventIndex;
+
+ ///
+ /// @brief Raw evenr in json format.
+ TString Event;
+
+ ///
+ /// @brief Time of the event.
+ TInstant EventTime;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+///
/// @brief Interface representing an operation.
struct IOperation
: public TThrRefBase
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 2f9610a1ca..59868c599e 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -783,6 +783,55 @@ IFileReaderPtr GetJobStderr(
return new TResponseReader(context, std::move(header));
}
+TJobTraceEvent ParseJobTraceEvent(const TNode& node)
+{
+ const auto& mapNode = node.AsMap();
+ TJobTraceEvent result;
+
+ if (auto idNode = mapNode.FindPtr("operation_id")) {
+ result.OperationId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("job_id")) {
+ result.JobId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("trace_id")) {
+ result.TraceId = GetGuid(idNode->AsString());
+ }
+ if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
+ result.EventIndex = eventIndexNode->AsInt64();
+ }
+ if (auto eventNode = mapNode.FindPtr("event")) {
+ result.Event = eventNode->AsString();
+ }
+ if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
+ result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
+ }
+
+ return result;
+}
+
+std::vector<TJobTraceEvent> GetJobTrace(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options)
+{
+ THttpHeader header("GET", "get_job_trace");
+ header.MergeParameters(SerializeParamsForGetJobTrace(operationId, options));
+ auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ std::vector<TJobTraceEvent> result;
+
+ const auto& traceEventNodesList = resultNode.AsList();
+ result.reserve(traceEventNodesList.size());
+ for (const auto& traceEventNode : traceEventNodesList) {
+ result.push_back(ParseJobTraceEvent(traceEventNode));
+ }
+
+ return result;
+}
+
TMaybe<TYPath> GetFileFromCache(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index c2d1a53b51..0a183d617c 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -257,6 +257,12 @@ TString GetJobStderrWithRetries(
const TJobId& jobId,
const TGetJobStderrOptions& options = TGetJobStderrOptions());
+std::vector<TJobTraceEvent> GetJobTrace(
+ const IRequestRetryPolicyPtr& retryPolicy,
+ const TClientContext& context,
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = TGetJobTraceOptions());
+
//
// File cache
//
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
index a638ac581d..bebc59ec91 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -525,6 +525,15 @@ TNode SerializeParamsForGetJob(
return result;
}
+TNode SerializeParamsForGetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& /* options */)
+{
+ TNode result;
+ SetOperationIdParam(&result, operationId);
+ return result;
+}
+
TNode SerializeParamsForListJobs(
const TOperationId& operationId,
const TListJobsOptions& options)
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
index a7ab35d91d..69a4888267 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
@@ -126,6 +126,10 @@ TNode SerializeParamsForListJobs(
const TOperationId& operationId,
const TListJobsOptions& options);
+TNode SerializeParamsForGetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options);
+
TNode SerializeParametersForInsertRows(
const TString& pathPrefix,
const TYPath& path,