1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
#include "datashard_direct_transaction.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
namespace NKikimr {
namespace NDataShard {
class TDirectOpUnit : public TExecutionUnit {
public:
TDirectOpUnit(TDataShard& self, TPipeline& pipeline)
: TExecutionUnit(EExecutionUnitKind::DirectOp, true, self, pipeline)
{
}
~TDirectOpUnit()
{
}
bool IsReadyToExecute(TOperation::TPtr op) const override {
return !op->HasRuntimeConflicts();
}
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
Y_UNUSED(ctx);
if (op->IsImmediate()) {
// Every time we execute immediate transaction we may choose a new mvcc version
op->MvccReadWriteVersion.reset();
}
TSetupSysLocks guardLocks(op, DataShard);
TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
Y_VERIFY(tx != nullptr);
if (!tx->Execute(&DataShard, txc)) {
return EExecutionStatus::Restart;
}
if (auto changes = tx->GetCollectedChanges()) {
op->ChangeRecords().reserve(changes.size());
for (const auto& change : changes) {
op->ChangeRecords().emplace_back(change.Order(), change.PathId(), change.BodySize());
}
}
DataShard.SysLocksTable().ApplyLocks();
Pipeline.AddCommittingOp(op);
return EExecutionStatus::DelayCompleteNoMoreRestarts;
}
void Complete(TOperation::TPtr op, const TActorContext& ctx) override {
Pipeline.RemoveCommittingOp(op);
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
Y_VERIFY(tx != nullptr);
tx->SendResult(&DataShard, ctx);
}
}; // TDirectOpUnit
// Factory: builds a TDirectOpUnit bound to the given datashard and pipeline
// and returns it owned through the TExecutionUnit base type.
THolder<TExecutionUnit> CreateDirectOpUnit(TDataShard& self, TPipeline& pipeline) {
    THolder<TExecutionUnit> unit(new TDirectOpUnit(self, pipeline));
    return unit;
}
} // NDataShard
} // NKikimr
|