summaryrefslogtreecommitdiffstats
path: root/ydb/core/grpc_services/rpc_rollback_transaction.cpp
blob: 0bfcee0ea59ca2ae816f8e5209ca40818c48307d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include "grpc_request_proxy.h" 
 
#include "rpc_calls.h" 
#include "rpc_kqp_base.h" 
#include "rpc_common.h"
 
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
 
namespace NKikimr { 
namespace NGRpcService { 
 
using namespace NActors; 
using namespace Ydb; 
using namespace NKqp; 
 
class TRollbackTransactionRPC : public TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest> { 
    using TBase = TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest>; 
 
public: 
    TRollbackTransactionRPC(TEvRollbackTransactionRequest* msg) 
        : TBase(msg) {} 
 
    void Bootstrap(const TActorContext& ctx) { 
        TBase::Bootstrap(ctx); 
 
        RollbackTransactionImpl(ctx); 
        Become(&TRollbackTransactionRPC::StateWork); 
    } 
 
    void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { 
        switch (ev->GetTypeRewrite()) { 
            HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); 
            default: TBase::StateWork(ev, ctx); 
        } 
    } 
 
private: 
    void RollbackTransactionImpl(const TActorContext &ctx) { 
        const auto req = GetProtoRequest();
        const auto traceId = Request_->GetTraceId(); 
 
        TString sessionId; 
        auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); 
        SetAuthToken(ev, *Request_);
        SetDatabase(ev, *Request_);

        NYql::TIssues issues; 
        if (CheckSession(req->session_id(), issues)) { 
            ev->Record.MutableRequest()->SetSessionId(req->session_id()); 
        } else { 
            return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); 
        } 
 
        if (traceId) { 
            ev->Record.SetTraceId(traceId.GetRef()); 
        } 
 
        if (!req->tx_id()) { 
            NYql::TIssues issues; 
            issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty transaction id.")); 
            return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); 
        } 
 
        ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX); 
        ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(req->tx_id()); 
 
        ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); 
    } 
 
    void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { 
        const auto& record = ev->Get()->Record.GetRef(); 
        AddServerHintsIfAny(record);

        if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { 
            const auto& kqpResponse = record.GetResponse(); 
            const auto& issueMessage = kqpResponse.GetQueryIssues(); 
 
            ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, ctx); 
        } else { 
            return OnGenericQueryResponseError(record, ctx); 
        } 
    } 
 
    void ReplyWithResult(StatusIds::StatusCode status, 
                         const google::protobuf::RepeatedPtrField<TYdbIssueMessageType>& message,
                         const TActorContext& ctx) { 
        Request_->SendResult(status, message); 
        Die(ctx); 
    } 
}; 
 
void TGRpcRequestProxy::Handle(TEvRollbackTransactionRequest::TPtr& ev, const TActorContext& ctx) { 
    ctx.Register(new TRollbackTransactionRPC(ev->Release().Release())); 
} 
 
} // namespace NGRpcService 
} // namespace NKikimr