summaryrefslogtreecommitdiffstats
path: root/ydb/core/grpc_services/rpc_rollback_transaction.cpp
blob: b6cdfe281770bd183a51040f22dc8ed6392b4d37 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include "grpc_request_proxy.h"

#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"

#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NKikimr {
namespace NGRpcService {

using namespace NActors;
using namespace Ydb;
using namespace NKqp;

class TRollbackTransactionRPC : public TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest> {
    using TBase = TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest>;

public:
    TRollbackTransactionRPC(TEvRollbackTransactionRequest* msg)
        : TBase(msg) {}

    void Bootstrap(const TActorContext& ctx) {
        TBase::Bootstrap(ctx);

        RollbackTransactionImpl(ctx);
        Become(&TRollbackTransactionRPC::StateWork);
    }

    void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
        switch (ev->GetTypeRewrite()) {
            HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
            default: TBase::StateWork(ev, ctx);
        }
    }

private:
    void RollbackTransactionImpl(const TActorContext &ctx) {
        const auto req = GetProtoRequest();
        const auto traceId = Request_->GetTraceId();

        TString sessionId;
        auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
        SetAuthToken(ev, *Request_);
        SetDatabase(ev, *Request_);

        NYql::TIssues issues;
        if (CheckSession(req->session_id(), issues)) {
            ev->Record.MutableRequest()->SetSessionId(req->session_id());
        } else {
            return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
        }

        if (traceId) {
            ev->Record.SetTraceId(traceId.GetRef());
        }

        if (!req->tx_id()) {
            NYql::TIssues issues;
            issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty transaction id."));
            return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
        }

        ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX);
        ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(req->tx_id());

        ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
    }

    void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
        const auto& record = ev->Get()->Record.GetRef();
        AddServerHintsIfAny(record);

        if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
            const auto& kqpResponse = record.GetResponse();
            const auto& issueMessage = kqpResponse.GetQueryIssues();

            ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, ctx);
        } else {
            return OnGenericQueryResponseError(record, ctx);
        }
    }

    void ReplyWithResult(StatusIds::StatusCode status,
                         const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message,
                         const TActorContext& ctx) {
        Request_->SendResult(status, message);
        Die(ctx);
    }
};

void TGRpcRequestProxy::Handle(TEvRollbackTransactionRequest::TPtr& ev, const TActorContext& ctx) {
    ctx.Register(new TRollbackTransactionRPC(ev->Release().Release()));
}

} // namespace NGRpcService
} // namespace NKikimr