// Origin: ydb/public/lib/ydb_cli/commands/topic_operations_scenario.h
// (blob 0913cc492dc2d269d41f3277b1d00b9242610c8e)
#pragma once

#include <ydb/public/lib/ydb_cli/common/command.h>

#include <util/datetime/base.h>
#include <util/generic/fwd.h>
#include <util/system/types.h>

#include <atomic>
#include <future>
#include <memory>
#include <thread>
#include <vector>

// Forward declarations of the logging types. This header only names them as
// smart-pointer template arguments (THolder<TLogBackend>, std::shared_ptr<TLog>),
// so their full definitions are not needed here.
class TLogBackend;
class TLog;

// Forward declarations of SDK types living in the inline namespace NYdb::V3.
// Because V3 is inline, they are also reachable as NYdb::TDriver / NYdb::TParams.
namespace NYdb::inline V3 {

class TDriver;  // named below only as a std::unique_ptr<TDriver> member
class TParams;  // named below only as a const-reference parameter

}

// Forward declarations of table-client types; this header uses them only in
// the declaration of TTopicOperationsScenario::GetSession.
namespace NYdb::inline V3::NTable {

class TSession;      // return type of GetSession (declaration only)
class TTableClient;  // taken by reference in GetSession

}

namespace NYdb::NConsoleClient {

// Forward declaration: this header refers to the collector only through a
// std::shared_ptr, so the full definition is not required here.
class TTopicWorkloadStatsCollector;

/// Abstract base for topic workload scenarios of the YDB CLI.
///
/// The public data members are the scenario settings; they are plain fields so
/// that callers can fill them in directly before calling Run(). Concrete
/// scenarios derive from this class and implement the private pure-virtual
/// DoRun().
///
/// NOTE(review): the class is polymorphic (DoRun is pure virtual) but declares
/// no virtual destructor, so a derived object must not be deleted through a
/// TTopicOperationsScenario pointer. Declaring a destructor here would also
/// suppress the implicit move operations, so it is left as-is.
class TTopicOperationsScenario {
public:
    TTopicOperationsScenario();

    /// Public entry point of the scenario.
    /// NOTE(review): presumably sets up log/driver/stats and delegates to
    /// DoRun(); the definition is not in this header - confirm in the .cpp.
    /// @param config client command configuration.
    /// @return integer result code of the scenario.
    int Run(const TClientCommand::TConfig& config);

    // Validation helpers for the Percentile and WarmupSec settings.
    // NOTE(review): how a violation is reported is not visible here
    // (presumably by throwing) - confirm in the .cpp.
    void EnsurePercentileIsValid() const;
    void EnsureWarmupSecIsValid() const;

    // Names of auxiliary tables.
    // NOTE(review): presumably derived from TableName - confirm.
    TString GetReadOnlyTableName() const;
    TString GetWriteOnlyTableName() const;

    // --- Timing and reporting settings -------------------------------------
    TDuration TotalSec;
    TDuration WindowSec;
    TDuration WarmupSec;
    bool Quiet = false;
    bool PrintTimestamp = false;
    double Percentile = 99.0;

    // --- Topic settings ----------------------------------------------------
    TString TopicName;
    ui32 TopicPartitionCount = 1;
    bool TopicAutoscaling = false;
    ui32 TopicMaxPartitionCount = 100;
    ui32 StabilizationWindowSeconds = 15;
    ui32 UpUtilizationPercent = 90;
    ui32 DownUtilizationPercent = 30;

    // --- Producer / consumer settings --------------------------------------
    ui32 ProducerThreadCount = 0;
    ui32 ConsumerThreadCount = 0;
    ui32 ConsumerCount = 0;
    bool Direct = false;
    TString ConsumerPrefix;

    // The four fields below previously had no default member initializer and
    // were therefore indeterminate until assigned. They are now zeroed so
    // that an early read is well-defined, matching every other scalar setting
    // in this class. Zero is only a deterministic placeholder.
    // NOTE(review): real values are presumably assigned by the code that
    // fills in these settings before Run() - confirm.
    size_t MessageSizeBytes = 0;
    size_t MessagesPerSec = 0;
    size_t BytesPerSec = 0;
    ui32 Codec = 0;

    // --- Table / transaction settings --------------------------------------
    TString TableName;
    ui32 TablePartitionCount = 1;
    bool UseTransactions = false;
    size_t CommitPeriodSeconds = 1;
    size_t TxCommitIntervalMs = 0;
    size_t CommitMessages = 1'000'000;
    bool OnlyTopicInTx = true;
    bool OnlyTableInTx = false;
    bool UseTableSelect = false;
    bool ReadWithoutConsumer = false;
    bool UseCpuTimestamp = false;

protected:
    /// Topic creation helper available to derived scenarios.
    /// The defaults of the trailing parameters repeat the defaults of the
    /// corresponding settings above (TopicAutoscaling, TopicMaxPartitionCount,
    /// StabilizationWindowSeconds, UpUtilizationPercent, DownUtilizationPercent).
    void CreateTopic(const TString& database,
                     const TString& topic,
                     ui32 partitionCount,
                     ui32 consumerCount,
                     bool autoscaling = false,
                     ui32 maxPartitionCount = 100,
                     ui32 stabilizationWindowSeconds = 15,
                     ui32 upUtilizationPercent = 90,
                     ui32 downUtilizationPercent = 30);
    void DropTopic(const TString& database,
                   const TString& topic);

    void DropTable(const TString& database, const TString& table);

    // Query helpers; the query text is passed as-is by the caller.
    void ExecSchemeQuery(const TString& query);
    void ExecDataQuery(const TString& query, const NYdb::TParams& params);

    // Worker management. The Start* methods receive the caller-owned vector
    // of futures by non-const reference (presumably appending one future per
    // started worker - confirm in the .cpp); JoinThreads receives the same
    // vector read-only.
    void StartConsumerThreads(std::vector<std::future<void>>& threads,
                              const TString& database);
    void StartProducerThreads(std::vector<std::future<void>>& threads,
                              ui32 partitionCount,
                              ui32 partitionSeed,
                              const std::vector<TString>& generatedMessages,
                              const TString& database);
    void JoinThreads(const std::vector<std::future<void>>& threads);

    // State queries.
    // NOTE(review): presumably backed by ErrorFlag / StatsCollector - confirm.
    bool AnyErrors() const;
    bool AnyIncomingMessages() const;
    bool AnyOutgoingMessages() const;

    // Shared runtime state. Driver is exclusively owned by the scenario; the
    // remaining objects are held through std::shared_ptr.
    std::unique_ptr<TDriver> Driver;
    std::shared_ptr<TLog> Log;
    std::shared_ptr<std::atomic_bool> ErrorFlag;
    std::shared_ptr<TTopicWorkloadStatsCollector> StatsCollector;

private:
    /// Scenario-specific body, implemented by each concrete scenario.
    virtual int DoRun(const TClientCommand::TConfig& config) = 0;

    void EnsureTopicNotExist(const TString& topic);
    // Private overload without the database argument and without defaults.
    void CreateTopic(const TString& topic,
                     ui32 partitionCount,
                     ui32 consumerCount,
                     bool autoscaling,
                     ui32 maxPartitionCount,
                     ui32 stabilizationWindowSeconds,
                     ui32 upUtilizationPercent,
                     ui32 downUtilizationPercent);

    static NTable::TSession GetSession(NTable::TTableClient& client);

    static THolder<TLogBackend> MakeLogBackend(TClientCommand::TConfig::EVerbosityLevel level);

    // One-time initialisation helpers for the Log, Driver and StatsCollector
    // members respectively (judging by their names - confirm in the .cpp).
    void InitLog(const TClientCommand::TConfig& config);
    void InitDriver(const TClientCommand::TConfig& config);
    void InitStatsCollector();
};

}