diff options
author | mdartemenko <[email protected]> | 2022-08-12 16:22:34 +0300 |
---|---|---|
committer | mdartemenko <[email protected]> | 2022-08-12 16:22:34 +0300 |
commit | 04e2f0cc097bf5f211d7e2297d9ea8d2a410d4a2 (patch) | |
tree | 19cb76663c4652ff37420a87892c21a98a36d6f2 | |
parent | 2fda6d5f28b0008d1cdd6fa14c54913b0f06daaa (diff) |
KqpLoadActor
kqp load actor for query processing benchmark
-rw-r--r-- | CMakeLists.darwin.txt | 4 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/testload/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/blobstorage/testload/test_load_kqp.cpp | 809 | ||||
-rw-r--r-- | ydb/core/protos/blobstorage.proto | 25 | ||||
-rw-r--r-- | ydb/library/workload/stock_workload.cpp | 10 | ||||
-rw-r--r-- | ydb/library/workload/stock_workload.h | 4 | ||||
-rw-r--r-- | ydb/library/workload/workload_factory.cpp | 11 | ||||
-rw-r--r-- | ydb/library/workload/workload_factory.h | 6 | ||||
-rw-r--r-- | ydb/library/workload/workload_query_generator.h | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/stock_workload.cpp | 68 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/stock_workload.h | 9 |
12 files changed, 746 insertions, 206 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 74119ab78b6..ce7866efa47 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -411,6 +411,8 @@ add_subdirectory(ydb/core/blobstorage/nodewarden) add_subdirectory(ydb/core/blob_depot/agent) add_subdirectory(ydb/core/blobstorage/other) add_subdirectory(ydb/core/blobstorage/testload) +add_subdirectory(library/cpp/histogram/hdr) +add_subdirectory(contrib/libs/hdr_histogram) add_subdirectory(ydb/core/keyvalue) add_subdirectory(ydb/core/engine/minikql) add_subdirectory(ydb/core/client/minikql_compile) @@ -980,8 +982,6 @@ add_subdirectory(ydb/library/yql/udfs/logs/dsv) add_subdirectory(ydb/apps/ydb) add_subdirectory(ydb/apps/ydb/commands) add_subdirectory(ydb/public/lib/ydb_cli/commands) -add_subdirectory(library/cpp/histogram/hdr) -add_subdirectory(contrib/libs/hdr_histogram) add_subdirectory(library/cpp/threading/local_executor) add_subdirectory(contrib/libs/tbb) add_subdirectory(ydb/library/backup) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 93586903ff2..5d3560f6e14 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -414,6 +414,8 @@ add_subdirectory(ydb/core/blobstorage/nodewarden) add_subdirectory(ydb/core/blob_depot/agent) add_subdirectory(ydb/core/blobstorage/other) add_subdirectory(ydb/core/blobstorage/testload) +add_subdirectory(library/cpp/histogram/hdr) +add_subdirectory(contrib/libs/hdr_histogram) add_subdirectory(ydb/core/keyvalue) add_subdirectory(ydb/core/engine/minikql) add_subdirectory(ydb/core/client/minikql_compile) @@ -987,8 +989,6 @@ add_subdirectory(ydb/library/yql/udfs/logs/dsv) add_subdirectory(ydb/apps/ydb) add_subdirectory(ydb/apps/ydb/commands) add_subdirectory(ydb/public/lib/ydb_cli/commands) -add_subdirectory(library/cpp/histogram/hdr) -add_subdirectory(contrib/libs/hdr_histogram) add_subdirectory(library/cpp/threading/local_executor) add_subdirectory(contrib/libs/tbb) add_subdirectory(ydb/library/backup) diff --git a/ydb/core/blobstorage/testload/CMakeLists.txt b/ydb/core/blobstorage/testload/CMakeLists.txt index f50aae1d18c..6f3a2076fe5 100644 --- a/ydb/core/blobstorage/testload/CMakeLists.txt +++ b/ydb/core/blobstorage/testload/CMakeLists.txt @@ -11,6 +11,7 @@ add_library(core-blobstorage-testload) target_link_libraries(core-blobstorage-testload PUBLIC contrib-libs-cxxsupp yutil + cpp-histogram-hdr contrib-libs-protobuf monlib-dynamic_counters-percentile monlib-service-pages diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp index f4601596075..1f8e0519639 100644 --- a/ydb/core/blobstorage/testload/test_load_kqp.cpp +++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp @@ -21,6 +21,7 @@ #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/histogram/hdr/histogram.h> #include <util/generic/queue.h> #include <util/random/fast.h> @@ -29,233 +30,729 @@ namespace NKikimr { -class TEvKqpWriterTestLoadActor; +enum { + EvKqpWorkerResponse +}; -class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> { - struct TRequestInfo { - ui32 Size; - TInstant LogStartTime; - }; - - struct TRequestStat { - ui64 BytesWrittenTotal; - ui32 Size; - TDuration Latency; - }; - - struct TLogWriteCookie { - ui32 WorkerIdx; - TInstant SentTime; - ui64 Size; - }; - - ui64 Key; - ui64 FirstKey; - ui64 TotalRowsToUpsert; - ui64 MaxInFlight; - ui64 UniformPartitionsCount; - ui32 NumOfSessions; - TActorId Pipe; - std::vector<TString> TxId; +struct MonitoringData { +public: + MonitoringData() + : WindowHist(60000, 2) + , WindowErrors(0) {} - const TActorId Parent; - ui64 Tag; - ui32 DurationSeconds; - ui32 StringValueSize; - bool SequentialWrite; - TString StringValue; + MonitoringData(const NHdr::THistogram& hist, ui64 window_errors) + : WindowHist(60000, 2) + , WindowErrors(window_errors) + { + WindowHist.Add(hist); + } + + void Add(const MonitoringData& other) { + WindowHist.Add(other.WindowHist); + WindowErrors += other.WindowErrors; + } + +public: + NHdr::THistogram WindowHist; + ui64 WindowErrors; + +}; + +struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerResponse> { +public: + TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 window_errors, ui64 phase, ui64 worker_tag) + : Data(hist, window_errors) + , Phase(phase) + , WorkerTag(worker_tag) {} + +public: + MonitoringData Data; + ui64 Phase; + ui64 WorkerTag; + +}; + +void ConvertYdbParamsToMiniKQLParams(const NYdb::TParams& input, NKikimrMiniKQL::TParams& output) { + output.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Struct); + auto type = output.MutableType()->MutableStruct(); + auto value = output.MutableValue(); + for (const auto& p : input.GetValues()) { + auto typeMember = type->AddMember(); + auto valueItem = value->AddStruct(); + typeMember->SetName(p.first); + ConvertYdbTypeToMiniKQLType(NYdb::TProtoAccessor::GetProto(p.second.GetType()), *typeMember->MutableType()); + ConvertYdbValueToMiniKQLValue(NYdb::TProtoAccessor::GetProto(p.second.GetType()), NYdb::TProtoAccessor::GetProto(p.second), *valueItem); + } +} + +class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> { +public: + TKqpLoadWorker(TActorId parent, + TString working_dir, + std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen, + ui64 workload_type, + ui64 parentTag, + ui64 workerTag, + ui64 durationSeconds, + ui64 windowDuration, + ui64 windowCount, + NMonitoring::TDynamicCounters::TCounterPtr transactions, + NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten) + : Parent(std::move(parent)) + , WorkingDir(std::move(working_dir)) + , WorkloadQueryGen(workload_query_gen) + , WorkloadType(workload_type) + , ParentTag(parentTag) + , WorkerTag(workerTag) + , DurationSeconds(durationSeconds) + , WindowHist(60000, 2) + , WindowDuration(windowDuration) + , WindowCount(windowCount) + , Transactions(transactions) + , TransactionsBytesWritten(transactionsBytesWritten) {} + + void Bootstrap(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called"); + + ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill); + ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); + + Become(&TKqpLoadWorker::StateFunc); + CreateWorkingSession(ctx); + } + + STRICT_STFUNC(StateFunc, + CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleResponse) + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse) + HFunc(TEvUpdateMonitoring, HandleWindowTimer) + ) + +private: + + // death + + void HandlePoisonPill(const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " HandlePoisonPill"); + + if (Phase < WindowCount) { + SendMonitoringEvent(ctx); + } + + CloseSession(ctx); + Die(ctx); + } + + void CloseSession(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " creating event for session close"); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(WorkerSession); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " sending session close query to proxy: " + kqp_proxy.ToString()); + + ctx.Send(kqp_proxy, ev.Release()); + } + +private: + + // working + + void CreateWorkingSession(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " creating event for session creation"); + auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); + + ev->Record.MutableRequest()->SetDatabase(WorkingDir); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " sending event for session creation to proxy: " << kqp_proxy.ToString()); + + Send(kqp_proxy, ev.Release()); + } + + void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record; + + if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { + WorkerSession = response.GetResponse().GetSessionId(); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " Session is created: " + WorkerSession); + CreateDataQuery(ctx); + } else { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " Session creation failed: " + ev->Get()->ToString()); + } + } + + void CreateDataQuery(const TActorContext& ctx) { + if (queries.empty()) { + queries = WorkloadQueryGen->GetWorkload(WorkloadType); + } + + auto q = std::move(queries.front()); + queries.pop_front(); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size()); + + Transactions->Inc(); + + TString query_text = TString(q.Query); + NYdb::TParams query_params = q.Params; + + auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " using session: " << WorkerSession); + + request->Record.MutableRequest()->SetSessionId(WorkerSession); + request->Record.MutableRequest()->SetKeepSession(true); + request->Record.MutableRequest()->SetDatabase(WorkingDir); + + request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + request->Record.MutableRequest()->SetQuery(query_text); + + request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL); + + NKikimrMiniKQL::TParams params; + ConvertYdbParamsToMiniKQLParams(query_params, params); + request->Record.MutableRequest()->MutableParameters()->Swap(¶ms); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " sending data query to proxy: " + kqp_proxy.ToString()); + + ctx.Send(kqp_proxy, request.Release()); + + } + + void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record.GetRef(); + + Transactions->Dec(); + + if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " data request status: Success"); + TransactionsBytesWritten->Add(response.GetResponse().GetQueryStats().ByteSize()); + WindowHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs()); + } else { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " data request status: Fail, Issue: " + ev->Get()->ToString()); + ++WindowErrors; + } + + if (Phase < WindowCount) { + CreateDataQuery(ctx); + } + } + +private: + + // monitoring + + void HandleWindowTimer(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " handle TEvUpdateMonitoring, Phase: " << Phase); + + SendMonitoringEvent(ctx); + + if (Phase < WindowCount) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag + << " reschedule TEvUpdateMonitoring, Phase: " << Phase); + ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); + } + } + +private: + + // common + + void SendMonitoringEvent(const TActorContext& ctx) { + auto ev = MakeHolder<TEvKqpWorkerResponse>(WindowHist, WindowErrors, Phase, WorkerTag); + + WindowHist.Reset(); + WindowErrors = 0; + ++Phase; + + ctx.Send(Parent, ev.Release()); + } + +private: + TActorId Parent; TString WorkingDir; - size_t ProductCount; - size_t Quantity; - bool DeleteTableOnFinish; - std::vector<TString> preparedQuery; std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen; + ui64 WorkloadType; + ui64 ParentTag; + ui64 WorkerTag; - TReallyFastRng32 Rng; + NYdbWorkload::TQueryInfoList queries; - std::unordered_map<TString, std::queue<TInstant>> SentTime; + TString WorkerSession = "wrong sessionId"; + ui64 DurationSeconds = 1; - // Monitoring - TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters; - ::NMonitoring::TDynamicCounters::TCounterPtr Transactions; - ::NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten; - TInstant TestStartTime; +private: + // for monitoring + NHdr::THistogram WindowHist; + ui64 WindowErrors = 0; + + ui64 WindowDuration; + ui64 WindowCount; - TMap<ui64, TLogWriteCookie> InFlightWrites; - NMonitoring::TPercentileTrackerLg<6, 5, 15> ResponseTimes; - std::vector<TString> Sessions; - TString PreparedSelectQuery; + ui64 Phase = 0; + NMonitoring::TDynamicCounters::TCounterPtr Transactions; + NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten; + +}; + +class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> { public: static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::KQP_TEST_WORKLOAD; } TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, - const TActorId& parent, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) + const TActorId& parent, + const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, + ui64 index, + ui64 tag) : Parent(parent) , Tag(tag) - , Rng(Now().GetValue()) { Y_UNUSED(index); VERIFY_PARAM(DurationSeconds); - DurationSeconds = cmd.GetDurationSeconds(); - NumOfSessions = cmd.GetNumOfSessions(); - MaxInFlight = cmd.GetMaxInFlight(); - Key = cmd.GetFirstKey(); - FirstKey = Key; - TotalRowsToUpsert = cmd.GetTotalRowsToUpsert(); - StringValueSize = cmd.GetStringValueSize(); - DeleteTableOnFinish = cmd.GetDeleteTableOnFinish(); + + google::protobuf::TextFormat::PrintToString(cmd, &ConfingString); + UniformPartitionsCount = cmd.GetUniformPartitionsCount(); - SequentialWrite = cmd.GetSequentialWrite(); - StringValue = TString(StringValueSize, 'a'); + DeleteTableOnFinish = cmd.GetDeleteTableOnFinish(); WorkingDir = cmd.GetWorkingDir(); - - if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) { - ProductCount = cmd.GetStock().GetProductCount(); - Quantity = cmd.GetStock().GetQuantity(); + WorkloadType = cmd.GetWorkloadType(); + DurationSeconds = cmd.GetDurationSeconds(); + WindowDuration = cmd.GetWindowDuration(); + WindowCount = (DurationSeconds + WindowDuration - 1) / WindowDuration; + NumOfSessions = cmd.GetNumOfSessions(); + ChunkLoad.resize(WindowCount); + Chunk.reserve(WindowCount); + Total = std::make_unique<MonitoringData>(); + for (size_t i = 0; i < WindowCount; ++i) { + Chunk.push_back(std::make_unique<MonitoringData>()); } + + NYdbWorkload::TWorkloadFactory factory; - if (cmd.GetWorkloadName() != "stock") { + if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) { + NYdbWorkload::TStockWorkloadParams params; + params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad(); + params.OrderCount = cmd.GetStock().GetOrderCount(); + params.ProductCount = cmd.GetStock().GetProductCount(); + params.Quantity = cmd.GetStock().GetQuantity(); + params.Limit = cmd.GetStock().GetLimit(); + params.DbPath = WorkingDir; + params.MinPartitions = UniformPartitionsCount; + WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); + } else { return; } - - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = WorkingDir; - params.PartitionsByLoad = true; - - NYdbWorkload::TWorkloadFactory factory; - WorkloadQueryGen = factory.GetWorkloadQueryGenerator(cmd.GetWorkloadName(), ¶ms); + Y_ASSERT(WorkloadQueryGen.get() != nullptr); Y_ASSERT(DurationSeconds > DelayBeforeMeasurements.Seconds()); // Monitoring initialization - TVector<float> percentiles {0.1f, 0.5f, 0.9f, 0.99f, 0.999f, 1.0f}; + LoadCounters = counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag)); Transactions = LoadCounters->GetCounter("Transactions", true); TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true); - - ResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles); } ~TKqpWriterTestLoadActor() { LoadCounters->ResetCounters(); } - void Bootstrap(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag - << " TKqpWriterTestLoadActor Bootstrap called"); - Become(&TKqpWriterTestLoadActor::StateFunc); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called"); + Become(&TKqpWriterTestLoadActor::StateStart); + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill"); - ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill); - ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring); + ctx.Schedule(TDuration::Seconds(DurationSeconds * 2), new TEvents::TEvPoisonPill); - LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Bootstrap"); - - CreateSession(ctx); + CreateSessionForTablesDDL(ctx); } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Death management - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + STRICT_STFUNC(StateStart, + CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleCreateSessionResponse) + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleCreateTableResponse) + HFunc(NMon::TEvHttpInfo, HandleHTML) + ) + + STRICT_STFUNC(StateMain, + CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDataQueryResponse) + HFunc(TEvKqpWorkerResponse, HandleMonitoring) + HFunc(NMon::TEvHttpInfo, HandleHTML) + ) + + STRICT_STFUNC(StateEndOfWork, + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDropTablesResponse) + ) + +private: + + // death void HandlePoisonPill(const TActorContext& ctx) { - LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, " - << "all workers is initialized, so starting death process"); + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, starting death process"); StartDeathProcess(ctx); } void StartDeathProcess(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag - << " TKqpWriterTestLoadActor StartDeathProcess called"); - for(const auto& session : Sessions) { - auto request = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>(); - request->Record.MutableRequest()->SetSessionId(session); - ctx.Send( new IEventHandle(NKqp::MakeKqpProxyID(1), SelfId(), request.release(), - 0, /* via actor system */ true)); - } + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor StartDeathProcess called"); Become(&TKqpWriterTestLoadActor::StateEndOfWork); + + if (DeleteTableOnFinish) { + DropTables(ctx); + } else { + DeathReport(ctx); + } + } + + void DropTables(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables drop"); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + ev->Record.MutableRequest()->SetDatabase(WorkingDir); + ev->Record.MutableRequest()->SetSessionId(TableSession); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL); + ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetCleanDDLQueries()); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending drop tables query to proxy: " + kqp_proxy.ToString()); + + ctx.Send(kqp_proxy, ev.Release()); + } + + void HandleDropTablesResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record.GetRef(); + + if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " drop tables status: SUCCESS"); + } else { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " drop tables status: FAIL, reason: " + ev->Get()->ToString()); + } + + DeathReport(ctx); + } + + void DeathReport(const TActorContext& ctx) { + CloseSession(ctx); + TIntrusivePtr<TLoadReport> Report(new TLoadReport()); Report->Duration = TDuration::Seconds(DurationSeconds); + ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess")); Die(ctx); } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Monitoring - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +private: - void Handle(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) { - ResponseTimes.Update(); - ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring); + // monitoring + void HandleMonitoring(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) { + const auto& response = ev->Get(); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag + << " Phase: " << response->Phase + << " Min: " << response->Data.WindowHist.GetMin() + << " Max: " << response->Data.WindowHist.GetMax() + << " Count: " << response->Data.WindowHist.GetTotalCount()); + + Chunk[response->Phase]->Add(response->Data); + ChunkLoad[response->Phase] += 1; + + if (ChunkLoad[Phase] == NumOfSessions) { + Total->Add(*Chunk[Phase]); + SendNewRowToParent(ctx); + } } - STRICT_STFUNC(StateFunc, - CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill) - HFunc(TEvUpdateMonitoring, Handle) - ) + void SendNewRowToParent(const TActorContext& ctx) { + Phase += 1; + + LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag + << " total: Phase: " << Phase << " -> " + << Total->WindowHist.GetTotalCount() << " | " + << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0) << " | " + << Total->WindowErrors << " | " + << Total->WindowHist.GetValueAtPercentile(50.0) / (WindowDuration * 1000.0) << " | " + << Total->WindowHist.GetValueAtPercentile(95.0) / (WindowDuration * 1000.0) << " | " + << Total->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0) << " | " + << Total->WindowHist.GetMax() / (WindowDuration * 1000.0) + ); + + if (Phase >= WindowCount) { + StartDeathProcess(ctx); + } + } - STRICT_STFUNC(StateEndOfWork, - HFunc(TEvUpdateMonitoring, Handle) - ) +private: + + // creating tables + + void CreateSessionForTablesDDL(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for session creation"); + auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); + + ev->Record.MutableRequest()->SetDatabase(WorkingDir); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending event for session creation to proxy: " << kqp_proxy.ToString()); + + Send(kqp_proxy, ev.Release()); + } + + void HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record; + + if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { + TableSession = response.GetResponse().GetSessionId(); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Session is created: " + TableSession); + CreateTables(ctx); + } else { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Session creation failed: " + ev->Get()->ToString()); + } + } + + void CreateTables(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables creation"); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + ev->Record.MutableRequest()->SetDatabase(WorkingDir); + ev->Record.MutableRequest()->SetSessionId(TableSession); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL); + ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetDDLQueries()); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending ddl query to proxy: " + kqp_proxy.ToString()); + + ctx.Send(kqp_proxy, ev.Release()); + } + + void HandleCreateTableResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record.GetRef(); + + Become(&TKqpWriterTestLoadActor::StateMain); + + if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created"); + InitData = WorkloadQueryGen->GetInitialData(); + InsertInitData(ctx); + } else { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString()); + } + } private: - void CreateSession(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Creating session to run tables DDL."); - auto request = Ydb::Table::CreateSessionRequest(); - auto cb = [this, &ctx](const Ydb::Table::CreateSessionResponse& resp) { - auto op = resp.Getoperation(); - if (op.Getready()) { - auto status = op.Getstatus(); - if (status == Ydb::StatusIds_StatusCode_SUCCESS) { - Ydb::Table::CreateSessionResult result; - op.result().UnpackTo(&result); - TString sessionId = result.session_id(); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Session is created: " + sessionId); - CreateShardedTable(ctx, sessionId); - } else { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Session creation failed " + std::to_string((int)status) - + " Issue: " + op.Getissues(0).Getmessage()); - } - } - }; - using namespace NGRpcService; - using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest, - Ydb::Table::CreateSessionResponse>; - NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvCreateSessionRequest>( - std::move(request), std::move(cb), WorkingDir, TString(), ctx - ); + // table initialization + + void InsertInitData(const TActorContext& ctx) { + if (InitData.empty()) { + InitWorkers(ctx); + return; + } + + auto q = std::move(InitData.front()); + InitData.pop_front(); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag + << " Creating request for init query, need to exec: " << InitData.size() + 1); + + TString query_text = TString(q.Query); + NYdb::TParams query_params = q.Params; + + auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag << " using session: " << TableSession); + + request->Record.MutableRequest()->SetSessionId(TableSession); + request->Record.MutableRequest()->SetKeepSession(true); + request->Record.MutableRequest()->SetDatabase(WorkingDir); + + request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + request->Record.MutableRequest()->SetQuery(query_text); + + request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + request->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + request->Record.MutableRequest()->SetCollectStats(Ydb::Table::QueryStatsCollection_Mode::QueryStatsCollection_Mode_STATS_COLLECTION_FULL); + + NKikimrMiniKQL::TParams params; + ConvertYdbParamsToMiniKQLParams(query_params, params); + request->Record.MutableRequest()->MutableParameters()->Swap(¶ms); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << Tag + << " sending init query to proxy: " + kqp_proxy.ToString()); + + ctx.Send(kqp_proxy, request.Release()); + } + + void HandleDataQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& response = ev->Get()->Record.GetRef(); + + if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: SUCCESS"); + } else { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " init query status: FAIL, reason: " + ev->Get()->ToString()); + } + + InsertInitData(ctx); } - void CreateShardedTable(const TActorContext& ctx, const TString& sessionId) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Creating tables for workload."); - auto request = Ydb::Table::ExecuteSchemeQueryRequest(); - request.set_session_id(sessionId); - request.set_yql_text(WorkloadQueryGen->GetDDLQueries()); - auto cb = [&ctx](const Ydb::Table::ExecuteSchemeQueryResponse& resp) { - auto op = resp.Getoperation(); - if (op.Getready()) { - auto status = op.Getstatus(); - if (status == Ydb::StatusIds_StatusCode_SUCCESS) { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tables are created."); - } else { - LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tables creation failed " + std::to_string((int)status) - + " Issue: " + op.Getissues(0).Getmessage()); +private: + + // html render + + void HandleHTML(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) { + TStringStream str; + HTML(str) { + TABLE_CLASS("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH() { + str << "Window"; + } + TABLEH() { + str << "Txs"; + } + TABLEH() { + str << "Txs/Sec"; + } + TABLEH() { + str << "Errors"; + } + TABLEH() { + str << "p50(ms)"; + } + TABLEH() { + str << "p95(ms)"; + } + TABLEH() { + str << "p99(ms)"; + } + TABLEH() { + str << "pMax(ms)"; + } + } + } + TABLEBODY() { + TABLER() { + TABLED() { str << "total"; }; + TABLED() { str << Total->WindowHist.GetTotalCount(); }; + TABLED() { str << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0); }; + TABLED() { str << Total->WindowErrors; }; + TABLED() { str << Total->WindowHist.GetValueAtPercentile(50.0) / (WindowDuration * 1000.0); }; + TABLED() { str << Total->WindowHist.GetValueAtPercentile(95.0) / (WindowDuration * 1000.0); }; + TABLED() { str << Total->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0); }; + TABLED() { str << Total->WindowHist.GetMax() / (WindowDuration * 1000.0); }; + } + for (size_t i = Phase; i >= 1; --i) { + TABLER() { + TABLED() { str << i; }; + TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount(); }; + TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount() / (WindowDuration * 1.0); }; + TABLED() { str << Chunk[i - 1]->WindowErrors; }; + TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(50.0) / (WindowDuration * 1000.0); }; + TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(95.0) / (WindowDuration * 1000.0); }; + TABLED() { str << Chunk[i - 1]->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0); }; + TABLED() { str << Chunk[i - 1]->WindowHist.GetMax() / (WindowDuration * 1000.0); }; + } + } } } - }; - using namespace NGRpcService; - using TEvExecuteSchemeQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest, - Ydb::Table::ExecuteSchemeQueryResponse>; - NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvExecuteSchemeQueryRequest>( - std::move(request), std::move(cb), WorkingDir, TString(), ctx - ); + COLLAPSED_BUTTON_CONTENT(Sprintf("configProtobuf%" PRIu64, Tag), "Config") { + str << "<pre>" << ConfingString << "</pre>"; + } + } + ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId)); + } + + +private: + + // common + + void InitWorkers(const TActorContext& ctx) { + for (ui64 i = 0; i < NumOfSessions; ++i) { + auto* worker = new TKqpLoadWorker( + SelfId(), + WorkingDir, + WorkloadQueryGen, + WorkloadType, + Tag, + i, + DurationSeconds, + WindowDuration, + WindowCount, + Transactions, + TransactionsBytesWritten); + Workers.push_back(ctx.Register(worker)); + } } + void CloseSession(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for session close"); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(TableSession); + + auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending session close query to proxy: " + kqp_proxy.ToString()); + + ctx.Send(kqp_proxy, ev.Release()); + } + +private: + TString TableSession = "wrong sessionId"; + TString WorkingDir; + ui64 WorkloadType; + ui64 WindowCount; + ui64 WindowDuration; + std::vector<TActorId> Workers; + TString ConfingString; + ui64 UniformPartitionsCount; + bool DeleteTableOnFinish; + ui32 NumOfSessions; + + NYdbWorkload::TQueryInfoList InitData; + + const TActorId Parent; + ui64 Tag; + ui32 DurationSeconds; + std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen; + + // Monitoring + std::vector<std::unique_ptr<MonitoringData>> Chunk; + std::vector<ui64> ChunkLoad; + std::unique_ptr<MonitoringData> Total; + ui64 Phase = 0; + + // counters + TIntrusivePtr<::NMonitoring::TDynamicCounters> LoadCounters; + NMonitoring::TDynamicCounters::TCounterPtr Transactions; + NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten; + }; IActor * CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 0ea61ce9abb..2d498cd2afd 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1529,23 +1529,22 @@ message TEvTestLoadRequest { message TKqpLoadStart { message TStockWorkload { - optional uint64 ProductCount = 1; - optional uint64 Quantity = 2; + optional uint64 ProductCount = 1 [default = 100]; + optional uint64 Quantity = 2 [default = 1000]; + optional uint64 OrderCount = 3 [default = 100]; + optional uint64 Limit = 4 [default = 10]; + optional bool PartitionsByLoad = 5 [default = true]; } optional uint64 Tag = 1; optional uint32 DurationSeconds = 2; - optional uint32 NumOfSessions = 3; - optional uint32 MaxInFlight = 4; - optional uint32 StringValueSize = 5 [default = 8000]; - optional uint64 FirstKey = 6 [default = 0]; - optional bool DeleteTableOnFinish = 7 [default = true]; - optional bool SequentialWrite = 8 [default = true]; - optional uint32 UniformPartitionsCount = 9 [default = 1]; - optional uint64 TotalRowsToUpsert = 10; - optional string WorkingDir = 11; - optional string WorkloadName = 12; + optional uint32 WindowDuration = 3; + optional string WorkingDir = 4; + optional uint32 NumOfSessions = 5; + optional bool DeleteTableOnFinish = 6; + optional uint32 UniformPartitionsCount = 7; + optional uint32 WorkloadType = 8; oneof Workload { - TStockWorkload Stock = 13; + TStockWorkload Stock = 9; } } diff --git a/ydb/library/workload/stock_workload.cpp b/ydb/library/workload/stock_workload.cpp index c65b5b392e4..28d70fa44c5 100644 --- a/ydb/library/workload/stock_workload.cpp +++ b/ydb/library/workload/stock_workload.cpp @@ -74,6 +74,16 @@ TQueryInfoList TStockWorkloadGenerator::GetInitialData() { return res; } +std::string TStockWorkloadGenerator::GetCleanDDLQueries() const { + std::string clean_query = R"( + DROP TABLE `stock`; + DROP TABLE `orders`; + DROP TABLE `orderLines`; + )"; + + return clean_query; +} + TQueryInfo TStockWorkloadGenerator::FillStockData() const { std::string query = R"(--!syntax_v1 DECLARE $stocks AS List<Struct<product:Utf8,quantity:Int64>>; diff --git a/ydb/library/workload/stock_workload.h b/ydb/library/workload/stock_workload.h index 3725cfe1394..aaac35c4451 100644 --- a/ydb/library/workload/stock_workload.h +++ b/ydb/library/workload/stock_workload.h @@ -22,7 +22,7 @@ public: static TStockWorkloadGenerator* New(const TStockWorkloadParams* params) { if (!validateDbPath(params->DbPath)) { - return nullptr; + throw yexception() << "Invalid path to database." << Endl; } return new TStockWorkloadGenerator(params); } @@ -33,6 +33,8 @@ public: TQueryInfoList GetInitialData() override; + std::string GetCleanDDLQueries() const override; + TQueryInfoList GetWorkload(int type) override; enum class EType { diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp index f1a34ea78cc..823da997dc8 100644 --- a/ydb/library/workload/workload_factory.cpp +++ b/ydb/library/workload/workload_factory.cpp @@ -4,18 +4,17 @@ namespace NYdbWorkload { - std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const std::string& workloadName, - const TWorkloadParams* params) + std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params) { if (!params) { - return nullptr; + throw yexception() << "Params not specified"; } - if (workloadName == "stock") { + if (type == EWorkload::STOCK) { return std::shared_ptr<TStockWorkloadGenerator>(TStockWorkloadGenerator::New(static_cast<const TStockWorkloadParams*>(params))); - } else { - return nullptr; } + + throw yexception() << "Unknown workload"; } }
\ No newline at end of file diff --git a/ydb/library/workload/workload_factory.h b/ydb/library/workload/workload_factory.h index fbfaaa65467..53bd764a1ad 100644 --- a/ydb/library/workload/workload_factory.h +++ b/ydb/library/workload/workload_factory.h @@ -6,9 +6,13 @@ namespace NYdbWorkload { +enum class EWorkload { + STOCK, +}; + class TWorkloadFactory { public: - std::shared_ptr<IWorkloadQueryGenerator> GetWorkloadQueryGenerator(const std::string& workloadName, const TWorkloadParams* params); + std::shared_ptr<IWorkloadQueryGenerator> GetWorkloadQueryGenerator(const EWorkload& type, const TWorkloadParams* params); }; } // namespace NYdbWorkload
\ No newline at end of file diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h index 4eb02efb549..f48d3960246 100644 --- a/ydb/library/workload/workload_query_generator.h +++ b/ydb/library/workload/workload_query_generator.h @@ -31,6 +31,7 @@ public: virtual std::string GetDDLQueries() const = 0; virtual TQueryInfoList GetInitialData() = 0; + virtual std::string GetCleanDDLQueries() const = 0; virtual TQueryInfoList GetWorkload(int type) = 0; }; diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp index e00874b617c..240625924e0 100644 --- a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp @@ -16,6 +16,7 @@ TCommandStock::TCommandStock() : TClientCommandTree("stock", {}, "YDB stock workload") { AddCommand(std::make_unique<TCommandStockInit>()); + AddCommand(std::make_unique<TCommandStockClean>()); AddCommand(std::make_unique<TCommandStockRun>()); } @@ -64,10 +65,7 @@ int TCommandStockInit::Run(TConfig& config) { params.PartitionsByLoad = PartitionsByLoad; NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator("stock", ¶ms); - if (workloadGen.get() == nullptr) { - throw TMisuseException() << "Invalid path to database." << Endl; - } + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); auto session = GetSession(); auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); @@ -95,6 +93,42 @@ int TCommandStockInit::Run(TConfig& config) { return EXIT_SUCCESS; } +TCommandStockClean::TCommandStockClean() + : TWorkloadCommand("clean", {}, "drop tables created in init phase") {} + +void TCommandStockClean::Config(TConfig& config) { + TWorkloadCommand::Config(config); + config.SetFreeArgsNum(0); +} + +void TCommandStockClean::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandStockClean::Run(TConfig& config) { + Driver = std::make_unique<NYdb::TDriver>(CreateDriver(config)); + TableClient = std::make_unique<NTable::TTableClient>(*Driver); + NYdbWorkload::TStockWorkloadParams params; + params.DbPath = config.Database; + + NYdbWorkload::TWorkloadFactory factory; + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); + + auto session = GetSession(); + + auto query = workloadGen->GetCleanDDLQueries(); + TStatus result(EStatus::SUCCESS, NYql::TIssues()); + result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); + + if (!result.IsSuccess()) { + Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl + << "Query:\n" << query << Endl; + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} + TCommandStockRun::TCommandStockRun() : TClientCommandTree("run", {}, "Run YDB stock workload") { @@ -131,10 +165,7 @@ int TCommandStockRunInsertRandomOrder::Run(TConfig& config) { params.ProductCount = ProductCount; NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator("stock", ¶ms); - if (workloadGen.get() == nullptr) { - throw TMisuseException() << "Invalid path to database." << Endl; - } + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::InsertRandomOrder)); } @@ -165,10 +196,7 @@ int TCommandStockRunSubmitRandomOrder::Run(TConfig& config) { params.ProductCount = ProductCount; NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator("stock", ¶ms); - if (workloadGen.get() == nullptr) { - throw TMisuseException() << "Invalid path to database." << Endl; - } + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::SubmitRandomOrder)); } @@ -199,10 +227,7 @@ int TCommandStockRunSubmitSameOrder::Run(TConfig& config) { params.ProductCount = ProductCount; NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator("stock", ¶ms); - if (workloadGen.get() == nullptr) { - throw TMisuseException() << "Invalid path to database." << Endl; - } + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::SubmitSameOrder)); } @@ -233,11 +258,7 @@ int TCommandStockRunGetRandomCustomerHistory::Run(TConfig& config) { params.Limit = Limit; NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator("stock", ¶ms); - if (workloadGen.get() == nullptr) { - throw TMisuseException() << "Invalid path to database." << Endl; - } - + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::GetRandomCustomerHistory)); } @@ -267,10 +288,7 @@ int TCommandStockRunGetCustomerHistory::Run(TConfig& config) { params.Limit = Limit; NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator("stock", ¶ms); - if (workloadGen.get() == nullptr) { - throw TMisuseException() << "Invalid path to database." << Endl; - } + auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); return RunWorkload(workloadGen, static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::GetCustomerHistory)); } diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.h b/ydb/public/lib/ydb_cli/commands/stock_workload.h index f755d632260..c2e12fb7cc9 100644 --- a/ydb/public/lib/ydb_cli/commands/stock_workload.h +++ b/ydb/public/lib/ydb_cli/commands/stock_workload.h @@ -25,6 +25,15 @@ private: bool PartitionsByLoad; }; +class TCommandStockClean : public TWorkloadCommand { +public: + TCommandStockClean(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + +}; + class TCommandStockRun : public TClientCommandTree { public: TCommandStockRun(); |