aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Avdonkin <aavdonkin@yandex.ru>2025-04-09 17:26:37 +0300
committerGitHub <noreply@github.com>2025-04-09 17:26:37 +0300
commita55bf5d901ba7fd4cd4cbf3c998b31f28ecece4c (patch)
tree34f3f1769c7a0091991c947d2974ae8c561a4644
parent751c197f1a8cf88c09b44fee5e4afe528d0a970f (diff)
downloadydb-a55bf5d901ba7fd4cd4cbf3c998b31f28ecece4c.tar.gz
Mixed stress test (#14163)
-rw-r--r--ydb/library/workload/mixed/mixed.cpp367
-rw-r--r--ydb/library/workload/mixed/mixed.h109
-rw-r--r--ydb/library/workload/mixed/registrar.cpp12
-rw-r--r--ydb/library/workload/mixed/ya.make14
-rw-r--r--ydb/library/workload/ya.make2
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp4
-rw-r--r--ydb/tests/stress/mixedpy/test_mixed.py108
-rw-r--r--ydb/tests/stress/mixedpy/ya.make31
-rw-r--r--ydb/tests/stress/ya.make1
9 files changed, 646 insertions, 2 deletions
diff --git a/ydb/library/workload/mixed/mixed.cpp b/ydb/library/workload/mixed/mixed.cpp
new file mode 100644
index 00000000000..28902ac4619
--- /dev/null
+++ b/ydb/library/workload/mixed/mixed.cpp
@@ -0,0 +1,367 @@
+#include "mixed.h"
+#include <util/generic/serialized_enum.h>
+#include <util/random/normal.h>
+#include <util/random/random.h>
+#include <util/datetime/base.h>
+
+namespace NYdbWorkload {
+
+namespace NMixed {
+
+using TRow = TLogGenerator::TRow;
+
+
+TLogGenerator::TLogGenerator(const TMixedWorkloadParams* params)
+ : TBase(params)
+ , TotalColumnsCnt(1 + Params.IntColumnsCnt + Params.StrColumnsCnt)
+{
+ Y_ABORT_UNLESS(TotalColumnsCnt >= Params.KeyColumnsCnt);
+}
+
+std::string TLogGenerator::GetDDLQueries() const {
+ std::stringstream ss;
+
+ ss << "--!syntax_v1\n";
+ ss << "CREATE TABLE `" << Params.DbPath << "/" << Params.TableName << "`(";
+
+ for (size_t i = 0; i < TotalColumnsCnt; ++i) {
+ if (i == 0) {
+ ss << "ts Timestamp";
+ } else if (i < Params.IntColumnsCnt + 1) {
+ ss << "c" << i << " Uint64";
+ } else {
+ ss << "c" << i << " String";
+ }
+
+ if (i < Params.KeyColumnsCnt && Params.GetStoreType() == TMixedWorkloadParams::EStoreType::Column) {
+ ss << " NOT NULL";
+ }
+ ss << ", ";
+ }
+
+ ss << "PRIMARY KEY(";
+ ss << "ts";
+ for (size_t i = 1; i < Params.KeyColumnsCnt; ++i) {
+ ss << ", c" << i;
+ }
+ ss << ")) WITH (";
+
+ ss << "TTL = Interval(\"PT" << Params.TimestampTtlMinutes << "M\") ON ts, ";
+
+ switch (Params.GetStoreType()) {
+ case TMixedWorkloadParams::EStoreType::Row:
+ ss << "STORE = ROW, ";
+ break;
+ case TMixedWorkloadParams::EStoreType::Column:
+ ss << "STORE = COLUMN, ";
+ break;
+ default:
+ throw yexception() << "Unsupported store type: " << Params.GetStoreType();
+ }
+ if (Params.PartitionsByLoad) {
+ ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";
+ }
+ ss << "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = " << Max(Params.MinPartitions, Params.MaxPartitions) << ", ";
+ ss << "AUTO_PARTITIONING_PARTITION_SIZE_MB = " << Params.PartitionSizeMb << ", ";
+ ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << Params.MinPartitions << ")";
+ return ss.str();
+}
+
+TQueryInfoList TLogGenerator::GetWorkload(int type) {
+ switch (static_cast<EType>(type)) {
+ case EType::Insert:
+ return Insert(GenerateRandomRows());
+ case EType::Upsert:
+ return Upsert(GenerateRandomRows());
+ case EType::BulkUpsert:
+ return BulkUpsert(GenerateRandomRows());
+ case EType::Select:
+ return Select(GenerateRandomRows());
+ default:
+ return TQueryInfoList();
+ }
+}
+
+
+TVector<IWorkloadQueryGenerator::TWorkloadType> TLogGenerator::GetSupportedWorkloadTypes() const {
+ TVector<TWorkloadType> result;
+ result.emplace_back(static_cast<int>(EType::Insert), "insert", "Insert random rows into table near current ts");
+ result.emplace_back(static_cast<int>(EType::Upsert), "upsert", "Upsert random rows into table near current ts");
+ result.emplace_back(static_cast<int>(EType::BulkUpsert), "bulk_upsert", "Bulk upsert random rows into table near current ts");
+ result.emplace_back(static_cast<int>(EType::Select), "select", "Select random rows from table");
+ return result;
+}
+
+TQueryInfoList TLogGenerator::WriteRows(TString operation, TVector<TRow>&& rows) {
+ std::stringstream ss;
+
+ NYdb::TParamsBuilder paramsBuilder;
+
+ ss << "--!syntax_v1\n";
+
+ for (size_t row = 0; row < Params.RowsCnt; ++row) {
+ for (size_t col = 0; col < TotalColumnsCnt; ++col) {
+ TString cname = "$c" + std::to_string(row) + "_" + std::to_string(col);
+ if (col == 0) {
+ ss << "DECLARE " << cname << " AS Timestamp;\n";
+ paramsBuilder.AddParam(cname).Timestamp(rows[row].Ts).Build();
+ } else if (col < Params.IntColumnsCnt + 1) {
+ ss << "DECLARE " << cname << " AS Uint64;\n";
+ paramsBuilder.AddParam(cname).Uint64(rows[row].Ints[col - 1]).Build();
+ } else {
+ ss << "DECLARE " << cname << " AS String;\n";
+ paramsBuilder.AddParam(cname).String(rows[row].Strings[col - Params.IntColumnsCnt - 1]).Build();
+ }
+ }
+ }
+
+ ss << operation << " INTO `" << Params.TableName << "` (";
+
+ for (size_t col = 0; col < TotalColumnsCnt; ++col) {
+ if (col != 0) {
+ ss << "c" << col;
+ } else {
+ ss << "ts";
+ }
+
+ if (col + 1 < TotalColumnsCnt) {
+ ss << ", ";
+ }
+ }
+
+ ss << ") VALUES ";
+
+ for (size_t row = 0; row < Params.RowsCnt; ++row) {
+ ss << "(";
+
+ for (size_t col = 0; col < TotalColumnsCnt; ++col) {
+ ss << "$c" << row << "_" << col;
+ if (col + 1 < TotalColumnsCnt) {
+ ss << ", ";
+ }
+ }
+
+ ss << ")";
+
+ if (row + 1 < Params.RowsCnt) {
+ ss << ", ";
+ }
+ }
+ auto params = paramsBuilder.Build();
+ return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));
+}
+
+TQueryInfoList TLogGenerator::Insert(TVector<TRow>&& rows) {
+ return WriteRows("INSERT", std::move(rows));
+}
+
+TQueryInfoList TLogGenerator::Upsert(TVector<TRow>&& rows) {
+ return WriteRows("UPSERT", std::move(rows));
+}
+
+TQueryInfoList TLogGenerator::Select(TVector<TRow>&& rows) {
+ std::stringstream ss;
+
+ NYdb::TParamsBuilder paramsBuilder;
+
+ ss << "--!syntax_v1\n";
+
+ for (size_t row = 0; row < Params.RowsCnt; ++row) {
+ for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) {
+ TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col);
+ if (col == 0) {
+ ss << "DECLARE " << paramName << " AS Timestamp;\n";
+ paramsBuilder.AddParam(paramName).Timestamp(rows[row].Ts).Build();
+ } else if (col < Params.IntColumnsCnt) {
+ ss << "DECLARE " << paramName << " AS Uint64;\n";
+ paramsBuilder.AddParam(paramName).Uint64(rows[row].Ints[col]).Build();
+ } else {
+ ss << "DECLARE " << paramName << " AS String;\n";
+ paramsBuilder.AddParam(paramName).String(rows[row].Strings[col - Params.IntColumnsCnt]).Build();
+ }
+ }
+ }
+
+ ss << "SELECT ";
+ for (size_t col = 1; col <= TotalColumnsCnt; ++col) {
+ ss << "c" << col;
+ if (col + 1 < TotalColumnsCnt) {
+ ss << ",";
+ }
+ ss << " ";
+ }
+
+ ss << "FROM `" << Params.TableName << "` WHERE ";
+ for (size_t row = 0; row < Params.RowsCnt; ++row) {
+ for (size_t col = 0; col < Params.KeyColumnsCnt; ++col) {
+ TString paramName = "$r" + std::to_string(row) + "_" + std::to_string(col);
+ if (col == 0) {
+ ss << "ts = " << paramName;
+ } else {
+ ss << "c" << col << " = " << paramName;
+ }
+ if (col + 1 < Params.KeyColumnsCnt) {
+ ss << " AND ";
+ }
+ }
+ if (row + 1 < Params.RowsCnt) {
+ ss << " OR ";
+ }
+ }
+
+ auto params = paramsBuilder.Build();
+ TQueryInfo info(ss.str(), std::move(params));
+ return TQueryInfoList(1, std::move(info));
+}
+
+TQueryInfoList TLogGenerator::BulkUpsert(TVector<TRow>&& rows) {
+ NYdb::TValueBuilder valueBuilder;
+ valueBuilder.BeginList();
+ for (const TRow& row : rows) {
+ auto &listItem = valueBuilder.AddListItem();
+ listItem.BeginStruct();
+ for (size_t col = 0; col < TotalColumnsCnt; ++col) {
+ if (col == 0) {
+ listItem.AddMember("ts").Timestamp(row.Ts);
+ } else if (col < Params.IntColumnsCnt + 1) {
+ listItem.AddMember(std::format("c{}", col)).Uint64(row.Ints[col-1]);
+ } else {
+ listItem.AddMember(std::format("c{}", col)).String(row.Strings[col - Params.IntColumnsCnt - 1]);
+ }
+ }
+ listItem.EndStruct();
+ }
+ valueBuilder.EndList();
+ TString table_path = Params.DbPath + "/" + Params.TableName;
+ NYdb::TValue rowsValue = valueBuilder.Build();
+ auto bulkUpsertOperation = [table_path, rowsValue](NYdb::NTable::TTableClient& tableClient) {
+ auto r = rowsValue;
+ auto status = tableClient.BulkUpsert(table_path, std::move(r));
+ return status.GetValueSync();
+ };
+ TQueryInfo queryInfo;
+ queryInfo.TableOperation = bulkUpsertOperation;
+ return TQueryInfoList(1, std::move(queryInfo));
+}
+
+
+TQueryInfoList TLogGenerator::GetInitialData() {
+ TQueryInfoList res;
+ return res;
+}
+
+TVector<std::string> TLogGenerator::GetCleanPaths() const {
+ return { Params.TableName };
+}
+
+TVector<TRow> TLogGenerator::GenerateRandomRows() {
+ TVector<TRow> result(Params.RowsCnt);
+
+ for (size_t row = 0; row < Params.RowsCnt; ++row) {
+ result[row].Ts = TInstant::Now();
+ i64 millisecondsDiff = 60 * 1000 * NormalRandom(0., static_cast<double>(Params.TimestampStandardDeviationMinutes));
+ if (millisecondsDiff >= 0) { // TDuration::MilliSeconds can't be negative for some reason...
+ result[row].Ts = result[row].Ts + TDuration::MilliSeconds(millisecondsDiff);
+ } else {
+ result[row].Ts = result[row].Ts - TDuration::MilliSeconds(-millisecondsDiff);
+ }
+
+ result[row].Ints.resize(Params.IntColumnsCnt);
+ result[row].Strings.resize(Params.StrColumnsCnt);
+
+ for (size_t col = 0; col < Params.IntColumnsCnt; ++col) {
+ ui64 val = RandomNumber<ui64>();
+ result[row].Ints[col] = val;
+ }
+
+ for (size_t col = 0; col < Params.StrColumnsCnt; ++col) {
+ TString val;
+ val = TString(Params.StringLen, '_');
+ for (size_t i = 0; i < Params.StringLen; i++) {
+ val[i] = (char)('a' + RandomNumber<u_char>(26));
+ }
+ result[row].Strings[col] = val;
+ }
+ }
+
+ return result;
+}
+
+void TMixedWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) {
+ opts.AddLongOption('p', "path", "Path where benchmark tables are located")
+ .Optional()
+ .DefaultValue(TableName)
+ .Handler1T<TStringBuf>([this](TStringBuf arg) {
+ while(arg.SkipPrefix("/"));
+ while(arg.ChopSuffix("/"));
+ TableName = arg;
+ });
+ switch (commandType) {
+ case TWorkloadParams::ECommandType::Init:
+ opts.AddLongOption("min-partitions", "Minimum partitions for tables.")
+ .DefaultValue((ui64)LogWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions);
+ opts.AddLongOption("max-partitions", "Maximum partitions for tables.")
+ .DefaultValue((ui64)LogWorkloadConstants::MAX_PARTITIONS).StoreResult(&MaxPartitions);
+ opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).")
+ .DefaultValue((ui64)LogWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb);
+ opts.AddLongOption("auto-partition", "Enable auto partitioning by load.")
+ .DefaultValue((ui64)LogWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad);
+ opts.AddLongOption("len", "String len")
+ .DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
+ opts.AddLongOption("int-cols", "Number of int columns")
+ .DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
+ opts.AddLongOption("str-cols", "Number of string columns")
+ .DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt);
+ opts.AddLongOption("key-cols", "Number of key columns")
+ .DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
+ opts.AddLongOption("ttl", "TTL for timestamp column in minutes")
+ .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_TTL_MIN).StoreResult(&TimestampTtlMinutes);
+ opts.AddLongOption("store", "Storage type."
+ " Options: row, column\n"
+ " row - use row-based storage engine;\n"
+ " column - use column-based storage engine.")
+ .DefaultValue(StoreType)
+ .Handler1T<TStringBuf>([this](TStringBuf arg) {
+ const auto l = to_lower(TString(arg));
+ if (!TryFromString(arg, StoreType)) {
+ throw yexception() << "Ivalid store type: " << arg;
+ }
+ });
+ break;
+ case TWorkloadParams::ECommandType::Run:
+ opts.AddLongOption("int-cols", "Number of int columns")
+ .DefaultValue((ui64)LogWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
+ opts.AddLongOption("str-cols", "Number of string columns")
+ .DefaultValue((ui64)LogWorkloadConstants::STR_COLUMNS_CNT).StoreResult(&StrColumnsCnt);
+ opts.AddLongOption("key-cols", "Number of key columns")
+ .DefaultValue((ui64)LogWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
+ switch (static_cast<TLogGenerator::EType>(workloadType)) {
+ case TLogGenerator::EType::Select:
+ case TLogGenerator::EType::Insert:
+ case TLogGenerator::EType::Upsert:
+ case TLogGenerator::EType::BulkUpsert:
+ opts.AddLongOption("len", "String len")
+ .DefaultValue((ui64)LogWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
+ opts.AddLongOption("rows", "Number of rows to upsert")
+ .DefaultValue((ui64)LogWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
+ opts.AddLongOption("timestamp_deviation", "Standard deviation. For each timestamp, a random variable with a specified standard deviation in minutes is added.")
+ .DefaultValue((ui64)LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES).StoreResult(&TimestampStandardDeviationMinutes);
+ break;
+ }
+ break;
+ default:
+ break;
+ }
+}
+
+THolder<IWorkloadQueryGenerator> TMixedWorkloadParams::CreateGenerator() const {
+ return MakeHolder<TLogGenerator>(this);
+}
+
+TString TMixedWorkloadParams::GetWorkloadName() const {
+ return "Log";
+}
+
+} // namespace NMixed
+
+} // namespace NYdbWorkload
diff --git a/ydb/library/workload/mixed/mixed.h b/ydb/library/workload/mixed/mixed.h
new file mode 100644
index 00000000000..08e27ca93f5
--- /dev/null
+++ b/ydb/library/workload/mixed/mixed.h
@@ -0,0 +1,109 @@
+#pragma once
+
+#include <ydb/library/workload/abstract/workload_query_generator.h>
+
+#include <cctype>
+
+namespace NYdbWorkload {
+
+namespace NMixed {
+
+enum LogWorkloadConstants : ui64 {
+ MIN_PARTITIONS = 40,
+ MAX_PARTITIONS = 1000,
+ PARTITION_SIZE_MB = 2000,
+ STRING_LEN = 8,
+ STR_COLUMNS_CNT = 1,
+ INT_COLUMNS_CNT = 1,
+ KEY_COLUMNS_CNT = 1,
+ ROWS_CNT = 1,
+ PARTITIONS_BY_LOAD = true,
+
+ TIMESTAMP_STANDARD_DEVIATION_MINUTES = 0,
+ TIMESTAMP_TTL_MIN = 60,
+};
+
+class TMixedWorkloadParams : public TWorkloadParams {
+public:
+ enum class EStoreType {
+ Row /* "row" */,
+ Column /* "column" */,
+ };
+
+ void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override;
+ THolder<IWorkloadQueryGenerator> CreateGenerator() const override;
+ TString GetWorkloadName() const override;
+ ui64 MinPartitions = LogWorkloadConstants::MIN_PARTITIONS;
+ ui64 MaxPartitions = LogWorkloadConstants::MAX_PARTITIONS;
+ ui64 PartitionSizeMb = LogWorkloadConstants::PARTITION_SIZE_MB;
+ ui64 StringLen = LogWorkloadConstants::STRING_LEN;
+ ui64 StrColumnsCnt = LogWorkloadConstants::STR_COLUMNS_CNT;
+ ui64 IntColumnsCnt = LogWorkloadConstants::INT_COLUMNS_CNT;
+ ui64 KeyColumnsCnt = LogWorkloadConstants::KEY_COLUMNS_CNT;
+ ui64 TimestampStandardDeviationMinutes = LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES;
+ ui64 TimestampTtlMinutes = LogWorkloadConstants::TIMESTAMP_STANDARD_DEVIATION_MINUTES;
+ ui64 RowsCnt = LogWorkloadConstants::ROWS_CNT;
+ bool PartitionsByLoad = LogWorkloadConstants::PARTITIONS_BY_LOAD;
+
+ std::string TableName = "log_writer_test";
+
+ YDB_READONLY(EStoreType, StoreType, EStoreType::Row);
+};
+
+class TLogGenerator final: public TWorkloadQueryGeneratorBase<TMixedWorkloadParams> {
+public:
+ using TBase = TWorkloadQueryGeneratorBase<TMixedWorkloadParams>;
+ struct TRow {
+ TInstant Ts;
+ TVector<ui64> Ints;
+ TVector<TString> Strings;
+
+ TString ToString() const {
+ std::stringstream ss;
+ ss << "( ";
+ for (auto i : Ints) {
+ ss << i << " ";
+ }
+ for (auto s : Strings) {
+ ss << s << " ";
+ }
+ ss << ")";
+ return ss.str();
+ }
+
+ bool operator == (const TRow &other) const {
+ return Ts == other.Ts && Ints == other.Ints && Strings == other.Strings;
+ }
+ };
+ TLogGenerator(const TMixedWorkloadParams* params);
+
+ std::string GetDDLQueries() const override;
+
+ TQueryInfoList GetInitialData() override;
+
+ TVector<std::string> GetCleanPaths() const override;
+
+ TQueryInfoList GetWorkload(int type) override;
+ TVector<TWorkloadType> GetSupportedWorkloadTypes() const override;
+
+ enum class EType {
+ Insert,
+ Upsert,
+ BulkUpsert,
+ Select
+ };
+
+private:
+ TQueryInfoList WriteRows(TString operation, TVector<TRow>&& rows);
+ TQueryInfoList Select(TVector<TRow>&& rows);
+ TQueryInfoList Insert(TVector<TRow>&& rows);
+ TQueryInfoList Upsert(TVector<TRow>&& rows);
+ TQueryInfoList BulkUpsert(TVector<TRow>&& rows);
+ TVector<TRow> GenerateRandomRows();
+
+ const ui64 TotalColumnsCnt;
+};
+
+} // namespace NMixed
+
+} // namespace NYdbWorkload
diff --git a/ydb/library/workload/mixed/registrar.cpp b/ydb/library/workload/mixed/registrar.cpp
new file mode 100644
index 00000000000..8a8042cf343
--- /dev/null
+++ b/ydb/library/workload/mixed/registrar.cpp
@@ -0,0 +1,12 @@
+#include "mixed.h"
+#include <ydb/library/workload/abstract/workload_factory.h>
+
+namespace NYdbWorkload {
+
+namespace NMixed {
+
+TWorkloadFactory::TRegistrator<TMixedWorkloadParams> Registrar("mixed");
+
+} // namespace NMixed
+
+} // namespace NYdbWorkload
diff --git a/ydb/library/workload/mixed/ya.make b/ydb/library/workload/mixed/ya.make
new file mode 100644
index 00000000000..6cdd7dc405d
--- /dev/null
+++ b/ydb/library/workload/mixed/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ GLOBAL registrar.cpp
+ mixed.cpp
+)
+
+PEERDIR(
+ ydb/library/workload/abstract
+)
+
+GENERATE_ENUM_SERIALIZATION_WITH_HEADER(mixed.h)
+
+END()
diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make
index 7819c15847a..b269c9422a5 100644
--- a/ydb/library/workload/ya.make
+++ b/ydb/library/workload/ya.make
@@ -5,6 +5,7 @@ PEERDIR(
ydb/library/workload/clickbench
ydb/library/workload/kv
ydb/library/workload/log
+ ydb/library/workload/mixed
ydb/library/workload/stock
ydb/library/workload/tpcds
ydb/library/workload/tpch
@@ -18,6 +19,7 @@ RECURSE(
clickbench
kv
log
+ mixed
stock
tpc_base
tpcds
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index 10e3423996d..a5ce8a92734 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -268,7 +268,7 @@ void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerato
int TWorkloadCommand::RunWorkload(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, const int type) {
if (!Quiet) {
- std::cout << "Window\tTxs/Sec\tRetries\tErrors\tp50(ms)\tp95(ms)\tp99(ms)\tpMax(ms)";
+ std::cout << "Window\tTxs\tTxs/Sec\tRetries\tErrors\tp50(ms)\tp95(ms)\tp99(ms)\tpMax(ms)";
if (PrintTimestamp) {
std::cout << "\tTimestamp";
}
@@ -320,7 +320,7 @@ void TWorkloadCommand::PrintWindowStats(int windowIt) {
WindowHist.Reset();
}
if (!Quiet) {
- std::cout << windowIt << "\t" << std::setw(7) << stats.OpsCount / WindowSec << "\t" << retries << "\t"
+ std::cout << windowIt << "\t" << std::setw(7) << stats.OpsCount << "\t" << stats.OpsCount / WindowSec << "\t" << retries << "\t"
<< errors << "\t" << stats.Percentile50 << "\t" << stats.Percentile95 << "\t"
<< stats.Percentile99 << "\t" << stats.Percentile100;
if (PrintTimestamp) {
diff --git a/ydb/tests/stress/mixedpy/test_mixed.py b/ydb/tests/stress/mixedpy/test_mixed.py
new file mode 100644
index 00000000000..50f1ef0e23e
--- /dev/null
+++ b/ydb/tests/stress/mixedpy/test_mixed.py
@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+import os
+import sys
+
+import pytest
+
+import yatest
+
+from ydb.tests.library.harness.kikimr_runner import KiKiMR
+from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
+from ydb.tests.olap.lib.utils import get_external_param
+
+
+class TestYdbMixedWorkload(object):
+ @classmethod
+ def setup_class(cls):
+ cls.cluster = KiKiMR(KikimrConfigGenerator())
+ cls.cluster.start()
+
+ def get_command_prefix(self, subcmds: list[str]) -> list[str]:
+ return [
+ yatest.common.binary_path(os.getenv('YDB_CLI_BINARY')),
+ '--verbose',
+ '--endpoint', self.endpoint,
+ '--database={}'.format(self.database),
+ 'workload', 'mixed'
+ ] + subcmds
+
+ @classmethod
+ def teardown_class(cls):
+ cls.cluster.stop()
+
+ @classmethod
+ def get_cols_count_command_params(cls) -> list[str]:
+ return [
+ '--int-cols', '5',
+ '--str-cols', '5',
+ ]
+
+ def get_bulk_upsert_cmd(self, duration: str):
+ return self.get_command_prefix(subcmds=['run', 'bulk_upsert'] + self.get_cols_count_command_params()) + [
+ '-s', duration,
+ '-t', '40',
+ '--rows', '1000',
+ '--window', '20',
+ '--len', '1000',
+ ]
+
+ def get_select_cmd(self, duration: str):
+ return self.get_command_prefix(subcmds=['run', 'select']) + [
+ '-s', duration,
+ '-t', '5',
+ '--rows', '100',
+ ]
+
+ def get_update_cmd(self, duration: str):
+ return self.get_command_prefix(subcmds=['run', 'upsert'] + self.get_cols_count_command_params()) + [
+ '-s', duration,
+ '-t', '5',
+ '--rows', '100',
+ '--len', '1000',
+ ]
+
+ @classmethod
+ def print_txs(cls, step: str, filename: str):
+ found = False
+ with open(filename, 'r') as f:
+ for line in f.readlines():
+ if found:
+ print('{} txs/sec: {}'.format(step, line.split()[1]))
+ break
+ else:
+ words = line.split()
+ if len(words) > 0 and words[0] == 'Txs':
+ found = True
+
+ @pytest.mark.parametrize('store_type', ['row', 'column'])
+ def test(self, store_type):
+ duration = get_external_param('duration', '120')
+ self.endpoint = get_external_param('endpoint', 'grpc://localhost:%d' % self.cluster.nodes[1].grpc_port)
+ self.database = get_external_param('database', '/Root')
+ yatest.common.execute(
+ self.get_command_prefix(subcmds=['clean']))
+
+ yatest.common.execute(
+ self.get_command_prefix(subcmds=['init'] + self.get_cols_count_command_params()) + [
+ '--store', store_type,
+ ])
+
+ with open('upsert_out', 'w+') as out:
+ yatest.common.execute(self.get_bulk_upsert_cmd(duration), stdout=out, stderr=sys.stderr)
+
+ with open('upsert1_out', 'w+') as out:
+ bulk_upsert = yatest.common.execute(self.get_bulk_upsert_cmd(duration), wait=False, stdout=out, stderr=sys.stderr)
+ select = yatest.common.execute(self.get_select_cmd(duration), wait=False)
+ bulk_upsert.wait()
+ select.wait()
+
+ with open('upsert2_out', 'w+') as out:
+ bulk_upsert = yatest.common.execute(self.get_bulk_upsert_cmd(duration), wait=False, stdout=out, stderr=sys.stderr)
+ select = yatest.common.execute(self.get_select_cmd(duration), wait=False)
+ update = yatest.common.execute(self.get_update_cmd(duration), wait=False)
+ bulk_upsert.wait()
+ select.wait()
+ update.wait()
+ self.print_txs('Upsert', 'upsert_out')
+ self.print_txs('Select', 'upsert1_out')
+ self.print_txs('Update', 'upsert2_out')
diff --git a/ydb/tests/stress/mixedpy/ya.make b/ydb/tests/stress/mixedpy/ya.make
new file mode 100644
index 00000000000..244460f7c92
--- /dev/null
+++ b/ydb/tests/stress/mixedpy/ya.make
@@ -0,0 +1,31 @@
+PY3TEST()
+ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
+ENV(YDB_CLI_BINARY="ydb/apps/ydb/ydb")
+
+
+TEST_SRCS(
+ test_mixed.py
+)
+
+IF (SANITIZER_TYPE)
+ REQUIREMENTS(ram:32)
+ELSE()
+ REQUIREMENTS(ram:16)
+ENDIF()
+
+TIMEOUT(1200)
+SIZE(LARGE)
+TAG(ya:fat)
+
+DEPENDS(
+ ydb/apps/ydbd
+ ydb/apps/ydb
+)
+
+PEERDIR(
+ ydb/tests/library
+ ydb/tests/olap/lib
+)
+
+
+END()
diff --git a/ydb/tests/stress/ya.make b/ydb/tests/stress/ya.make
index a688f425b89..95ed1f67463 100644
--- a/ydb/tests/stress/ya.make
+++ b/ydb/tests/stress/ya.make
@@ -1,6 +1,7 @@
RECURSE(
kv
log
+ mixedpy
olap_workload
oltp_workload
simple_queue