#include "errors.h"
#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/yson/node/node_visitor.h>
#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <library/cpp/json/json_reader.h>
#include <library/cpp/yson/writer.h>
#include <util/string/builder.h>
#include <util/stream/str.h>
#include <util/generic/set.h>
namespace NYT {
using namespace NJson;
////////////////////////////////////////////////////////////////////////////////
static void WriteErrorDescription(const TYtError& error, IOutputStream* out)
{
(*out) << error.GetMessage();
const auto& innerErrorList = error.InnerErrors();
if (!innerErrorList.empty()) {
(*out) << ": ";
WriteErrorDescription(innerErrorList[0], out);
}
}
static void SerializeError(const TYtError& error, NYson::IYsonConsumer* consumer)
{
consumer->OnBeginMap();
{
consumer->OnKeyedItem("code");
consumer->OnInt64Scalar(error.GetCode());
consumer->OnKeyedItem("message");
consumer->OnStringScalar(error.GetMessage());
if (!error.GetAttributes().empty()) {
consumer->OnKeyedItem("attributes");
consumer->OnBeginMap();
{
for (const auto& item : error.GetAttributes()) {
consumer->OnKeyedItem(item.first);
TNodeVisitor(consumer).Visit(item.second);
}
}
consumer->OnEndMap();
}
if (!error.InnerErrors().empty()) {
consumer->OnKeyedItem("inner_errors");
{
consumer->OnBeginList();
for (const auto& innerError : error.InnerErrors()) {
SerializeError(innerError, consumer);
}
consumer->OnEndList();
}
}
}
consumer->OnEndMap();
}
static TString DumpJobInfoForException(const TOperationId& operationId, const TVector<TFailedJobInfo>& failedJobInfoList)
{
::TStringBuilder output;
// Exceptions have limit to contain 65508 bytes of text, so we also limit stderr text
constexpr size_t MAX_SIZE = 65508 / 2;
size_t written = 0;
for (const auto& failedJobInfo : failedJobInfoList) {
if (written >= MAX_SIZE) {
break;
}
TStringStream nextChunk;
nextChunk << '\n';
nextChunk << "OperationId: " << GetGuidAsString(operationId) << " JobId: " << GetGuidAsString(failedJobInfo.JobId) << '\n';
nextChunk << "Error: " << failedJobInfo.Error.FullDescription() << '\n';
if (!failedJobInfo.Stderr.empty()) {
nextChunk << "Stderr: " << Endl;
size_t tmpWritten = written + nextChunk.Str().size();
if (tmpWritten >= MAX_SIZE) {
break;
}
if (tmpWritten + failedJobInfo.Stderr.size() > MAX_SIZE) {
nextChunk << failedJobInfo.Stderr.substr(failedJobInfo.Stderr.size() - (MAX_SIZE - tmpWritten));
} else {
nextChunk << failedJobInfo.Stderr;
}
}
written += nextChunk.Str().size();
output << nextChunk.Str();
}
return output;
}
////////////////////////////////////////////////////////////////////////////////
TYtError::TYtError()
: Code_(0)
{ }
TYtError::TYtError(const TString& message)
: Code_(NYT::NClusterErrorCodes::Generic)
, Message_(message)
{ }
TYtError::TYtError(int code, TString message, TVector<TYtError> innerError, TNode::TMapType attributes)
: Code_(code)
, Message_(message)
, InnerErrors_(innerError)
, Attributes_(attributes)
{ }
TYtError::TYtError(const TJsonValue& value)
{
const TJsonValue::TMapType& map = value.GetMap();
TJsonValue::TMapType::const_iterator it = map.find("message");
if (it != map.end()) {
Message_ = it->second.GetString();
}
it = map.find("code");
if (it != map.end()) {
Code_ = static_cast<int>(it->second.GetInteger());
} else {
Code_ = NYT::NClusterErrorCodes::Generic;
}
it = map.find("inner_errors");
if (it != map.end()) {
const TJsonValue::TArray& innerErrors = it->second.GetArray();
for (const auto& innerError : innerErrors) {
InnerErrors_.push_back(TYtError(innerError));
}
}
it = map.find("attributes");
if (it != map.end()) {
auto attributes = NYT::NodeFromJsonValue(it->second);
if (attributes.IsMap()) {
Attributes_ = std::move(attributes.AsMap());
}
}
}
TYtError::TYtError(const TNode& node)
{
const auto& map = node.AsMap();
auto it = map.find("message");
if (it != map.end()) {
Message_ = it->second.AsString();
}
it = map.find("code");
if (it != map.end()) {
Code_ = static_cast<int>(it->second.AsInt64());
} else {
Code_ = NYT::NClusterErrorCodes::Generic;
}
it = map.find("inner_errors");
if (it != map.end()) {
const auto& innerErrors = it->second.AsList();
for (const auto& innerError : innerErrors) {
InnerErrors_.push_back(TYtError(innerError));
}
}
it = map.find("attributes");
if (it != map.end()) {
auto& attributes = it->second;
if (attributes.IsMap()) {
Attributes_ = std::move(attributes.AsMap());
}
}
}
int TYtError::GetCode() const
{
return Code_;
}
const TString& TYtError::GetMessage() const
{
return Message_;
}
const TVector<TYtError>& TYtError::InnerErrors() const
{
return InnerErrors_;
}
void TYtError::ParseFrom(const TString& jsonError)
{
TJsonValue value;
TStringInput input(jsonError);
ReadJsonTree(&input, &value);
*this = TYtError(value);
}
TSet<int> TYtError::GetAllErrorCodes() const
{
TDeque<const TYtError*> queue = {this};
TSet<int> result;
while (!queue.empty()) {
const auto* current = queue.front();
queue.pop_front();
result.insert(current->Code_);
for (const auto& error : current->InnerErrors_) {
queue.push_back(&error);
}
}
return result;
}
bool TYtError::ContainsErrorCode(int code) const
{
if (Code_ == code) {
return true;
}
for (const auto& error : InnerErrors_) {
if (error.ContainsErrorCode(code)) {
return true;
}
}
return false;
}
bool TYtError::ContainsText(const TStringBuf& text) const
{
if (Message_.Contains(text)) {
return true;
}
for (const auto& error : InnerErrors_) {
if (error.ContainsText(text)) {
return true;
}
}
return false;
}
bool TYtError::HasAttributes() const
{
return !Attributes_.empty();
}
const TNode::TMapType& TYtError::GetAttributes() const
{
return Attributes_;
}
TString TYtError::GetYsonText() const
{
TStringStream out;
::NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text);
SerializeError(*this, &writer);
return std::move(out.Str());
}
TString TYtError::ShortDescription() const
{
TStringStream out;
WriteErrorDescription(*this, &out);
return std::move(out.Str());
}
TString TYtError::FullDescription() const
{
TStringStream s;
WriteErrorDescription(*this, &s);
s << "; full error: " << GetYsonText();
return s.Str();
}
////////////////////////////////////////////////////////////////////////////////
TErrorResponse::TErrorResponse(int httpCode, const TString& requestId)
: HttpCode_(httpCode)
, RequestId_(requestId)
{ }
bool TErrorResponse::IsOk() const
{
return Error_.GetCode() == 0;
}
void TErrorResponse::SetRawError(const TString& message)
{
Error_ = TYtError(message);
Setup();
}
void TErrorResponse::SetError(TYtError error)
{
Error_ = std::move(error);
Setup();
}
void TErrorResponse::ParseFromJsonError(const TString& jsonError)
{
Error_.ParseFrom(jsonError);
Setup();
}
void TErrorResponse::SetIsFromTrailers(bool isFromTrailers)
{
IsFromTrailers_ = isFromTrailers;
}
int TErrorResponse::GetHttpCode() const
{
return HttpCode_;
}
bool TErrorResponse::IsFromTrailers() const
{
return IsFromTrailers_;
}
bool TErrorResponse::IsTransportError() const
{
return HttpCode_ == 503;
}
TString TErrorResponse::GetRequestId() const
{
return RequestId_;
}
const TYtError& TErrorResponse::GetError() const
{
return Error_;
}
bool TErrorResponse::IsResolveError() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NYTree::ResolveError);
}
bool TErrorResponse::IsAccessDenied() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NSecurityClient::AuthorizationError);
}
bool TErrorResponse::IsConcurrentTransactionLockConflict() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NCypressClient::ConcurrentTransactionLockConflict);
}
bool TErrorResponse::IsRequestRateLimitExceeded() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NSecurityClient::RequestQueueSizeLimitExceeded);
}
bool TErrorResponse::IsRequestQueueSizeLimitExceeded() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NRpc::RequestQueueSizeLimitExceeded);
}
bool TErrorResponse::IsChunkUnavailable() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NChunkClient::ChunkUnavailable);
}
bool TErrorResponse::IsRequestTimedOut() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::Timeout);
}
bool TErrorResponse::IsNoSuchTransaction() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NTransactionClient::NoSuchTransaction);
}
bool TErrorResponse::IsConcurrentOperationsLimitReached() const
{
return Error_.ContainsErrorCode(NClusterErrorCodes::NScheduler::TooManyOperations);
}
void TErrorResponse::Setup()
{
TStringStream s;
*this << Error_.FullDescription();
}
TTransportError::TTransportError(TYtError error)
{
*this << error.FullDescription();
}
////////////////////////////////////////////////////////////////////////////////
TOperationFailedError::TOperationFailedError(
EState state,
TOperationId id,
TYtError ytError,
TVector<TFailedJobInfo> failedJobInfo)
: State_(state)
, OperationId_(id)
, Error_(std::move(ytError))
, FailedJobInfo_(std::move(failedJobInfo))
{
*this << Error_.FullDescription();
if (!FailedJobInfo_.empty()) {
*this << DumpJobInfoForException(OperationId_, FailedJobInfo_);
}
}
TOperationFailedError::EState TOperationFailedError::GetState() const
{
return State_;
}
TOperationId TOperationFailedError::GetOperationId() const
{
return OperationId_;
}
const TYtError& TOperationFailedError::GetError() const
{
return Error_;
}
const TVector<TFailedJobInfo>& TOperationFailedError::GetFailedJobInfo() const
{
return FailedJobInfo_;
}
} // namespace NYT