#include "raw_batch_request.h"

#include "raw_requests.h"
#include "rpc_parameters_serialization.h"

#include <yt/cpp/mapreduce/common/helpers.h>
#include <yt/cpp/mapreduce/common/retry_lib.h>

#include <yt/cpp/mapreduce/interface/logging/yt_log.h>

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/serialize.h>

#include <library/cpp/yson/node/node.h>

#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/retry_request.h>

#include <util/generic/guid.h>
#include <util/string/builder.h>

#include <exception>

namespace NYT::NDetail::NRawClient {

using NThreading::TFuture;
using NThreading::TPromise;
using NThreading::NewPromise;

////////////////////////////////////////////////////////////////////

static TString RequestInfo(const TNode& request)
{
    return ::TStringBuilder()
        << request["command"].AsString() << ' ' << NodeToYsonString(request["parameters"]);
}

static void EnsureNothing(const TMaybe<TNode>& node)
{
    Y_ENSURE(!node, "Internal error: expected to have no response, but got response of type " << node->GetType());
}

static void EnsureSomething(const TMaybe<TNode>& node)
{
    Y_ENSURE(node, "Internal error: expected to have response of any type, but got no response.");
}

static void EnsureType(const TNode& node, TNode::EType type)
{
    Y_ENSURE(node.GetType() == type, "Internal error: unexpected response type. "
        << "Expected: " << type << ", actual: " << node.GetType());
}

static void EnsureType(const TMaybe<TNode>& node, TNode::EType type)
{
    Y_ENSURE(node, "Internal error: expected to have response of type " << type << ", but got no response.");
    EnsureType(*node, type);
}

////////////////////////////////////////////////////////////////////

template <typename TReturnType>
class TResponseParserBase
    : public TRawBatchRequest::IResponseItemParser
{
public:
    using TFutureResult = TFuture<TReturnType>;

public:
    TResponseParserBase()
        : Result_(NewPromise<TReturnType>())
    { }

    void SetException(std::exception_ptr e) override
    {
        Result_.SetException(std::move(e));
    }

    TFuture<TReturnType> GetFuture()
    {
        return Result_.GetFuture();
    }

protected:
    TPromise<TReturnType> Result_;
};

////////////////////////////////////////////////////////////////////


class TGetResponseParser
    : public TResponseParserBase<TNode>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureSomething(node);
        Result_.SetValue(std::move(*node));
    }
};

////////////////////////////////////////////////////////////////////

class TVoidResponseParser
    : public TResponseParserBase<void>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureNothing(node);
        Result_.SetValue();
    }
};

////////////////////////////////////////////////////////////////////

class TListResponseParser
    : public TResponseParserBase<TNode::TListType>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::List);
        Result_.SetValue(std::move(node->AsList()));
    }
};

////////////////////////////////////////////////////////////////////

class TExistsResponseParser
    : public TResponseParserBase<bool>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::Bool);
        Result_.SetValue(std::move(node->AsBool()));
    }
};

////////////////////////////////////////////////////////////////////

class TGuidResponseParser
    : public TResponseParserBase<TGUID>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::String);
        Result_.SetValue(GetGuid(node->AsString()));
    }
};

////////////////////////////////////////////////////////////////////

class TCanonizeYPathResponseParser
    : public TResponseParserBase<TRichYPath>
{
public:
    explicit TCanonizeYPathResponseParser(TString pathPrefix, const TRichYPath& original)
        : OriginalNode_(PathToNode(original))
        , PathPrefix_(std::move(pathPrefix))
    { }

    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::String);

        for (const auto& item : OriginalNode_.GetAttributes().AsMap()) {
            node->Attributes()[item.first] = item.second;
        }
        TRichYPath result;
        Deserialize(result, *node);
        result.Path_ = AddPathPrefix(result.Path_, PathPrefix_);
        Result_.SetValue(result);
    }

private:
    TNode OriginalNode_;
    TString PathPrefix_;
};

////////////////////////////////////////////////////////////////////

class TGetOperationResponseParser
    : public TResponseParserBase<TOperationAttributes>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::Map);
        Result_.SetValue(ParseOperationAttributes(*node));
    }
};

////////////////////////////////////////////////////////////////////

class TTableColumnarStatisticsParser
    : public TResponseParserBase<TVector<TTableColumnarStatistics>>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::Map);
        TVector<TTableColumnarStatistics> statistics;
        Deserialize(statistics, *node);
        Result_.SetValue(std::move(statistics));
    }
};

////////////////////////////////////////////////////////////////////

class TTablePartitionsParser
    : public TResponseParserBase<TMultiTablePartitions>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::Map);
        TMultiTablePartitions partitions;
        Deserialize(partitions, *node);
        Result_.SetValue(std::move(partitions));
    }
};

////////////////////////////////////////////////////////////////////////////////

class TGetFileFromCacheParser
    : public TResponseParserBase<TMaybe<TYPath>>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::String);
        if (node->AsString().empty()) {
            Result_.SetValue(Nothing());
        } else {
            Result_.SetValue(node->AsString());
        }
    }
};

////////////////////////////////////////////////////////////////////

class TYPathParser
    : public TResponseParserBase<TYPath>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::String);
        Result_.SetValue(node->AsString());
    }
};

////////////////////////////////////////////////////////////////////

class TCheckPermissionParser
    : public TResponseParserBase<TCheckPermissionResponse>
{
public:
    void SetResponse(TMaybe<TNode> node) override
    {
        EnsureType(node, TNode::Map);
        Result_.SetValue(ParseCheckPermissionResponse(*node));
    }
};

////////////////////////////////////////////////////////////////////

TRawBatchRequest::TBatchItem::TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser)
    : Parameters(std::move(parameters))
    , ResponseParser(std::move(responseParser))
    , NextTry()
{ }

TRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInstant nextTry)
    : Parameters(batchItem.Parameters)
    , ResponseParser(batchItem.ResponseParser)
    , NextTry(nextTry)
{ }

////////////////////////////////////////////////////////////////////

TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
    : Config_(config)
{ }

TRawBatchRequest::~TRawBatchRequest() = default;

bool TRawBatchRequest::IsExecuted() const
{
    return Executed_;
}

void TRawBatchRequest::MarkExecuted()
{
    Executed_ = true;
}

template <typename TResponseParser>
typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
    const TString& command,
    TNode parameters,
    TMaybe<TNode> input)
{
    return AddRequest(command, parameters, input, MakeIntrusive<TResponseParser>());
}

template <typename TResponseParser>
typename TResponseParser::TFutureResult TRawBatchRequest::AddRequest(
    const TString& command,
    TNode parameters,
    TMaybe<TNode> input,
    ::TIntrusivePtr<TResponseParser> parser)
{
    Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
    TNode request;
    request["command"] = command;
    request["parameters"] = std::move(parameters);
    if (input) {
        request["input"] = std::move(*input);
    }
    BatchItemList_.emplace_back(std::move(request), parser);
    return parser->GetFuture();
}

void TRawBatchRequest::AddRequest(TBatchItem batchItem)
{
    Y_ENSURE(!Executed_, "Cannot add request: batch request is already executed");
    BatchItemList_.push_back(batchItem);
}

TFuture<TNodeId> TRawBatchRequest::Create(
    const TTransactionId& transaction,
    const TYPath& path,
    ENodeType type,
    const TCreateOptions& options)
{
    return AddRequest<TGuidResponseParser>(
        "create",
        SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options),
        Nothing());
}

TFuture<void> TRawBatchRequest::Remove(
    const TTransactionId& transaction,
    const TYPath& path,
    const TRemoveOptions& options)
{
    return AddRequest<TVoidResponseParser>(
        "remove",
        SerializeParamsForRemove(transaction, Config_->Prefix, path, options),
        Nothing());
}

TFuture<bool> TRawBatchRequest::Exists(
    const TTransactionId& transaction,
    const TYPath& path,
    const TExistsOptions& options)
{
    return AddRequest<TExistsResponseParser>(
        "exists",
        SerializeParamsForExists(transaction, Config_->Prefix, path, options),
        Nothing());
}

TFuture<TNode> TRawBatchRequest::Get(
    const TTransactionId& transaction,
    const TYPath& path,
    const TGetOptions& options)
{
    return AddRequest<TGetResponseParser>(
        "get",
        SerializeParamsForGet(transaction, Config_->Prefix, path, options),
        Nothing());
}

TFuture<void> TRawBatchRequest::Set(
    const TTransactionId& transaction,
    const TYPath& path,
    const TNode& node,
    const TSetOptions& options)
{
    return AddRequest<TVoidResponseParser>(
        "set",
        SerializeParamsForSet(transaction, Config_->Prefix, path, options),
        node);
}

TFuture<TNode::TListType> TRawBatchRequest::List(
    const TTransactionId& transaction,
    const TYPath& path,
    const TListOptions& options)
{
    return AddRequest<TListResponseParser>(
        "list",
        SerializeParamsForList(transaction, Config_->Prefix, path, options),
        Nothing());
}

TFuture<TNodeId> TRawBatchRequest::Copy(
    const TTransactionId& transaction,
    const TYPath& sourcePath,
    const TYPath& destinationPath,
    const TCopyOptions& options)
{
    return AddRequest<TGuidResponseParser>(
        "copy",
        SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options),
        Nothing());
}

TFuture<TNodeId> TRawBatchRequest::Move(
    const TTransactionId& transaction,
    const TYPath& sourcePath,
    const TYPath& destinationPath,
    const TMoveOptions& options)
{
    return AddRequest<TGuidResponseParser>(
        "move",
        SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options),
        Nothing());
}

TFuture<TNodeId> TRawBatchRequest::Link(
    const TTransactionId& transaction,
    const TYPath& targetPath,
    const TYPath& linkPath,
    const TLinkOptions& options)
{
    return AddRequest<TGuidResponseParser>(
        "link",
        SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options),
        Nothing());
}

TFuture<TLockId> TRawBatchRequest::Lock(
    const TTransactionId& transaction,
    const TYPath& path,
    ELockMode mode,
    const TLockOptions& options)
{
    return AddRequest<TGuidResponseParser>(
        "lock",
        SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options),
        Nothing());
}

TFuture<void> TRawBatchRequest::Unlock(
    const TTransactionId& transaction,
    const TYPath& path,
    const TUnlockOptions& options)
{
    return AddRequest<TVoidResponseParser>(
        "unlock",
        SerializeParamsForUnlock(transaction, Config_->Prefix, path, options),
        Nothing());
}

TFuture<TMaybe<TYPath>> TRawBatchRequest::GetFileFromCache(
    const TTransactionId& transactionId,
    const TString& md5Signature,
    const TYPath& cachePath,
    const TGetFileFromCacheOptions& options)
{
    return AddRequest<TGetFileFromCacheParser>(
        "get_file_from_cache",
        SerializeParamsForGetFileFromCache(transactionId, md5Signature, cachePath, options),
        Nothing());
}

TFuture<TYPath> TRawBatchRequest::PutFileToCache(
    const TTransactionId& transactionId,
    const TYPath& filePath,
    const TString& md5Signature,
    const TYPath& cachePath,
    const TPutFileToCacheOptions& options)
{
    return AddRequest<TYPathParser>(
        "put_file_to_cache",
        SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options),
        Nothing());
}

TFuture<TCheckPermissionResponse> TRawBatchRequest::CheckPermission(
    const TString& user,
    EPermission permission,
    const TYPath& path,
    const TCheckPermissionOptions& options)
{
    return AddRequest<TCheckPermissionParser>(
        "check_permission",
        SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options),
        Nothing());
}

TFuture<TOperationAttributes> TRawBatchRequest::GetOperation(
    const TOperationId& operationId,
    const TGetOperationOptions& options)
{
    return AddRequest<TGetOperationResponseParser>(
        "get_operation",
        SerializeParamsForGetOperation(operationId, options),
        Nothing());
}

TFuture<void> TRawBatchRequest::AbortOperation(const TOperationId& operationId)
{
    return AddRequest<TVoidResponseParser>(
        "abort_op",
        SerializeParamsForAbortOperation(operationId),
        Nothing());
}

TFuture<void> TRawBatchRequest::CompleteOperation(const TOperationId& operationId)
{
    return AddRequest<TVoidResponseParser>(
        "complete_op",
        SerializeParamsForCompleteOperation(operationId),
        Nothing());
}
TFuture<void> TRawBatchRequest::SuspendOperation(
    const TOperationId& operationId,
    const TSuspendOperationOptions& options)
{
    return AddRequest<TVoidResponseParser>(
        "suspend_operation",
        SerializeParamsForSuspendOperation(operationId, options),
        Nothing());
}
TFuture<void> TRawBatchRequest::ResumeOperation(
    const TOperationId& operationId,
    const TResumeOperationOptions& options)
{
    return AddRequest<TVoidResponseParser>(
        "resume_operation",
        SerializeParamsForResumeOperation(operationId, options),
        Nothing());
}

TFuture<void> TRawBatchRequest::UpdateOperationParameters(
    const TOperationId& operationId,
    const TUpdateOperationParametersOptions& options)
{
    return AddRequest<TVoidResponseParser>(
        "update_op_parameters",
        SerializeParamsForUpdateOperationParameters(operationId, options),
        Nothing());
}

TFuture<TRichYPath> TRawBatchRequest::CanonizeYPath(const TRichYPath& path)
{
    TRichYPath result = path;
    // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath.
    if (!result.Path_.StartsWith("<")) {
        result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix);
    }

    if (result.Path_.find_first_of("<>{}[]") != TString::npos) {
        return AddRequest<TCanonizeYPathResponseParser>(
            "parse_ypath",
            SerializeParamsForParseYPath(result),
            Nothing(),
            MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result));
    }
    return NThreading::MakeFuture(result);
}

TFuture<TVector<TTableColumnarStatistics>> TRawBatchRequest::GetTableColumnarStatistics(
    const TTransactionId& transaction,
    const TVector<TRichYPath>& paths,
    const TGetTableColumnarStatisticsOptions& options)
{
    return AddRequest<TTableColumnarStatisticsParser>(
        "get_table_columnar_statistics",
        SerializeParamsForGetTableColumnarStatistics(transaction, paths, options),
        Nothing());
}

TFuture<TMultiTablePartitions> TRawBatchRequest::GetTablePartitions(
    const TTransactionId& transaction,
    const TVector<TRichYPath>& paths,
    const TGetTablePartitionsOptions& options)
{
    return AddRequest<TTablePartitionsParser>(
        "partition_tables",
        SerializeParamsForGetTablePartitions(transaction, paths, options),
        Nothing());
}

void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const
{
    Y_ABORT_UNLESS(result);
    Y_ABORT_UNLESS(nextTry);

    *nextTry = TInstant();
    maxSize = Min(maxSize, BatchItemList_.size());
    *result = TNode::CreateList();
    for (size_t i = 0; i < maxSize; ++i) {
        YT_LOG_DEBUG("ExecuteBatch preparing: %v",
            RequestInfo(BatchItemList_[i].Parameters));

        result->Add(BatchItemList_[i].Parameters);
        if (BatchItemList_[i].NextTry > *nextTry) {
            *nextTry = BatchItemList_[i].NextTry;
        }
    }
}

void TRawBatchRequest::ParseResponse(
    const TResponseInfo& requestResult,
    const IRequestRetryPolicyPtr& retryPolicy,
    TRawBatchRequest* retryBatch,
    TInstant now)
{
    TNode node = NodeFromYsonString(requestResult.Response);
    return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
}

void TRawBatchRequest::ParseResponse(
    TNode node,
    const TString& requestId,
    const IRequestRetryPolicyPtr& retryPolicy,
    TRawBatchRequest* retryBatch,
    TInstant now)
{
    Y_ABORT_UNLESS(retryBatch);

    EnsureType(node, TNode::List);
    auto& responseList = node.AsList();
    const auto size = responseList.size();
    Y_ENSURE(size <= BatchItemList_.size(),
        "Size of server response exceeds size of batch request;"
        " size of batch: " << BatchItemList_.size() <<
        " size of server response: " << size << '.');

    for (size_t i = 0; i != size; ++i) {
        try {
            EnsureType(responseList[i], TNode::Map);
            auto& responseNode = responseList[i].AsMap();
            const auto outputIt = responseNode.find("output");
            if (outputIt != responseNode.end()) {
                BatchItemList_[i].ResponseParser->SetResponse(std::move(outputIt->second));
            } else {
                const auto errorIt = responseNode.find("error");
                if (errorIt == responseNode.end()) {
                    BatchItemList_[i].ResponseParser->SetResponse(Nothing());
                } else {
                    TErrorResponse error(400, requestId);
                    error.SetError(TYtError(errorIt->second));
                    if (auto curInterval = IsRetriable(error) ? retryPolicy->OnRetriableError(error) : Nothing()) {
                        YT_LOG_INFO(
                            "Batch subrequest (%s) failed, will retry, error: %s",
                            RequestInfo(BatchItemList_[i].Parameters),
                            error.what());
                        retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
                    } else {
                        YT_LOG_ERROR(
                            "Batch subrequest (%s) failed, error: %s",
                            RequestInfo(BatchItemList_[i].Parameters),
                            error.what());
                        BatchItemList_[i].ResponseParser->SetException(std::make_exception_ptr(error));
                    }
                }
            }
        } catch (const std::exception& e) {
            // We don't expect other exceptions, so we don't catch (...)
            BatchItemList_[i].ResponseParser->SetException(std::current_exception());
        }
    }
    BatchItemList_.erase(BatchItemList_.begin(), BatchItemList_.begin() + size);
}

void TRawBatchRequest::SetErrorResult(std::exception_ptr e) const
{
    for (const auto& batchItem : BatchItemList_) {
        batchItem.ResponseParser->SetException(e);
    }
}

size_t TRawBatchRequest::BatchSize() const
{
    return BatchItemList_.size();
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NDetail::NRawClient