// path: root/ydb/core/tx/datashard/direct_tx_unit.cpp
// blob: 1284b1f6ad6f02044212b32a3562e6994e062295
#include "datashard_direct_transaction.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"

namespace NKikimr {
namespace NDataShard {

// Execution unit registered under EExecutionUnitKind::DirectOp.
// It drives a TDirectTransaction: Execute() runs the transaction and records
// the changes it collected on the operation, Complete() hands those records to
// the datashard and sends the transaction result.
class TDirectOpUnit : public TExecutionUnit {
public:
    TDirectOpUnit(TDataShard& self, TPipeline& pipeline)
        : TExecutionUnit(EExecutionUnitKind::DirectOp, true, self, pipeline)
    {
    }

    ~TDirectOpUnit() = default;

    // The operation may run as soon as it reports no runtime conflicts.
    bool IsReadyToExecute(TOperation::TPtr op) const override {
        const bool hasConflicts = op->HasRuntimeConflicts();
        return !hasConflicts;
    }

    // Runs the direct transaction carried by `op`.
    // Returns Restart when the transaction's Execute() reports false;
    // otherwise copies the collected change descriptors onto the operation,
    // applies sys locks, registers the op as committing and returns
    // DelayCompleteNoMoreRestarts. `ctx` is not used here.
    EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
        Y_UNUSED(ctx);

        // Every time we execute immediate transaction we may choose a new mvcc version
        if (op->IsImmediate()) {
            op->MvccReadWriteVersion.reset();
        }

        // Scoped guard; it must be constructed before the transaction executes
        // and stays alive until this method returns.
        TSetupSysLocks sysLocksGuard(op, DataShard);

        // This unit only handles operations that are TDirectTransaction.
        TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
        Y_VERIFY(tx != nullptr);

        const bool executed = tx->Execute(&DataShard, txc);
        if (!executed) {
            return EExecutionStatus::Restart;
        }

        if (auto collected = tx->GetCollectedChanges()) {
            // Keep only (order, path id, body size) for every collected change.
            auto& records = op->ChangeRecords();
            records.reserve(collected.size());
            for (const auto& item : collected) {
                records.emplace_back(item.Order(), item.PathId(), item.BodySize());
            }
        }

        DataShard.SysLocksTable().ApplyLocks();
        Pipeline.AddCommittingOp(op);

        return EExecutionStatus::DelayCompleteNoMoreRestarts;
    }

    // Counterpart of Execute(): unregisters the committing op, moves the
    // recorded change records into the datashard queue and sends the result
    // of the direct transaction.
    void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
        Pipeline.RemoveCommittingOp(op);

        auto& records = op->ChangeRecords();
        DataShard.EnqueueChangeRecords(std::move(records));

        TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
        Y_VERIFY(tx != nullptr);

        tx->SendResult(&DataShard, ctx);
    }

}; // TDirectOpUnit

// Factory for the DirectOp execution unit: allocates a TDirectOpUnit bound to
// the given datashard and pipeline and returns it owned through the base
// TExecutionUnit interface.
THolder<TExecutionUnit> CreateDirectOpUnit(TDataShard& self, TPipeline& pipeline) {
    return THolder<TExecutionUnit>(new TDirectOpUnit(self, pipeline));
}

} // NDataShard
} // NKikimr