summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormdartemenko <[email protected]>2022-08-12 16:22:34 +0300
committermdartemenko <[email protected]>2022-08-12 16:22:34 +0300
commit04e2f0cc097bf5f211d7e2297d9ea8d2a410d4a2 (patch)
tree19cb76663c4652ff37420a87892c21a98a36d6f2
parent2fda6d5f28b0008d1cdd6fa14c54913b0f06daaa (diff)
KqpLoadActor
kqp load actor for query processing benchmark
-rw-r--r--CMakeLists.darwin.txt4
-rw-r--r--CMakeLists.linux.txt4
-rw-r--r--ydb/core/blobstorage/testload/CMakeLists.txt1
-rw-r--r--ydb/core/blobstorage/testload/test_load_kqp.cpp809
-rw-r--r--ydb/core/protos/blobstorage.proto25
-rw-r--r--ydb/library/workload/stock_workload.cpp10
-rw-r--r--ydb/library/workload/stock_workload.h4
-rw-r--r--ydb/library/workload/workload_factory.cpp11
-rw-r--r--ydb/library/workload/workload_factory.h6
-rw-r--r--ydb/library/workload/workload_query_generator.h1
-rw-r--r--ydb/public/lib/ydb_cli/commands/stock_workload.cpp68
-rw-r--r--ydb/public/lib/ydb_cli/commands/stock_workload.h9
12 files changed, 746 insertions, 206 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 74119ab78b6..ce7866efa47 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -411,6 +411,8 @@ add_subdirectory(ydb/core/blobstorage/nodewarden)
add_subdirectory(ydb/core/blob_depot/agent)
add_subdirectory(ydb/core/blobstorage/other)
add_subdirectory(ydb/core/blobstorage/testload)
+add_subdirectory(library/cpp/histogram/hdr)
+add_subdirectory(contrib/libs/hdr_histogram)
add_subdirectory(ydb/core/keyvalue)
add_subdirectory(ydb/core/engine/minikql)
add_subdirectory(ydb/core/client/minikql_compile)
@@ -980,8 +982,6 @@ add_subdirectory(ydb/library/yql/udfs/logs/dsv)
add_subdirectory(ydb/apps/ydb)
add_subdirectory(ydb/apps/ydb/commands)
add_subdirectory(ydb/public/lib/ydb_cli/commands)
-add_subdirectory(library/cpp/histogram/hdr)
-add_subdirectory(contrib/libs/hdr_histogram)
add_subdirectory(library/cpp/threading/local_executor)
add_subdirectory(contrib/libs/tbb)
add_subdirectory(ydb/library/backup)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 93586903ff2..5d3560f6e14 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -414,6 +414,8 @@ add_subdirectory(ydb/core/blobstorage/nodewarden)
add_subdirectory(ydb/core/blob_depot/agent)
add_subdirectory(ydb/core/blobstorage/other)
add_subdirectory(ydb/core/blobstorage/testload)
+add_subdirectory(library/cpp/histogram/hdr)
+add_subdirectory(contrib/libs/hdr_histogram)
add_subdirectory(ydb/core/keyvalue)
add_subdirectory(ydb/core/engine/minikql)
add_subdirectory(ydb/core/client/minikql_compile)
@@ -987,8 +989,6 @@ add_subdirectory(ydb/library/yql/udfs/logs/dsv)
add_subdirectory(ydb/apps/ydb)
add_subdirectory(ydb/apps/ydb/commands)
add_subdirectory(ydb/public/lib/ydb_cli/commands)
-add_subdirectory(library/cpp/histogram/hdr)
-add_subdirectory(contrib/libs/hdr_histogram)
add_subdirectory(library/cpp/threading/local_executor)
add_subdirectory(contrib/libs/tbb)
add_subdirectory(ydb/library/backup)
diff --git a/ydb/core/blobstorage/testload/CMakeLists.txt b/ydb/core/blobstorage/testload/CMakeLists.txt
index f50aae1d18c..6f3a2076fe5 100644
--- a/ydb/core/blobstorage/testload/CMakeLists.txt
+++ b/ydb/core/blobstorage/testload/CMakeLists.txt
@@ -11,6 +11,7 @@ add_library(core-blobstorage-testload)
target_link_libraries(core-blobstorage-testload PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-histogram-hdr
contrib-libs-protobuf
monlib-dynamic_counters-percentile
monlib-service-pages
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp
index f4601596075..1f8e0519639 100644
--- a/ydb/core/blobstorage/testload/test_load_kqp.cpp
+++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp
@@ -21,6 +21,7 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/histogram/hdr/histogram.h>
#include <util/generic/queue.h>
#include <util/random/fast.h>
@@ -29,233 +30,729 @@
namespace NKikimr {
-class TEvKqpWriterTestLoadActor;
+enum {
+ EvKqpWorkerResponse
+};
-class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> {
- struct TRequestInfo {
- ui32 Size;
- TInstant LogStartTime;
- };
-
- struct TRequestStat {
- ui64 BytesWrittenTotal;
- ui32 Size;
- TDuration Latency;
- };
-
- struct TLogWriteCookie {
- ui32 WorkerIdx;
- TInstant SentTime;
- ui64 Size;
- };
-
- ui64 Key;
- ui64 FirstKey;
- ui64 TotalRowsToUpsert;
- ui64 MaxInFlight;
- ui64 UniformPartitionsCount;
- ui32 NumOfSessions;
- TActorId Pipe;
- std::vector<TString> TxId;
+struct MonitoringData {
+public:
+ MonitoringData()
+ : WindowHist(60000, 2)
+ , WindowErrors(0) {}
- const TActorId Parent;
- ui64 Tag;
- ui32 DurationSeconds;
- ui32 StringValueSize;
- bool SequentialWrite;
- TString StringValue;
+ MonitoringData(const NHdr::THistogram& hist, ui64 window_errors)
+ : WindowHist(60000, 2)
+ , WindowErrors(window_errors)
+ {
+ WindowHist.Add(hist);
+ }
+
+ void Add(const MonitoringData& other) {
+ WindowHist.Add(other.WindowHist);
+ WindowErrors += other.WindowErrors;
+ }
+
+public:
+ NHdr::THistogram WindowHist;
+ ui64 WindowErrors;
+
+};
+
+struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerResponse> {
+public:
+ TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 window_errors, ui64 phase, ui64 worker_tag)
+ : Data(hist, window_errors)
+ , Phase(phase)
+ , WorkerTag(worker_tag) {}
+
+public:
+ MonitoringData Data;
+ ui64 Phase;
+ ui64 WorkerTag;
+
+};
+
+void ConvertYdbParamsToMiniKQLParams(const NYdb::TParams& input, NKikimrMiniKQL::TParams& output) {
+ output.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
+ auto type = output.MutableType()->MutableStruct();
+ auto value = output.MutableValue();
+ for (const auto& p : input.GetValues()) {
+ auto typeMember = type->AddMember();
+ auto valueItem = value->AddStruct();
+ typeMember->SetName(p.first);
+ ConvertYdbTypeToMiniKQLType(NYdb::TProtoAccessor::GetProto(p.second.GetType()), *typeMember->MutableType());
+ ConvertYdbValueToMiniKQLValue(NYdb::TProtoAccessor::GetProto(p.second.GetType()), NYdb::TProtoAccessor::GetProto(p.second), *valueItem);
+ }
+}
+
+class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {
+public:
+ TKqpLoadWorker(TActorId parent,
+ TString working_dir,
+ std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen,
+ ui64 workload_type,
+ ui64 parentTag,
+ ui64 workerTag,
+ ui64 durationSeconds,
+ ui64 windowDuration,
+ ui64 windowCount,
+ NMonitoring::TDynamicCounters::TCounterPtr transactions,
+ NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)
+ : Parent(std::move(parent))
+ , WorkingDir(std::move(working_dir))
+ , WorkloadQueryGen(workload_query_gen)
+ , WorkloadType(workload_type)
+ , ParentTag(parentTag)
+ , WorkerTag(workerTag)
+ , DurationSeconds(durationSeconds)
+ , WindowHist(60000, 2)
+ , WindowDuration(windowDuration)
+ , WindowCount(windowCount)
+ , Transactions(transactions)
+ , TransactionsBytesWritten(transactionsBytesWritten) {}
+
+ void Bootstrap(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called");
+
+ ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill);
+ ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);
+
+ Become(&TKqpLoadWorker::StateFunc);
+ CreateWorkingSession(ctx);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleResponse)
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse)
+ HFunc(TEvUpdateMonitoring, HandleWindowTimer)
+ )
+
+private:
+
+ // death
+
+ void HandlePoisonPill(const TActorContext& ctx) {
+ LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " HandlePoisonPill");
+
+ if (Phase < WindowCount) {
+ SendMonitoringEvent(ctx);
+ }
+
+ CloseSession(ctx);
+ Die(ctx);
+ }
+
+ void CloseSession(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " creating event for session close");
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(WorkerSession);
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " sending session close query to proxy: " + kqp_proxy.ToString());
+
+ ctx.Send(kqp_proxy, ev.Release());
+ }
+
+private:
+
+ // working
+
+ void CreateWorkingSession(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " creating event for session creation");
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
+
+ ev->Record.MutableRequest()->SetDatabase(WorkingDir);
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " sending event for session creation to proxy: " << kqp_proxy.ToString());
+
+ Send(kqp_proxy, ev.Release());
+ }
+
+ void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& response = ev->Get()->Record;
+
+ if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
+ WorkerSession = response.GetResponse().GetSessionId();
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " Session is created: " + WorkerSession);
+ CreateDataQuery(ctx);
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " Session creation failed: " + ev->Get()->ToString());
+ }
+ }
+
+ void CreateDataQuery(const TActorContext& ctx) {
+ if (queries.empty()) {
+ queries = WorkloadQueryGen->GetWorkload(WorkloadType);
+ }
+
+ auto q = std::move(queries.front());
+ queries.pop_front();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size());
+
+ Transactions->Inc();
+
+ TString query_text = TString(q.Query);
+ NYdb::TParams query_params = q.Params;
+
+ auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " using session: " << WorkerSession);
+
+ request->Record.MutableRequest()->SetSessionId(WorkerSession);
+ request->Record.MutableRequest()->SetKeepSession(true);
+ request->Record.MutableRequest()->SetDatabase(WorkingDir);
+
+ request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ request->Record.MutableRequest()->SetQuery(query_text);
+
+ request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+ request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL);
+
+ NKikimrMiniKQL::TParams params;
+ ConvertYdbParamsToMiniKQLParams(query_params, params);
+ request->Record.MutableRequest()->MutableParameters()->Swap(&params);
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " sending data query to proxy: " + kqp_proxy.ToString());
+
+ ctx.Send(kqp_proxy, request.Release());
+
+ }
+
+ void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& response = ev->Get()->Record.GetRef();
+
+ Transactions->Dec();
+
+ if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " data request status: Success");
+ TransactionsBytesWritten->Add(response.GetResponse().GetQueryStats().ByteSize());
+ WindowHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs());
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " data request status: Fail, Issue: " + ev->Get()->ToString());
+ ++WindowErrors;
+ }
+
+ if (Phase < WindowCount) {
+ CreateDataQuery(ctx);
+ }
+ }
+
+private:
+
+ // monitoring
+
+ void HandleWindowTimer(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " handle TEvUpdateMonitoring, Phase: " << Phase);
+
+ SendMonitoringEvent(ctx);
+
+ if (Phase < WindowCount) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag
+ << " reschedule TEvUpdateMonitoring, Phase: " << Phase);
+ ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);
+ }
+ }
+
+private:
+
+ // common
+
+ void SendMonitoringEvent(const TActorContext& ctx) {
+ auto ev = MakeHolder<TEvKqpWorkerResponse>(WindowHist, WindowErrors, Phase, WorkerTag);
+
+ WindowHist.Reset();
+ WindowErrors = 0;
+ ++Phase;
+
+ ctx.Send(Parent, ev.Release());
+ }
+
+private:
+ TActorId Parent;
TString WorkingDir;
- size_t ProductCount;
- size_t Quantity;
- bool DeleteTableOnFinish;
- std::vector<TString> preparedQuery;
std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen;
+ ui64 WorkloadType;
+ ui64 ParentTag;
+ ui64 WorkerTag;
- TReallyFastRng32 Rng;
+ NYdbWorkload::TQueryInfoList queries;
- std::unordered_map<TString, std::queue<TInstant>> SentTime;
+ TString WorkerSession = "wrong sessionId";
+ ui64 DurationSeconds = 1;
- // Monitoring
- TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters;
- ::NMonitoring::TDynamicCounters::TCounterPtr Transactions;
- ::NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;
- TInstant TestStartTime;
+private:
+ // for monitoring
+ NHdr::THistogram WindowHist;
+ ui64 WindowErrors = 0;
+
+ ui64 WindowDuration;
+ ui64 WindowCount;
- TMap<ui64, TLogWriteCookie> InFlightWrites;
- NMonitoring::TPercentileTrackerLg<6, 5, 15> ResponseTimes;
- std::vector<TString> Sessions;
- TString PreparedSelectQuery;
+ ui64 Phase = 0;
+ NMonitoring::TDynamicCounters::TCounterPtr Transactions;
+ NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;
+
+};
+
+class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> {
public:
static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::KQP_TEST_WORKLOAD;
}
TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
- const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag)
+ const TActorId& parent,
+ const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
+ ui64 index,
+ ui64 tag)
: Parent(parent)
, Tag(tag)
- , Rng(Now().GetValue())
{
Y_UNUSED(index);
VERIFY_PARAM(DurationSeconds);
- DurationSeconds = cmd.GetDurationSeconds();
- NumOfSessions = cmd.GetNumOfSessions();
- MaxInFlight = cmd.GetMaxInFlight();
- Key = cmd.GetFirstKey();
- FirstKey = Key;
- TotalRowsToUpsert = cmd.GetTotalRowsToUpsert();
- StringValueSize = cmd.GetStringValueSize();
- DeleteTableOnFinish = cmd.GetDeleteTableOnFinish();
+
+ google::protobuf::TextFormat::PrintToString(cmd, &ConfingString);
+
UniformPartitionsCount = cmd.GetUniformPartitionsCount();
- SequentialWrite = cmd.GetSequentialWrite();
- StringValue = TString(StringValueSize, 'a');
+ DeleteTableOnFinish = cmd.GetDeleteTableOnFinish();
WorkingDir = cmd.GetWorkingDir();
-
- if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) {
- ProductCount = cmd.GetStock().GetProductCount();
- Quantity = cmd.GetStock().GetQuantity();
+ WorkloadType = cmd.GetWorkloadType();
+ DurationSeconds = cmd.GetDurationSeconds();
+ WindowDuration = cmd.GetWindowDuration();
+ WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration;
+ NumOfSessions = cmd.GetNumOfSessions();
+ ChunkLoad.resize(WindowCount);
+ Chunk.reserve(WindowCount);
+ Total = std::make_unique<MonitoringData>();
+ for (size_t i = 0; i < WindowCount; ++i) {
+ Chunk.push_back(std::make_unique<MonitoringData>());
}
+
+ NYdbWorkload::TWorkloadFactory factory;
- if (cmd.GetWorkloadName() != "stock") {
+ if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) {
+ NYdbWorkload::TStockWorkloadParams params;
+ params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad();
+ params.OrderCount = cmd.GetStock().GetOrderCount();
+ params.ProductCount = cmd.GetStock().GetProductCount();
+ params.Quantity = cmd.GetStock().GetQuantity();
+ params.Limit = cmd.GetStock().GetLimit();
+ params.DbPath = WorkingDir;
+ params.MinPartitions = UniformPartitionsCount;
+ WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
+ } else {
return;
}
-
- NYdbWorkload::TStockWorkloadParams params;
- params.DbPath = WorkingDir;
- params.PartitionsByLoad = true;
-
- NYdbWorkload::TWorkloadFactory factory;
- WorkloadQueryGen = factory.GetWorkloadQueryGenerator(cmd.GetWorkloadName(), &params);
+
Y_ASSERT(WorkloadQueryGen.get() != nullptr);
Y_ASSERT(DurationSeconds > DelayBeforeMeasurements.Seconds());
// Monitoring initialization
- TVector<float> percentiles {0.1f, 0.5f, 0.9f, 0.99f, 0.999f, 1.0f};
+
LoadCounters = counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag));
Transactions = LoadCounters->GetCounter("Transactions", true);
TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true);
-
- ResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles);
}
~TKqpWriterTestLoadActor() {
LoadCounters->ResetCounters();
}
-
void Bootstrap(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
- << " TKqpWriterTestLoadActor Bootstrap called");
- Become(&TKqpWriterTestLoadActor::StateFunc);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called");
+ Become(&TKqpWriterTestLoadActor::StateStart);
+
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill");
- ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill);
- ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring);
+ ctx.Schedule(TDuration::Seconds(DurationSeconds * 2), new TEvents::TEvPoisonPill);
- LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Bootstrap");
-
- CreateSession(ctx);
+ CreateSessionForTablesDDL(ctx);
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Death management
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ STRICT_STFUNC(StateStart,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
+ HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse)
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleCreateTableResponse)
+ HFunc(NMon::TEvHttpInfo, HandleHTML)
+ )
+
+ STRICT_STFUNC(StateMain,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDataQueryResponse)
+ HFunc(TEvKqpWorkerResponse, HandleMonitoring)
+ HFunc(NMon::TEvHttpInfo, HandleHTML)
+ )
+
+ STRICT_STFUNC(StateEndOfWork,
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDropTablesResponse)
+ )
+
+private:
+
+ // death
void HandlePoisonPill(const TActorContext& ctx) {
- LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, "
- << "all workers is initialized, so starting death process");
+ LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, starting death process");
StartDeathProcess(ctx);
}
void StartDeathProcess(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
- << " TKqpWriterTestLoadActor StartDeathProcess called");
- for(const auto& session : Sessions) {
- auto request = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>();
- request->Record.MutableRequest()->SetSessionId(session);
- ctx.Send( new IEventHandle(NKqp::MakeKqpProxyID(1), SelfId(), request.release(),
- 0, /* via actor system */ true));
- }
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor StartDeathProcess called");
Become(&TKqpWriterTestLoadActor::StateEndOfWork);
+
+ if (DeleteTableOnFinish) {
+ DropTables(ctx);
+ } else {
+ DeathReport(ctx);
+ }
+ }
+
+ void DropTables(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables drop");
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ ev->Record.MutableRequest()->SetDatabase(WorkingDir);
+ ev->Record.MutableRequest()->SetSessionId(TableSession);
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
+ ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetCleanDDLQueries());
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending drop tables query to proxy: " + kqp_proxy.ToString());
+
+ ctx.Send(kqp_proxy, ev.Release());
+ }
+
+ void HandleDropTablesResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& response = ev->Get()->Record.GetRef();
+
+ if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " drop tables status: SUCCESS");
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " drop tables status: FAIL, reason: " + ev->Get()->ToString());
+ }
+
+ DeathReport(ctx);
+ }
+
+ void DeathReport(const TActorContext& ctx) {
+ CloseSession(ctx);
+
TIntrusivePtr<TLoadReport> Report(new TLoadReport());
Report->Duration = TDuration::Seconds(DurationSeconds);
+
ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess"));
Die(ctx);
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Monitoring
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+private:
- void Handle(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) {
- ResponseTimes.Update();
- ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring);
+ // monitoring
+ void HandleMonitoring(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) {
+ const auto& response = ev->Get();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag
+ << " Phase: " << response->Phase
+ << " Min: " << response->Data.WindowHist.GetMin()
+ << " Max: " << response->Data.WindowHist.GetMax()
+ << " Count: " << response->Data.WindowHist.GetTotalCount());
+
+ Chunk[response->Phase]->Add(response->Data);
+ ChunkLoad[response->Phase] += 1;
+
+ if (ChunkLoad[Phase] == NumOfSessions) {
+ Total->Add(*Chunk[Phase]);
+ SendNewRowToParent(ctx);
+ }
}
- STRICT_STFUNC(StateFunc,
- CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
- HFunc(TEvUpdateMonitoring, Handle)
- )
+ void SendNewRowToParent(const TActorContext& ctx) {
+ Phase += 1;
+
+ LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
+ << " total: Phase: " << Phase << " -> "
+ << Total->WindowHist.GetTotalCount() << " | "
+ << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0) << " | "
+ << Total->WindowErrors << " | "
+ << Total->WindowHist.GetValueAtPercentile(50.0) / (WindowDuration * 1000.0) << " | "
+ << Total->WindowHist.GetValueAtPercentile(95.0) / (WindowDuration * 1000.0) << " | "
+ << Total->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0) << " | "
+ << Total->WindowHist.GetMax() / (WindowDuration * 1000.0)
+ );
+
+ if (Phase >= WindowCount) {
+ StartDeathProcess(ctx);
+ }
+ }
- STRICT_STFUNC(StateEndOfWork,
- HFunc(TEvUpdateMonitoring, Handle)
- )
+private:
+
+ // creating tables
+
+ void CreateSessionForTablesDDL(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for session creation");
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
+
+ ev->Record.MutableRequest()->SetDatabase(WorkingDir);
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending event for session creation to proxy: " << kqp_proxy.ToString());
+
+ Send(kqp_proxy, ev.Release());
+ }
+
+ void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& response = ev->Get()->Record;
+
+ if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
+ TableSession = response.GetResponse().GetSessionId();
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Session is created: " + TableSession);
+ CreateTables(ctx);
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Session creation failed: " + ev->Get()->ToString());
+ }
+ }
+
+ void CreateTables(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables creation");
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ ev->Record.MutableRequest()->SetDatabase(WorkingDir);
+ ev->Record.MutableRequest()->SetSessionId(TableSession);
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
+ ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetDDLQueries());
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending ddl query to proxy: " + kqp_proxy.ToString());
+
+ ctx.Send(kqp_proxy, ev.Release());
+ }
+
+ void HandleCreateTableResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& response = ev->Get()->Record.GetRef();
+
+ Become(&TKqpWriterTestLoadActor::StateMain);
+
+ if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created");
+ InitData = WorkloadQueryGen->GetInitialData();
+ InsertInitData(ctx);
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString());
+ }
+ }
private:
- void CreateSession(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Creating session to run tables DDL.");
- auto request = Ydb::Table::CreateSessionRequest();
- auto cb = [this, &ctx](const Ydb::Table::CreateSessionResponse& resp) {
- auto op = resp.Getoperation();
- if (op.Getready()) {
- auto status = op.Getstatus();
- if (status == Ydb::StatusIds_StatusCode_SUCCESS) {
- Ydb::Table::CreateSessionResult result;
- op.result().UnpackTo(&result);
- TString sessionId = result.session_id();
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Session is created: " + sessionId);
- CreateShardedTable(ctx, sessionId);
- } else {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Session creation failed " + std::to_string((int)status)
- + " Issue: " + op.Getissues(0).Getmessage());
- }
- }
- };
- using namespace NGRpcService;
- using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
- Ydb::Table::CreateSessionResponse>;
- NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvCreateSessionRequest>(
- std::move(request), std::move(cb), WorkingDir, TString(), ctx
- );
+ // table initialization
+
+ void InsertInitData(const TActorContext& ctx) {
+ if (InitData.empty()) {
+ InitWorkers(ctx);
+ return;
+ }
+
+ auto q = std::move(InitData.front());
+ InitData.pop_front();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag
+ << " Creating request for init query, need to exec: " << InitData.size() + 1);
+
+ TString query_text = TString(q.Query);
+ NYdb::TParams query_params = q.Params;
+
+ auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag << " using session: " << TableSession);
+
+ request->Record.MutableRequest()->SetSessionId(TableSession);
+ request->Record.MutableRequest()->SetKeepSession(true);
+ request->Record.MutableRequest()->SetDatabase(WorkingDir);
+
+ request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ request->Record.MutableRequest()->SetQuery(query_text);
+
+ request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+ request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL);
+
+ NKikimrMiniKQL::TParams params;
+ ConvertYdbParamsToMiniKQLParams(query_params, params);
+ request->Record.MutableRequest()->MutableParameters()->Swap(&params);
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag
+ << " sending init query to proxy: " + kqp_proxy.ToString());
+
+ ctx.Send(kqp_proxy, request.Release());
+ }
+
+ void HandleDataQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ auto& response = ev->Get()->Record.GetRef();
+
+ if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: SUCCESS");
+ } else {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: FAIL, reason: " + ev->Get()->ToString());
+ }
+
+ InsertInitData(ctx);
}
- void CreateShardedTable(const TActorContext& ctx, const TString& sessionId) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Creating tables for workload.");
- auto request = Ydb::Table::ExecuteSchemeQueryRequest();
- request.set_session_id(sessionId);
- request.set_yql_text(WorkloadQueryGen->GetDDLQueries());
- auto cb = [&ctx](const Ydb::Table::ExecuteSchemeQueryResponse& resp) {
- auto op = resp.Getoperation();
- if (op.Getready()) {
- auto status = op.Getstatus();
- if (status == Ydb::StatusIds_StatusCode_SUCCESS) {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tables are created.");
- } else {
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tables creation failed " + std::to_string((int)status)
- + " Issue: " + op.Getissues(0).Getmessage());
+private:
+
+ // html render
+
+ void HandleHTML(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
+ TStringStream str;
+ HTML(str) {
+ TABLE_CLASS("table table-condensed") {
+ TABLEHEAD() {
+ TABLER() {
+ TABLEH() {
+ str << "Window";
+ }
+ TABLEH() {
+ str << "Txs";
+ }
+ TABLEH() {
+ str << "Txs/Sec";
+ }
+ TABLEH() {
+ str << "Errors";
+ }
+ TABLEH() {
+ str << "p50(ms)";
+ }
+ TABLEH() {
+ str << "p95(ms)";
+ }
+ TABLEH() {
+ str << "p99(ms)";
+ }
+ TABLEH() {
+ str << "pMax(ms)";
+ }
+ }
+ }
+ TABLEBODY() {
+ TABLER() {
+ TABLED() { str << "total"; };
+ TABLED() { str << Total->WindowHist.GetTotalCount(); };
+ TABLED() { str << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0); };
+ TABLED() { str << Total->WindowErrors; };
+ TABLED() { str << Total->WindowHist.GetValueAtPercentile(50.0) / (WindowDuration * 1000.0); };
+ TABLED() { str << Total->WindowHist.GetValueAtPercentile(95.0) / (WindowDuration * 1000.0); };
+ TABLED() { str << Total->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0); };
+ TABLED() { str << Total->WindowHist.GetMax() / (WindowDuration * 1000.0); };
+ }
+ for (size_t i = Phase; i >= 1; --i) {
+ TABLER() {
+ TABLED() { str << i; };
+ TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount(); };
+ TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount() / (WindowDuration * 1.0); };
+ TABLED() { str << Chunk[i - 1]->WindowErrors; };
+ TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(50.0) / (WindowDuration * 1000.0); };
+ TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(95.0) / (WindowDuration * 1000.0); };
+ TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0); };
+ TABLED() { str << Chunk[i - 1]->WindowHist.GetMax() / (WindowDuration * 1000.0); };
+ }
+ }
}
}
- };
- using namespace NGRpcService;
- using TEvExecuteSchemeQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest,
- Ydb::Table::ExecuteSchemeQueryResponse>;
- NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvExecuteSchemeQueryRequest>(
- std::move(request), std::move(cb), WorkingDir, TString(), ctx
- );
+ COLLAPSED_BUTTON_CONTENT(Sprintf("configProtobuf%" PRIu64, Tag), "Config") {
+ str << "<pre>" << ConfingString << "</pre>";
+ }
+ }
+ ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
+ }
+
+
+private:
+
+ // common
+
+ void InitWorkers(const TActorContext& ctx) {
+ for (ui64 i = 0; i < NumOfSessions; ++i) {
+ auto* worker = new TKqpLoadWorker(
+ SelfId(),
+ WorkingDir,
+ WorkloadQueryGen,
+ WorkloadType,
+ Tag,
+ i,
+ DurationSeconds,
+ WindowDuration,
+ WindowCount,
+ Transactions,
+ TransactionsBytesWritten);
+ Workers.push_back(ctx.Register(worker));
+ }
}
+ void CloseSession(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for session close");
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(TableSession);
+
+ auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending session close query to proxy: " + kqp_proxy.ToString());
+
+ ctx.Send(kqp_proxy, ev.Release());
+ }
+
+private:
+ TString TableSession = "wrong sessionId";
+ TString WorkingDir;
+ ui64 WorkloadType;
+ ui64 WindowCount;
+ ui64 WindowDuration;
+ std::vector<TActorId> Workers;
+ TString ConfingString;
+ ui64 UniformPartitionsCount;
+ bool DeleteTableOnFinish;
+ ui32 NumOfSessions;
+
+ NYdbWorkload::TQueryInfoList InitData;
+
+ const TActorId Parent;
+ ui64 Tag;
+ ui32 DurationSeconds;
+ std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen;
+
+ // Monitoring
+ std::vector<std::unique_ptr<MonitoringData>> Chunk;
+ std::vector<ui64> ChunkLoad;
+ std::unique_ptr<MonitoringData> Total;
+ ui64 Phase = 0;
+
+ // counters
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters;
+ NMonitoring::TDynamicCounters::TCounterPtr Transactions;
+ NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;
+
};
IActor * CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index 0ea61ce9abb..2d498cd2afd 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -1529,23 +1529,22 @@ message TEvTestLoadRequest {
message TKqpLoadStart {
message TStockWorkload {
- optional uint64 ProductCount = 1;
- optional uint64 Quantity = 2;
+ optional uint64 ProductCount = 1 [default = 100];
+ optional uint64 Quantity = 2 [default = 1000];
+ optional uint64 OrderCount = 3 [default = 100];
+ optional uint64 Limit = 4 [default = 10];
+ optional bool PartitionsByLoad = 5 [default = true];
}
optional uint64 Tag = 1;
optional uint32 DurationSeconds = 2;
- optional uint32 NumOfSessions = 3;
- optional uint32 MaxInFlight = 4;
- optional uint32 StringValueSize = 5 [default = 8000];
- optional uint64 FirstKey = 6 [default = 0];
- optional bool DeleteTableOnFinish = 7 [default = true];
- optional bool SequentialWrite = 8 [default = true];
- optional uint32 UniformPartitionsCount = 9 [default = 1];
- optional uint64 TotalRowsToUpsert = 10;
- optional string WorkingDir = 11;
- optional string WorkloadName = 12;
+ optional uint32 WindowDuration = 3;
+ optional string WorkingDir = 4;
+ optional uint32 NumOfSessions = 5;
+ optional bool DeleteTableOnFinish = 6;
+ optional uint32 UniformPartitionsCount = 7;
+ optional uint32 WorkloadType = 8;
oneof Workload {
- TStockWorkload Stock = 13;
+ TStockWorkload Stock = 9;
}
}
diff --git a/ydb/library/workload/stock_workload.cpp b/ydb/library/workload/stock_workload.cpp
index c65b5b392e4..28d70fa44c5 100644
--- a/ydb/library/workload/stock_workload.cpp
+++ b/ydb/library/workload/stock_workload.cpp
@@ -74,6 +74,16 @@ TQueryInfoList TStockWorkloadGenerator::GetInitialData() {
return res;
}
+std::string TStockWorkloadGenerator::GetCleanDDLQueries() const {
+ std::string clean_query = R"(
+ DROP TABLE `stock`;
+ DROP TABLE `orders`;
+ DROP TABLE `orderLines`;
+ )";
+
+ return clean_query;
+}
+
TQueryInfo TStockWorkloadGenerator::FillStockData() const {
std::string query = R"(--!syntax_v1
DECLARE $stocks AS List<Struct<product:Utf8,quantity:Int64>>;
diff --git a/ydb/library/workload/stock_workload.h b/ydb/library/workload/stock_workload.h
index 3725cfe1394..aaac35c4451 100644
--- a/ydb/library/workload/stock_workload.h
+++ b/ydb/library/workload/stock_workload.h
@@ -22,7 +22,7 @@ public:
static TStockWorkloadGenerator* New(const TStockWorkloadParams* params) {
if (!validateDbPath(params->DbPath)) {
- return nullptr;
+ throw yexception() << "Invalid path to database." << Endl;
}
return new TStockWorkloadGenerator(params);
}
@@ -33,6 +33,8 @@ public:
TQueryInfoList GetInitialData() override;
+ std::string GetCleanDDLQueries() const override;
+
TQueryInfoList GetWorkload(int type) override;
enum class EType {
diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp
index f1a34ea78cc..823da997dc8 100644
--- a/ydb/library/workload/workload_factory.cpp
+++ b/ydb/library/workload/workload_factory.cpp
@@ -4,18 +4,17 @@
namespace NYdbWorkload {
- std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const std::string& workloadName,
- const TWorkloadParams* params)
+ std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params)
{
if (!params) {
- return nullptr;
+ throw yexception() << "Params not specified";
}
- if (workloadName == "stock") {
+ if (type == EWorkload::STOCK) {
return std::shared_ptr<TStockWorkloadGenerator>(TStockWorkloadGenerator::New(static_cast<const TStockWorkloadParams*>(params)));
- } else {
- return nullptr;
}
+
+ throw yexception() << "Unknown workload";
}
} \ No newline at end of file
diff --git a/ydb/library/workload/workload_factory.h b/ydb/library/workload/workload_factory.h
index fbfaaa65467..53bd764a1ad 100644
--- a/ydb/library/workload/workload_factory.h
+++ b/ydb/library/workload/workload_factory.h
@@ -6,9 +6,13 @@
namespace NYdbWorkload {
+enum class EWorkload {
+ STOCK,
+};
+
class TWorkloadFactory {
public:
- std::shared_ptr<IWorkloadQueryGenerator> GetWorkloadQueryGenerator(const std::string& workloadName, const TWorkloadParams* params);
+ std::shared_ptr<IWorkloadQueryGenerator> GetWorkloadQueryGenerator(const EWorkload& type, const TWorkloadParams* params);
};
} // namespace NYdbWorkload \ No newline at end of file
diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h
index 4eb02efb549..f48d3960246 100644
--- a/ydb/library/workload/workload_query_generator.h
+++ b/ydb/library/workload/workload_query_generator.h
@@ -31,6 +31,7 @@ public:
virtual std::string GetDDLQueries() const = 0;
virtual TQueryInfoList GetInitialData() = 0;
+ virtual std::string GetCleanDDLQueries() const = 0;
virtual TQueryInfoList GetWorkload(int type) = 0;
};
diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp
index e00874b617c..240625924e0 100644
--- a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp
@@ -16,6 +16,7 @@ TCommandStock::TCommandStock()
: TClientCommandTree("stock", {}, "YDB stock workload")
{
AddCommand(std::make_unique<TCommandStockInit>());
+ AddCommand(std::make_unique<TCommandStockClean>());
AddCommand(std::make_unique<TCommandStockRun>());
}
@@ -64,10 +65,7 @@ int TCommandStockInit::Run(TConfig& config) {
params.PartitionsByLoad = PartitionsByLoad;
NYdbWorkload::TWorkloadFactory factory;
- auto workloadGen = factory.GetWorkloadQueryGenerator("stock", &params);
- if (workloadGen.get() == nullptr) {
- throw TMisuseException() << "Invalid path to database." << Endl;
- }
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
auto session = GetSession();
auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync();
@@ -95,6 +93,42 @@ int TCommandStockInit::Run(TConfig& config) {
return EXIT_SUCCESS;
}
+TCommandStockClean::TCommandStockClean()
+ : TWorkloadCommand("clean", {}, "drop tables created in init phase") {}
+
+void TCommandStockClean::Config(TConfig& config) {
+ TWorkloadCommand::Config(config);
+ config.SetFreeArgsNum(0);
+}
+
+void TCommandStockClean::Parse(TConfig& config) {
+ TClientCommand::Parse(config);
+}
+
+int TCommandStockClean::Run(TConfig& config) {
+ Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config));
+ TableClient = std::make_unique<NTable::TTableClient>(*Driver);
+ NYdbWorkload::TStockWorkloadParams params;
+ params.DbPath = config.Database;
+
+ NYdbWorkload::TWorkloadFactory factory;
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
+
+ auto session = GetSession();
+
+ auto query = workloadGen->GetCleanDDLQueries();
+ TStatus result(EStatus::SUCCESS, NYql::TIssues());
+ result = session.ExecuteSchemeQuery(TString(query)).GetValueSync();
+
+ if (!result.IsSuccess()) {
+ Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl
+ << "Query:\n" << query << Endl;
+ return EXIT_FAILURE;
+ }
+
+ return EXIT_SUCCESS;
+}
+
TCommandStockRun::TCommandStockRun()
: TClientCommandTree("run", {}, "Run YDB stock workload")
{
@@ -131,10 +165,7 @@ int TCommandStockRunInsertRandomOrder::Run(TConfig& config) {
params.ProductCount = ProductCount;
NYdbWorkload::TWorkloadFactory factory;
- auto workloadGen = factory.GetWorkloadQueryGenerator("stock", &params);
- if (workloadGen.get() == nullptr) {
- throw TMisuseException() << "Invalid path to database." << Endl;
- }
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::InsertRandomOrder));
}
@@ -165,10 +196,7 @@ int TCommandStockRunSubmitRandomOrder::Run(TConfig& config) {
params.ProductCount = ProductCount;
NYdbWorkload::TWorkloadFactory factory;
- auto workloadGen = factory.GetWorkloadQueryGenerator("stock", &params);
- if (workloadGen.get() == nullptr) {
- throw TMisuseException() << "Invalid path to database." << Endl;
- }
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::SubmitRandomOrder));
}
@@ -199,10 +227,7 @@ int TCommandStockRunSubmitSameOrder::Run(TConfig& config) {
params.ProductCount = ProductCount;
NYdbWorkload::TWorkloadFactory factory;
- auto workloadGen = factory.GetWorkloadQueryGenerator("stock", &params);
- if (workloadGen.get() == nullptr) {
- throw TMisuseException() << "Invalid path to database." << Endl;
- }
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::SubmitSameOrder));
}
@@ -233,11 +258,7 @@ int TCommandStockRunGetRandomCustomerHistory::Run(TConfig& config) {
params.Limit = Limit;
NYdbWorkload::TWorkloadFactory factory;
- auto workloadGen = factory.GetWorkloadQueryGenerator("stock", &params);
- if (workloadGen.get() == nullptr) {
- throw TMisuseException() << "Invalid path to database." << Endl;
- }
-
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::GetRandomCustomerHistory));
}
@@ -267,10 +288,7 @@ int TCommandStockRunGetCustomerHistory::Run(TConfig& config) {
params.Limit = Limit;
NYdbWorkload::TWorkloadFactory factory;
- auto workloadGen = factory.GetWorkloadQueryGenerator("stock", &params);
- if (workloadGen.get() == nullptr) {
- throw TMisuseException() << "Invalid path to database." << Endl;
- }
+ auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::GetCustomerHistory));
}
diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.h b/ydb/public/lib/ydb_cli/commands/stock_workload.h
index f755d632260..c2e12fb7cc9 100644
--- a/ydb/public/lib/ydb_cli/commands/stock_workload.h
+++ b/ydb/public/lib/ydb_cli/commands/stock_workload.h
@@ -25,6 +25,15 @@ private:
bool PartitionsByLoad;
};
+class TCommandStockClean : public TWorkloadCommand {
+public:
+ TCommandStockClean();
+ virtual void Config(TConfig& config) override;
+ virtual void Parse(TConfig& config) override;
+ virtual int Run(TConfig& config) override;
+
+};
+
class TCommandStockRun : public TClientCommandTree {
public:
TCommandStockRun();