1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
#include "transactions.h"
#include "constants.h"
#include "log.h"
#include "util.h"
#include <library/cpp/time_provider/monotonic.h>
#include <util/generic/singleton.h>
#include <util/string/printf.h>
#include <format>
#include <unordered_map>
namespace NYdb::NTPCC {
namespace {
//-----------------------------------------------------------------------------
using namespace NYdb::NQuery;
//-----------------------------------------------------------------------------
NYdb::NQuery::TAsyncExecuteQueryResult Select1(
NQuery::TSession& session,
const std::optional<NYdb::NQuery::TTransaction>& tx,
TTransactionContext& context)
{
auto& Log = context.Log;
static std::string query = std::format(R"(
PRAGMA TablePathPrefix("{}");
DECLARE $count AS Int32;
SELECT $count;
)", context.Path.c_str());
auto params = TParamsBuilder()
.AddParam("$count").Int32(1).Build()
.Build();
auto txControl = tx ? TTxControl::Tx(*tx) : TTxControl::BeginTx(TTxSettings::SerializableRW());
auto result = session.ExecuteQuery(
query,
txControl,
std::move(params));
LOG_T("Terminal " << context.TerminalID << " waiting for select1");
return result;
}
} // anonymous
//-----------------------------------------------------------------------------
NThreading::TFuture<TStatus> GetSimulationTask(
TTransactionContext& context,
TDuration& latency,
NQuery::TSession session)
{
TMonotonic startTs = TMonotonic::Now();
TTransactionInflightGuard guard;
co_await TTaskReady(context.TaskQueue, context.TerminalID);
auto& Log = context.Log;
LOG_T("Terminal " << context.TerminalID << " started simulated transaction");
// just to test if we have problems with generator (we don't)
for (size_t i = 0; i < 10; ++i) {
RandomNumber(DISTRICT_LOW_ID, DISTRICT_HIGH_ID);
}
if (context.SimulateTransactionMs != 0) {
std::chrono::milliseconds delay(context.SimulateTransactionMs);
co_await TSuspend(context.TaskQueue, context.TerminalID, delay);
TMonotonic endTs = TMonotonic::Now();
latency = endTs - startTs;
co_return TStatus(EStatus::SUCCESS, NIssue::TIssues());
}
// sleep 1 sumulation
std::optional<TTransaction> tx;
for (int i = 0; i < context.SimulateTransactionSelect1; ++i) {
auto future = Select1(session, tx, context);
auto result = co_await TSuspendWithFuture(future, context.TaskQueue, context.TerminalID);
if (!result.IsSuccess()) {
co_return result;
}
if (!tx) {
tx = *result.GetTransaction();
LOG_T("Terminal " << context.TerminalID << " simulated transaction txId " << tx->GetId());
}
}
auto commitFuture = tx->Commit();
auto commitResult = co_await TSuspendWithFuture(commitFuture, context.TaskQueue, context.TerminalID);
TMonotonic endTs = TMonotonic::Now();
latency = endTs - startTs;
co_return commitResult;
}
} // namespace NYdb::NTPCC
|