diff options
author | Alexander Avdonkin <aavdonkin@yandex.ru> | 2025-04-09 17:26:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-09 17:26:37 +0300 |
commit | a55bf5d901ba7fd4cd4cbf3c998b31f28ecece4c (patch) | |
tree | 34f3f1769c7a0091991c947d2974ae8c561a4644 | |
parent | 751c197f1a8cf88c09b44fee5e4afe528d0a970f (diff) | |
download | ydb-a55bf5d901ba7fd4cd4cbf3c998b31f28ecece4c.tar.gz |
Mixed stress test (#14163)
-rw-r--r-- | ydb/library/workload/mixed/mixed.cpp | 367 | ||||
-rw-r--r-- | ydb/library/workload/mixed/mixed.h | 109 | ||||
-rw-r--r-- | ydb/library/workload/mixed/registrar.cpp | 12 | ||||
-rw-r--r-- | ydb/library/workload/mixed/ya.make | 14 | ||||
-rw-r--r-- | ydb/library/workload/ya.make | 2 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 4 | ||||
-rw-r--r-- | ydb/tests/stress/mixedpy/test_mixed.py | 108 | ||||
-rw-r--r-- | ydb/tests/stress/mixedpy/ya.make | 31 | ||||
-rw-r--r-- | ydb/tests/stress/ya.make | 1 |
9 files changed, 646 insertions, 2 deletions
diff --git a/ydb/library/workload/mixed/mixed.cpp b/ydb/library/workload/mixed/mixed.cpp new file mode 100644 index 00000000000..28902ac4619 --- /dev/null +++ b/ydb/library/workload/mixed/mixed.cpp @@ -0,0 +1,367 @@ +#include "mixed.h" +#include <util/generic/serialized_enum.h> +#include <util/random/normal.h> +#include <util/random/random.h> +#include <util/datetime/base.h> + +namespace NYdbWorkload { + +namespace NMixed { + +using TRow = TLogGenerator::TRow; + + +TLogGenerator::TLogGenerator(const TMixedWorkloadParams* params) + : TBase(params) + , TotalColumnsCnt(1 + Params.IntColumnsCnt + Params.StrColumnsCnt) +{ + Y_ABORT_UNLESS(TotalColumnsCnt >= Params.KeyColumnsCnt); +} + +std::string TLogGenerator::GetDDLQueries() const { + std::stringstream ss; + + ss << "--!syntax_v1\n"; + ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`("; + + for (size_t i = 0; i < TotalColumnsCnt; ++i) { + if (i == 0) { + ss << "ts Timestamp"; + } else if (i < Params.IntColumnsCnt + 1) { + ss << "c" << i << " Uint64"; + } else { + ss << "c" << i << " String"; + } + + if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TMixedWorkloadParams::EStoreType::Column) { + ss << " NOT NULL"; + } + ss << ", "; + } + + ss << "PRIMARY KEY("; + ss << "ts"; + for (size_t i = 1; i < Params.KeyColumnsCnt; ++i) { + ss << ", c" << i; + } + ss << ")) WITH ("; + + ss << "TTL = Interval(\"PT" << Params.TimestampTtlMinutes << "M\") ON ts, "; + + switch (Params.GetStoreType()) { + case TMixedWorkloadParams::EStoreType::Row: + ss << "STORE = ROW, "; + break; + case TMixedWorkloadParams::EStoreType::Column: + ss << "STORE = COLUMN, "; + break; + default: + throw yexception() << "Unsupported store type: " << Params.GetStoreType(); + } + if (Params.PartitionsByLoad) { + ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, "; + } + ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", "; + ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", "; + ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")"; + return ss.str(); +} + +TQueryInfoList TLogGenerator::GetWorkload(int type) { + switch (static_cast<EType>(type)) { + case EType::Insert: + return Insert(GenerateRandomRows()); + case EType::Upsert: + return Upsert(GenerateRandomRows()); + case EType::BulkUpsert: + return BulkUpsert(GenerateRandomRows()); + case EType::Select: + return Select(GenerateRandomRows()); + default: + return TQueryInfoList(); + } +} + + +TVector<IWorkloadQueryGenerator::TWorkloadType> TLogGenerator::GetSupportedWorkloadTypes() const { + TVector<TWorkloadType> result; + result.emplace_back(static_cast<int>(EType::Insert), "insert", "Insert random rows into table near current ts"); + result.emplace_back(static_cast<int>(EType::Upsert), "upsert", "Upsert random rows into table near current ts"); + result.emplace_back(static_cast<int>(EType::BulkUpsert), "bulk_upsert", "Bulk upsert random rows into table near current ts"); + result.emplace_back(static_cast<int>(EType::Select), "select", "Select random rows from table"); + return result; +} + +TQueryInfoList TLogGenerator::WriteRows(TString operation, TVector<TRow>&& rows) { + std::stringstream ss; + + NYdb::TParamsBuilder paramsBuilder; + + ss << "--!syntax_v1\n"; + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + TString cname = "$c" + std::to_string(row) + "_" + std::to_string(col); + if (col == 0) { + ss << "DECLARE " << cname << " AS Timestamp;\n"; + paramsBuilder.AddParam(cname).Timestamp(rows[row].Ts).Build(); + } else if (col < Params.IntColumnsCnt + 1) { + ss << "DECLARE " << cname << " AS Uint64;\n"; + paramsBuilder.AddParam(cname).Uint64(rows[row].Ints[col - 1]).Build(); + } else { + ss << "DECLARE " << cname << " AS String;\n"; + paramsBuilder.AddParam(cname).String(rows[row].Strings[col - Params.IntColumnsCnt - 1]).Build(); + } + } + } + + ss << operation << " INTO `" << Params.TableName << "` ("; + + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + if (col != 0) { + ss << "c" << col; + } else { + ss << "ts"; + } + + if (col + 1 < TotalColumnsCnt) { + ss << ", "; + } + } + + ss << ") VALUES "; + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + ss << "("; + + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + ss << "$c" << row << "_" << col; + if (col + 1 < TotalColumnsCnt) { + ss << ", "; + } + } + + ss << ")"; + + if (row + 1 < Params.RowsCnt) { + ss << ", "; + } + } + auto params = paramsBuilder.Build(); + return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params))); +} + +TQueryInfoList TLogGenerator::Insert(TVector<TRow>&& rows) { + return WriteRows("INSERT", std::move(rows)); +} + +TQueryInfoList TLogGenerator::Upsert(TVector<TRow>&& rows) { + return WriteRows("UPSERT", std::move(rows)); +} + +TQueryInfoList TLogGenerator::Select(TVector<TRow>&& rows) { + std::stringstream ss; + + NYdb::TParamsBuilder paramsBuilder; + + ss << "--!syntax_v1\n"; + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) { + TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col); + if (col == 0) { + ss << "DECLARE " << paramName << " AS Timestamp;\n"; + paramsBuilder.AddParam(paramName).Timestamp(rows[row].Ts).Build(); + } else if (col < Params.IntColumnsCnt) { + ss << "DECLARE " << paramName << " AS Uint64;\n"; + paramsBuilder.AddParam(paramName).Uint64(rows[row].Ints[col]).Build(); + } else { + ss << "DECLARE " << paramName << " AS String;\n"; + paramsBuilder.AddParam(paramName).String(rows[row].Strings[col - Params.IntColumnsCnt]).Build(); + } + } + } + + ss << "SELECT "; + for (size_t col = 1; col <= TotalColumnsCnt; ++col) { + ss << "c" << col; + if (col + 1 < TotalColumnsCnt) { + ss << ","; + } + ss << " "; + } + + ss << "FROM `" << Params.TableName << "` WHERE "; + for (size_t row = 0; row < Params.RowsCnt; ++row) { + for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) { + TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col); + if (col == 0) { + ss << "ts = " << paramName; + } else { + ss << "c" << col << " = " << paramName; + } + if (col + 1 < Params.KeyColumnsCnt) { + ss << " AND "; + } + } + if (row + 1 < Params.RowsCnt) { + ss << " OR "; + } + } + + auto params = paramsBuilder.Build(); + TQueryInfo info(ss.str(), std::move(params)); + return TQueryInfoList(1, std::move(info)); +} + +TQueryInfoList TLogGenerator::BulkUpsert(TVector<TRow>&& rows) { + NYdb::TValueBuilder valueBuilder; + valueBuilder.BeginList(); + for (const TRow& row : rows) { + auto &listItem = valueBuilder.AddListItem(); + listItem.BeginStruct(); + for (size_t col = 0; col < TotalColumnsCnt; ++col) { + if (col == 0) { + listItem.AddMember("ts").Timestamp(row.Ts); + } else if (col < Params.IntColumnsCnt + 1) { + listItem.AddMember(std::format("c{}", col)).Uint64(row.Ints[col-1]); + } else { + listItem.AddMember(std::format("c{}", col)).String(row.Strings[col - Params.IntColumnsCnt - 1]); + } + } + listItem.EndStruct(); + } + valueBuilder.EndList(); + TString table_path = Params.DbPath + "/" + Params.TableName; + NYdb::TValue rowsValue = valueBuilder.Build(); + auto bulkUpsertOperation = [table_path, rowsValue](NYdb::NTable::TTableClient& tableClient) { + auto r = rowsValue; + auto status = tableClient.BulkUpsert(table_path, std::move(r)); + return status.GetValueSync(); + }; + TQueryInfo queryInfo; + queryInfo.TableOperation = bulkUpsertOperation; + return TQueryInfoList(1, std::move(queryInfo)); +} + + +TQueryInfoList TLogGenerator::GetInitialData() { + TQueryInfoList res; + return res; +} + +TVector<std::string> TLogGenerator::GetCleanPaths() const { + return { Params.TableName }; +} + +TVector<TRow> TLogGenerator::GenerateRandomRows() { + TVector<TRow> result(Params.RowsCnt); + + for (size_t row = 0; row < Params.RowsCnt; ++row) { + result[row].Ts = TInstant::Now(); + i64 millisecondsDiff = 60 * 1000 * NormalRandom(0., static_cast<double>(Params.TimestampStandardDeviationMinutes)); + if (millisecondsDiff >= 0) { // TDuration::MilliSeconds can't be negative for some reason... + result[row].Ts = result[row].Ts + TDuration::MilliSeconds(millisecondsDiff); + } else { + result[row].Ts = result[row].Ts - TDuration::MilliSeconds(-millisecondsDiff); + } + + result[row].Ints.resize(Params.IntColumnsCnt); + result[row].Strings.resize(Params.StrColumnsCnt); + + for (size_t col = 0; col < Params.IntColumnsCnt; ++col) { + ui64 val = RandomNumber<ui64>(); + result[row].Ints[col] = val; + } + + for (size_t col = 0; col < Params.StrColumnsCnt; ++col) { + TString val; + val = TString(Params.StringLen, '_'); + for (size_t i = 0; i < Params.StringLen; i++) { + val[i] = (char)('a' + RandomNumber<u_char>(26)); + } + result[row].Strings[col] = val; + } + } + + return result; +} + +void TMixedWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) { + opts.AddLongOption('p', "path", "Path where benchmark tables are located") + .Optional() + .DefaultValue(TableName) + .Handler1T<TStringBuf>([this](TStringBuf arg) { + while(arg.SkipPrefix("/")); + while(arg.ChopSuffix("/")); + TableName = arg; + }); + switch (commandType) { + case TWorkloadParams::ECommandType::Init: + opts.AddLongOption("min-partitions", "Minimum partitions for tables.") + .DefaultValue((ui64)LogWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); + opts.AddLongOption("max-partitions", "Maximum partitions for tables.") + .DefaultValue((ui64)LogWorkloadConstants::MAX_PARTITIONS).StoreResult(&MaxPartitions); + opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).") + .DefaultValue((ui64)LogWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb); + opts.AddLongOption("auto-partition", "Enable auto partitioning by load.") + .DefaultValue((ui64)LogWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad); + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("int-cols", "Number of int columns") + .DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + opts.AddLongOption("str-cols", "Number of string columns") + .DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); + opts.AddLongOption("key-cols", "Number of key columns") + .DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + opts.AddLongOption("ttl", "TTL for timestamp column in minutes") + .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_TTL_MIN).StoreResult(&TimestampTtlMinutes); + opts.AddLongOption("store", "Storage type." + " Options: row, column\n" + " row - use row-based storage engine;\n" + " column - use column-based storage engine.") + .DefaultValue(StoreType) + .Handler1T<TStringBuf>([this](TStringBuf arg) { + const auto l = to_lower(TString(arg)); + if (!TryFromString(arg, StoreType)) { + throw yexception() << "Ivalid store type: " << arg; + } + }); + break; + case TWorkloadParams::ECommandType::Run: + opts.AddLongOption("int-cols", "Number of int columns") + .DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + opts.AddLongOption("str-cols", "Number of string columns") + .DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt); + opts.AddLongOption("key-cols", "Number of key columns") + .DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + switch (static_cast<TLogGenerator::EType>(workloadType)) { + case TLogGenerator::EType::Select: + case TLogGenerator::EType::Insert: + case TLogGenerator::EType::Upsert: + case TLogGenerator::EType::BulkUpsert: + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("rows", "Number of rows to upsert") + .DefaultValue((ui64)LogWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + opts.AddLongOption("timestamp_deviation", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.") + .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimestampStandardDeviationMinutes); + break; + } + break; + default: + break; + } +} + +THolder<IWorkloadQueryGenerator> TMixedWorkloadParams::CreateGenerator() const { + return MakeHolder<TLogGenerator>(this); +} + +TString TMixedWorkloadParams::GetWorkloadName() const { + return "Log"; +} + +} // namespace NMixed + +} // namespace NYdbWorkload diff --git a/ydb/library/workload/mixed/mixed.h b/ydb/library/workload/mixed/mixed.h new file mode 100644 index 00000000000..08e27ca93f5 --- /dev/null +++ b/ydb/library/workload/mixed/mixed.h @@ -0,0 +1,109 @@ +#pragma once + +#include <ydb/library/workload/abstract/workload_query_generator.h> + +#include <cctype> + +namespace NYdbWorkload { + +namespace NMixed { + +enum LogWorkloadConstants : ui64 { + MIN_PARTITIONS = 40, + MAX_PARTITIONS = 1000, + PARTITION_SIZE_MB = 2000, + STRING_LEN = 8, + STR_COLUMNS_CNT = 1, + INT_COLUMNS_CNT = 1, + KEY_COLUMNS_CNT = 1, + ROWS_CNT = 1, + PARTITIONS_BY_LOAD = true, + + TIMESTAMP_STANDARD_DEVIATION_MINUTES = 0, + TIMESTAMP_TTL_MIN = 60, +}; + +class TMixedWorkloadParams : public TWorkloadParams { +public: + enum class EStoreType { + Row /* "row" */, + Column /* "column" */, + }; + + void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override; + THolder<IWorkloadQueryGenerator> CreateGenerator() const override; + TString GetWorkloadName() const override; + ui64 MinPartitions = LogWorkloadConstants::MIN_PARTITIONS; + ui64 MaxPartitions = LogWorkloadConstants::MAX_PARTITIONS; + ui64 PartitionSizeMb = LogWorkloadConstants::PARTITION_SIZE_MB; + ui64 StringLen = LogWorkloadConstants::STRING_LEN; + ui64 StrColumnsCnt = LogWorkloadConstants::STR_COLUMNS_CNT; + ui64 IntColumnsCnt = LogWorkloadConstants::INT_COLUMNS_CNT; + ui64 KeyColumnsCnt = LogWorkloadConstants::KEY_COLUMNS_CNT; + ui64 TimestampStandardDeviationMinutes = LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; + ui64 TimestampTtlMinutes = LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES; + ui64 RowsCnt = LogWorkloadConstants::ROWS_CNT; + bool PartitionsByLoad = LogWorkloadConstants::PARTITIONS_BY_LOAD; + + std::string TableName = "log_writer_test"; + + YDB_READONLY(EStoreType, StoreType, EStoreType::Row); +}; + +class TLogGenerator final: public TWorkloadQueryGeneratorBase<TMixedWorkloadParams> { +public: + using TBase = TWorkloadQueryGeneratorBase<TMixedWorkloadParams>; + struct TRow { + TInstant Ts; + TVector<ui64> Ints; + TVector<TString> Strings; + + TString ToString() const { + std::stringstream ss; + ss << "( "; + for (auto i : Ints) { + ss << i << " "; + } + for (auto s : Strings) { + ss << s << " "; + } + ss << ")"; + return ss.str(); + } + + bool operator == (const TRow &other) const { + return Ts == other.Ts && Ints == other.Ints && Strings == other.Strings; + } + }; + TLogGenerator(const TMixedWorkloadParams* params); + + std::string GetDDLQueries() const override; + + TQueryInfoList GetInitialData() override; + + TVector<std::string> GetCleanPaths() const override; + + TQueryInfoList GetWorkload(int type) override; + TVector<TWorkloadType> GetSupportedWorkloadTypes() const override; + + enum class EType { + Insert, + Upsert, + BulkUpsert, + Select + }; + +private: + TQueryInfoList WriteRows(TString operation, TVector<TRow>&& rows); + TQueryInfoList Select(TVector<TRow>&& rows); + TQueryInfoList Insert(TVector<TRow>&& rows); + TQueryInfoList Upsert(TVector<TRow>&& rows); + TQueryInfoList BulkUpsert(TVector<TRow>&& rows); + TVector<TRow> GenerateRandomRows(); + + const ui64 TotalColumnsCnt; +}; + +} // namespace NMixed + +} // namespace NYdbWorkload diff --git a/ydb/library/workload/mixed/registrar.cpp b/ydb/library/workload/mixed/registrar.cpp new file mode 100644 index 00000000000..8a8042cf343 --- /dev/null +++ b/ydb/library/workload/mixed/registrar.cpp @@ -0,0 +1,12 @@ +#include "mixed.h"
+#include <ydb/library/workload/abstract/workload_factory.h>
+
+namespace NYdbWorkload {
+
+namespace NMixed {
+
+TWorkloadFactory::TRegistrator<TMixedWorkloadParams> Registrar("mixed");
+
+} // namespace NMixed
+
+} // namespace NYdbWorkload
diff --git a/ydb/library/workload/mixed/ya.make b/ydb/library/workload/mixed/ya.make new file mode 100644 index 00000000000..6cdd7dc405d --- /dev/null +++ b/ydb/library/workload/mixed/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + GLOBAL registrar.cpp + mixed.cpp +) + +PEERDIR( + ydb/library/workload/abstract +) + +GENERATE_ENUM_SERIALIZATION_WITH_HEADER(mixed.h) + +END() diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make index 7819c15847a..b269c9422a5 100644 --- a/ydb/library/workload/ya.make +++ b/ydb/library/workload/ya.make @@ -5,6 +5,7 @@ PEERDIR( ydb/library/workload/clickbench ydb/library/workload/kv ydb/library/workload/log + ydb/library/workload/mixed ydb/library/workload/stock ydb/library/workload/tpcds ydb/library/workload/tpch @@ -18,6 +19,7 @@ RECURSE( clickbench kv log + mixed stock tpc_base tpcds diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 10e3423996d..a5ce8a92734 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -268,7 +268,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato int TWorkloadCommand::RunWorkload(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, const int type) { if (!Quiet) { - std::cout << "Window\tTxs/Sec\tRetries\tErrors\tp50(ms)\tp95(ms)\tp99(ms)\tpMax(ms)"; + std::cout << "Window\tTxs\tTxs/Sec\tRetries\tErrors\tp50(ms)\tp95(ms)\tp99(ms)\tpMax(ms)"; if (PrintTimestamp) { std::cout << "\tTimestamp"; } @@ -320,7 +320,7 @@ void TWorkloadCommand::PrintWindowStats(int windowIt) { WindowHist.Reset(); } if (!Quiet) { - std::cout << windowIt << "\t" << std::setw(7) << stats.OpsCount / WindowSec << "\t" << retries << "\t" + std::cout << windowIt << "\t" << std::setw(7) << stats.OpsCount << "\t" << stats.OpsCount / WindowSec << "\t" << retries << "\t" << errors << "\t" << stats.Percentile50 << "\t" << stats.Percentile95 << "\t" << stats.Percentile99 << "\t" << stats.Percentile100; if (PrintTimestamp) { diff --git a/ydb/tests/stress/mixedpy/test_mixed.py b/ydb/tests/stress/mixedpy/test_mixed.py new file mode 100644 index 00000000000..50f1ef0e23e --- /dev/null +++ b/ydb/tests/stress/mixedpy/test_mixed.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +import os +import sys + +import pytest + +import yatest + +from ydb.tests.library.harness.kikimr_runner import KiKiMR +from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator +from ydb.tests.olap.lib.utils import get_external_param + + +class TestYdbMixedWorkload(object): + @classmethod + def setup_class(cls): + cls.cluster = KiKiMR(KikimrConfigGenerator()) + cls.cluster.start() + + def get_command_prefix(self, subcmds: list[str]) -> list[str]: + return [ + yatest.common.binary_path(os.getenv('YDB_CLI_BINARY')), + '--verbose', + '--endpoint', self.endpoint, + '--database={}'.format(self.database), + 'workload', 'mixed' + ] + subcmds + + @classmethod + def teardown_class(cls): + cls.cluster.stop() + + @classmethod + def get_cols_count_command_params(cls) -> list[str]: + return [ + '--int-cols', '5', + '--str-cols', '5', + ] + + def get_bulk_upsert_cmd(self, duration: str): + return self.get_command_prefix(subcmds=['run', 'bulk_upsert'] + self.get_cols_count_command_params()) + [ + '-s', duration, + '-t', '40', + '--rows', '1000', + '--window', '20', + '--len', '1000', + ] + + def get_select_cmd(self, duration: str): + return self.get_command_prefix(subcmds=['run', 'select']) + [ + '-s', duration, + '-t', '5', + '--rows', '100', + ] + + def get_update_cmd(self, duration: str): + return self.get_command_prefix(subcmds=['run', 'upsert'] + self.get_cols_count_command_params()) + [ + '-s', duration, + '-t', '5', + '--rows', '100', + '--len', '1000', + ] + + @classmethod + def print_txs(cls, step: str, filename: str): + found = False + with open(filename, 'r') as f: + for line in f.readlines(): + if found: + print('{} txs/sec: {}'.format(step, line.split()[1])) + break + else: + words = line.split() + if len(words) > 0 and words[0] == 'Txs': + found = True + + @pytest.mark.parametrize('store_type', ['row', 'column']) + def test(self, store_type): + duration = get_external_param('duration', '120') + self.endpoint = get_external_param('endpoint', 'grpc://localhost:%d' % self.cluster.nodes[1].grpc_port) + self.database = get_external_param('database', '/Root') + yatest.common.execute( + self.get_command_prefix(subcmds=['clean'])) + + yatest.common.execute( + self.get_command_prefix(subcmds=['init'] + self.get_cols_count_command_params()) + [ + '--store', store_type, + ]) + + with open('upsert_out', 'w+') as out: + yatest.common.execute(self.get_bulk_upsert_cmd(duration), stdout=out, stderr=sys.stderr) + + with open('upsert1_out', 'w+') as out: + bulk_upsert = yatest.common.execute(self.get_bulk_upsert_cmd(duration), wait=False, stdout=out, stderr=sys.stderr) + select = yatest.common.execute(self.get_select_cmd(duration), wait=False) + bulk_upsert.wait() + select.wait() + + with open('upsert2_out', 'w+') as out: + bulk_upsert = yatest.common.execute(self.get_bulk_upsert_cmd(duration), wait=False, stdout=out, stderr=sys.stderr) + select = yatest.common.execute(self.get_select_cmd(duration), wait=False) + update = yatest.common.execute(self.get_update_cmd(duration), wait=False) + bulk_upsert.wait() + select.wait() + update.wait() + self.print_txs('Upsert', 'upsert_out') + self.print_txs('Select', 'upsert1_out') + self.print_txs('Update', 'upsert2_out') diff --git a/ydb/tests/stress/mixedpy/ya.make b/ydb/tests/stress/mixedpy/ya.make new file mode 100644 index 00000000000..244460f7c92 --- /dev/null +++ b/ydb/tests/stress/mixedpy/ya.make @@ -0,0 +1,31 @@ +PY3TEST() +ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") +ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb") + + +TEST_SRCS( + test_mixed.py +) + +IF (SANITIZER_TYPE) + REQUIREMENTS(ram:32) +ELSE() + REQUIREMENTS(ram:16) +ENDIF() + +TIMEOUT(1200) +SIZE(LARGE) +TAG(ya:fat) + +DEPENDS( + ydb/apps/ydbd + ydb/apps/ydb +) + +PEERDIR( + ydb/tests/library + ydb/tests/olap/lib +) + + +END() diff --git a/ydb/tests/stress/ya.make b/ydb/tests/stress/ya.make index a688f425b89..95ed1f67463 100644 --- a/ydb/tests/stress/ya.make +++ b/ydb/tests/stress/ya.make @@ -1,6 +1,7 @@ RECURSE( kv log + mixedpy olap_workload oltp_workload simple_queue |