diff options
| author | mdartemenko <[email protected]> | 2022-08-30 17:42:47 +0300 | 
|---|---|---|
| committer | mdartemenko <[email protected]> | 2022-08-30 17:42:47 +0300 | 
| commit | b39c849cda9decf6c19b135df25f21b5cf0d0d5b (patch) | |
| tree | 7c66544c9538c58469c92d184b2764432ba004dc | |
| parent | f79f740e5b92e2c8a10c9c74383e6b47e63a3ef3 (diff) | |
add MORE workloads for KQP
Adds 2 Workloads for KQP:
    1) BigString -> generates kv pairs with big string as second field
    2) MultiColumn -> generates multi column table with FetchCnt not null columns
| -rw-r--r-- | ydb/core/blobstorage/testload/test_load_kqp.cpp | 132 | ||||
| -rw-r--r-- | ydb/core/protos/blobstorage.proto | 4 | ||||
| -rw-r--r-- | ydb/library/workload/kv_workload.cpp | 116 | ||||
| -rw-r--r-- | ydb/library/workload/kv_workload.h | 7 | ||||
| -rw-r--r-- | ydb/library/workload/workload_factory.cpp | 2 | ||||
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.cpp | 58 | ||||
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/kv_workload.h | 5 | ||||
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/stock_workload.cpp | 39 | ||||
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 45 | ||||
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.h | 6 | 
10 files changed, 223 insertions, 191 deletions
| diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp index bb4ad82ff76..1efe502282d 100644 --- a/ydb/core/blobstorage/testload/test_load_kqp.cpp +++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp @@ -45,7 +45,7 @@ public:          : WindowHist(60000, 2)          , WindowErrors(window_errors)      { -        WindowHist.Add(hist);     +        WindowHist.Add(hist);      }      void Add(const MonitoringData& other) { @@ -63,7 +63,7 @@ struct TEvKqpWorkerResponse : TEventLocal<TEvKqpWorkerResponse, EvKqpWorkerRespo  public:      TEvKqpWorkerResponse(const NHdr::THistogram& hist, ui64 window_errors, ui64 phase, ui64 worker_tag)          : Data(hist, window_errors) -        , Phase(phase)  +        , Phase(phase)          , WorkerTag(worker_tag) {}  public: @@ -89,16 +89,16 @@ void ConvertYdbParamsToMiniKQLParams(const NYdb::TParams& input, NKikimrMiniKQL:  class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {  public:      TKqpLoadWorker(TActorId parent, -        TString working_dir,  -        std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen,  -        ui64 workload_type,  -        ui64 parentTag,  +        TString working_dir, +        std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen, +        ui64 workload_type, +        ui64 parentTag,          ui64 workerTag,          ui64 durationSeconds,          ui64 windowDuration, -        ui64 windowCount,  +        ui64 windowCount,          NMonitoring::TDynamicCounters::TCounterPtr transactions, -        NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)  +        NMonitoring::TDynamicCounters::TCounterPtr transactionsBytesWritten)          : Parent(std::move(parent))          , WorkingDir(std::move(working_dir))          , WorkloadQueryGen(workload_query_gen) @@ -114,7 +114,7 @@ public:      void Bootstrap(const TActorContext& ctx) {          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " TKqpLoadWorker Bootstrap called"); -         +          ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill);          ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring); @@ -139,7 +139,7 @@ private:          if (Phase < WindowCount) {              SendMonitoringEvent(ctx);          } -         +          CloseSession(ctx);          Die(ctx);      } @@ -149,9 +149,9 @@ private:          auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();          ev->Record.MutableRequest()->SetSessionId(WorkerSession); -     +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); -        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  +        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag              << " sending session close query to proxy: " + kqp_proxy.ToString());          ctx.Send(kqp_proxy, ev.Release()); @@ -166,11 +166,11 @@ private:          auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();          ev->Record.MutableRequest()->SetDatabase(WorkingDir); -         +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); -        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  +        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag              << " sending event for session creation to proxy: " << kqp_proxy.ToString()); -         +          Send(kqp_proxy, ev.Release());      } @@ -182,7 +182,7 @@ private:              LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " Session is created: " + WorkerSession);              CreateDataQuery(ctx);          } else { -            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  +            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag                  << " Session creation failed: " + ev->Get()->ToString());          }      } @@ -195,7 +195,7 @@ private:          auto q = std::move(queries.front());          queries.pop_front(); -        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  +        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag              << " query type: " << WorkloadType << ", params size: " << q.Params.GetValues().size());          Transactions->Inc(); @@ -224,13 +224,13 @@ private:          NKikimrMiniKQL::TParams params;          ConvertYdbParamsToMiniKQLParams(query_params, params);          request->Record.MutableRequest()->MutableParameters()->Swap(¶ms); -     +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()); -        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  +        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag              << " sending data query to proxy: " + kqp_proxy.ToString());          ctx.Send(kqp_proxy, request.Release()); -     +      }      void HandleResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { @@ -243,7 +243,7 @@ private:              TransactionsBytesWritten->Add(response.GetResponse().GetQueryStats().ByteSize());              WindowHist.RecordValue(response.GetResponse().GetQueryStats().GetDurationUs());          } else { -            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  +            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag                  << " data request status: Fail, Issue: " + ev->Get()->ToString());              ++WindowErrors;          } @@ -258,22 +258,22 @@ private:      // monitoring      void HandleWindowTimer(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) { -        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  +        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag              << " handle TEvUpdateMonitoring, Phase: " << Phase); -         +          SendMonitoringEvent(ctx);          if (Phase < WindowCount) { -            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag  -                << " reschedule TEvUpdateMonitoring, Phase: " << Phase);             +            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag +                << " reschedule TEvUpdateMonitoring, Phase: " << Phase);              ctx.Schedule(TDuration::Seconds(WindowDuration), new TEvUpdateMonitoring);          }      }  private: -     +      // common -     +      void SendMonitoringEvent(const TActorContext& ctx) {          auto ev = MakeHolder<TEvKqpWorkerResponse>(WindowHist, WindowErrors, Phase, WorkerTag); @@ -319,9 +319,9 @@ public:      }      TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd, -        const TActorId& parent,  -        const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,  -        ui64 index,  +        const TActorId& parent, +        const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, +        ui64 index,          ui64 tag)          : Parent(parent)          , Tag(tag) @@ -345,7 +345,7 @@ public:          for (size_t i = 0; i < WindowCount; ++i) {              Chunk.push_back(std::make_unique<MonitoringData>());          } -         +          NYdbWorkload::TWorkloadFactory factory;          if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) { @@ -365,18 +365,20 @@ public:              params.InitRowCount = cmd.GetKv().GetInitRowCount();              params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad();              params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey(); +            params.StringLen = cmd.GetKv().GetStringLen(); +            params.ColumnsCnt = cmd.GetKv().GetColumnsCnt();              params.MinPartitions = UniformPartitionsCount;              params.DbPath = WorkingDir;              WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms);          } else {              return;          } -         +          Y_ASSERT(WorkloadQueryGen.get() != nullptr);          Y_ASSERT(DurationSeconds > DelayBeforeMeasurements.Seconds());          // Monitoring initialization -         +          LoadCounters = counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag));          Transactions = LoadCounters->GetCounter("Transactions", true);          TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true); @@ -389,7 +391,7 @@ public:      void Bootstrap(const TActorContext& ctx) {          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " TKqpWriterTestLoadActor Bootstrap called");          Become(&TKqpWriterTestLoadActor::StateStart); -         +          if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) {              NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams());              LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {" @@ -407,6 +409,8 @@ public:                  << "PartitionsByLoad: " << params->PartitionsByLoad << " "                  << "MaxFirstKey: " << params->MaxFirstKey << " "                  << "MinPartitions: " << params->MinPartitions << " " +                << "StringLen: " << params->StringLen << " " +                << "ColumnsCnt: " << params->ColumnsCnt << " "                  << "DbPath: " << params->DbPath);          } @@ -457,14 +461,14 @@ private:      void DropTables(const TActorContext& ctx) {          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables drop"); -         +          auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();          ev->Record.MutableRequest()->SetDatabase(WorkingDir);          ev->Record.MutableRequest()->SetSessionId(TableSession);          ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);          ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);          ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetCleanDDLQueries()); -     +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending drop tables query to proxy: " + kqp_proxy.ToString()); @@ -483,12 +487,12 @@ private:          DeathReport(ctx);      } -    void DeathReport(const TActorContext& ctx) {         +    void DeathReport(const TActorContext& ctx) {          CloseSession(ctx);          TIntrusivePtr<TLoadReport> Report(new TLoadReport());          Report->Duration = TDuration::Seconds(DurationSeconds); -         +          ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess"));          Die(ctx);      } @@ -498,8 +502,8 @@ private:      // monitoring      void HandleMonitoring(TEvKqpWorkerResponse::TPtr& ev, const TActorContext& ctx) {          const auto& response = ev->Get(); -         -        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag  + +        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " got monitoring response from worker Tag# " << response->WorkerTag              << " Phase: " << response->Phase              << " Min: " << response->Data.WindowHist.GetMin()              << " Max: " << response->Data.WindowHist.GetMax() @@ -517,7 +521,7 @@ private:      void SendNewRowToParent(const TActorContext& ctx) {          Phase += 1; -        LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag  +        LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag              << " total: Phase: " << Phase << " -> "              << Total->WindowHist.GetTotalCount() << " | "              << Total->WindowHist.GetTotalCount() / (WindowDuration * std::max(ui64(1), Phase) * 1.0) << " | " @@ -527,7 +531,7 @@ private:              << Total->WindowHist.GetValueAtPercentile(99.0) / (WindowDuration * 1000.0) << " | "              << Total->WindowHist.GetMax() / (WindowDuration * 1000.0)          ); -         +          if (Phase >= WindowCount) {              StartDeathProcess(ctx);          } @@ -542,10 +546,10 @@ private:          auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();          ev->Record.MutableRequest()->SetDatabase(WorkingDir); -         +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending event for session creation to proxy: " << kqp_proxy.ToString()); -         +          Send(kqp_proxy, ev.Release());      } @@ -563,14 +567,14 @@ private:      void CreateTables(const TActorContext& ctx) {          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " creating event for tables creation"); -         +          auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();          ev->Record.MutableRequest()->SetDatabase(WorkingDir);          ev->Record.MutableRequest()->SetSessionId(TableSession);          ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);          ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);          ev->Record.MutableRequest()->SetQuery(WorkloadQueryGen->GetDDLQueries()); -     +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending ddl query to proxy: " + kqp_proxy.ToString()); @@ -580,14 +584,14 @@ private:      void HandleCreateTableResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {          auto& response = ev->Get()->Record.GetRef(); -        Become(&TKqpWriterTestLoadActor::StateMain); -          if (response.GetYdbStatus() == Ydb::StatusIds_StatusCode_SUCCESS) { -            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created"); +            Become(&TKqpWriterTestLoadActor::StateMain); +            LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables are created");              InitData = WorkloadQueryGen->GetInitialData();              InsertInitData(ctx);          } else { -            LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString()); +            LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " tables creation failed: " + ev->Get()->ToString()); +            CreateTables(ctx);          }      } @@ -604,7 +608,7 @@ private:          auto q = std::move(InitData.front());          InitData.pop_front(); -        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag  +        LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag              << " Creating request for init query, need to exec: " << InitData.size() + 1);          TString query_text = TString(q.Query); @@ -631,7 +635,7 @@ private:          NKikimrMiniKQL::TParams params;          ConvertYdbParamsToMiniKQLParams(query_params, params);          request->Record.MutableRequest()->MutableParameters()->Swap(¶ms); -     +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag              << " sending init query to proxy: " + kqp_proxy.ToString()); @@ -699,7 +703,7 @@ private:                          TABLED() { str << Total->WindowHist.GetMax() / (WindowDuration * 1000.0); };                      }                      for (size_t i = Phase; i >= 1; --i) { -                        TABLER() {     +                        TABLER() {                              TABLED() { str << i; };                              TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount(); };                              TABLED() { str << Chunk[i - 1]->WindowHist.GetTotalCount() / (WindowDuration * 1.0); }; @@ -727,16 +731,16 @@ private:      void InitWorkers(const TActorContext& ctx) {          for (ui64 i = 0; i < NumOfSessions; ++i) {              auto* worker = new TKqpLoadWorker( -                SelfId(),  -                WorkingDir,  -                WorkloadQueryGen,  -                WorkloadType,  -                Tag,  -                i,  -                DurationSeconds,  -                WindowDuration,  -                WindowCount,  -                Transactions,  +                SelfId(), +                WorkingDir, +                WorkloadQueryGen, +                WorkloadType, +                Tag, +                i, +                DurationSeconds, +                WindowDuration, +                WindowCount, +                Transactions,                  TransactionsBytesWritten);              Workers.push_back(ctx.Register(worker));          } @@ -747,7 +751,7 @@ private:          auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();          ev->Record.MutableRequest()->SetSessionId(TableSession); -     +          auto kqp_proxy = NKqp::MakeKqpProxyID(ctx.SelfID.NodeId());          LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " sending session close query to proxy: " + kqp_proxy.ToString()); diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 72f4f32e177..4209b1dd2ff 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -1544,13 +1544,15 @@ message TEvTestLoadRequest {              optional uint64 ProductCount = 1 [default = 100];              optional uint64 Quantity = 2 [default = 1000];              optional uint64 OrderCount = 3 [default = 100]; -            optional uint64 Limit = 4 [default = 10];  +            optional uint64 Limit = 4 [default = 10];              optional bool PartitionsByLoad = 5 [default = true];          }          message TKvWorkload {              optional uint64 InitRowCount = 1 [default = 1000];              optional bool PartitionsByLoad = 2 [default = true];              optional uint64 MaxFirstKey = 3 [default = 5000]; +            optional uint64 StringLen = 4 [default = 8]; +            optional uint64 ColumnsCnt = 5 [default = 2];          }          optional uint64 Tag = 1;          optional uint32 DurationSeconds = 2; diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp index b36b8cd2f30..49786e1b92e 100644 --- a/ydb/library/workload/kv_workload.cpp +++ b/ydb/library/workload/kv_workload.cpp @@ -2,20 +2,24 @@  #include <util/datetime/base.h> +#include <ydb/core/util/lz4_data_generator.h> +  #include <cmath>  #include <iomanip>  #include <string>  #include <thread>  #include <random> +#include <sstream>  namespace NYdbWorkload {  TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)      : DbPath(params->DbPath)      , Params(*params) +    , BigString(NKikimr::GenDataForLZ4(Params.StringLen))      , Rd()      , Gen(Rd()) -    , UniformDistGen(0, Params.MaxFirstKey) +    , KeyUniformDistGen(0, Params.MaxFirstKey)  {      Gen.seed(Now().MicroSeconds());  } @@ -26,30 +30,26 @@ TKvWorkloadParams* TKvWorkloadGenerator::GetParams() {  std::string TKvWorkloadGenerator::GetDDLQueries() const {      std::string partsNum = std::to_string(Params.MinPartitions); -    std::string KvPartitionsDdl = ""; -    if (Params.PartitionsByLoad) { -        KvPartitionsDdl = "WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " +  -            partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; -    } else { -        KvPartitionsDdl = "WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " +  -            partsNum + ", UNIFORM_PARTITIONS = " + partsNum + ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; -    } +    std::stringstream ss; -    static const char TablesDdl[] = R"(--!syntax_v1 -        CREATE TABLE `%s/kv_test`(a Uint64, b Uint64, PRIMARY KEY(a, b)) %s; -    )"; +    ss << "--!syntax_v1\n"; +    ss << "CREATE TABLE `" << DbPath << "/kv_test`(c0 Uint64, "; -    char buf[sizeof(TablesDdl) + sizeof(KvPartitionsDdl) + 8192*3]; // 32*256 for DbPath -    int res = std::sprintf(buf, TablesDdl,  -        DbPath.c_str(), KvPartitionsDdl.c_str() -    ); +    for (size_t i = 1; i < Params.ColumnsCnt; ++i) { +        ss << "c" << i << " " << "String, "; +    } + +    ss << "PRIMARY KEY(c0)) WITH ("; -    if (res < 0) { -        return ""; +    if (Params.PartitionsByLoad) { +        ss << "AUTO_PARTITIONING_BY_LOAD = ENABLED, ";      } -    return buf; +    ss << "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = " << partsNum << ", " +       << "UNIFORM_PARTITIONS = " << partsNum << ", AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 1000)"; + +    return ss.str();  }  TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) { @@ -64,48 +64,70 @@ TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) {  }  TQueryInfoList TKvWorkloadGenerator::UpsertRandom() { -    std::string query = R"( -        --!syntax_v1 -        DECLARE $a AS Uint64; -        DECLARE $b AS Uint64; +    std::stringstream ss; -        UPSERT INTO `kv_test` (a, b) VALUES ($a, $b); -    )"; +    NYdb::TParamsBuilder paramsBuilder; -    ui64 a = UniformDistGen(Gen); -    ui64 b = Gen(); +    ss << "--!syntax_v1\n"; +    ss << "DECLARE $c0 AS Uint64;\n"; +    paramsBuilder.AddParam("$c0").Uint64(KeyUniformDistGen(Gen)).Build(); -    NYdb::TParamsBuilder paramsBuilder; -    auto params = paramsBuilder -        .AddParam("$a") -            .Uint64(a) -            .Build() -        .AddParam("$b") -            .Uint64(b) -            .Build() -        .Build(); +    for (size_t col = 1; col < Params.ColumnsCnt; ++col) { +        ss << "DECLARE $c" << col << " AS String;\n"; +        paramsBuilder.AddParam("$c" + std::to_string(col)).String(BigString).Build(); +    } + +    ss << "UPSERT INTO `kv_test`("; + +    for (size_t col = 0; col < Params.ColumnsCnt; ++col) { +        ss << "c" << col; +        if (col + 1 < Params.ColumnsCnt) { +            ss << ", "; +        } +    } + +    ss << ") VALUES ("; + +    for (size_t col = 0; col < Params.ColumnsCnt; ++col) { +        ss << "$c" << col; +        if (col + 1 < Params.ColumnsCnt) { +            ss << ", "; +        } +    } + +    ss << ")"; -    return TQueryInfoList(1, TQueryInfo(query, std::move(params))); +    auto params = paramsBuilder.Build(); + +    return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));  }  TQueryInfoList TKvWorkloadGenerator::SelectRandom() { -    std::string query = R"( -        --!syntax_v1 -        DECLARE $a AS Uint64; +    std::stringstream ss; + +    ss << "--!syntax_v1\n"; +    ss << "DECLARE $c0 AS Uint64;\n"; +    ss << "SELECT "; +    for (size_t col = 0; col < Params.ColumnsCnt; ++col) { +        ss << "c" << col; +        if (col + 1 < Params.ColumnsCnt) { +            ss << ","; +        } +        ss << " "; +    } -        SELECT * FROM `kv_test` WHERE a = $a -    )"; +    ss << "FROM `kv_test` WHERE c0 = $c0"; -    ui64 a = UniformDistGen(Gen); +    ui64 x = KeyUniformDistGen(Gen);      NYdb::TParamsBuilder paramsBuilder;      auto params = paramsBuilder -        .AddParam("$a") -            .Uint64(a) +        .AddParam("$c0") +            .Uint64(x)              .Build()          .Build(); -    return TQueryInfoList(1, TQueryInfo(query, std::move(params))); +    return TQueryInfoList(1, TQueryInfo(ss.str(), std::move(params)));  }  TQueryInfoList TKvWorkloadGenerator::GetInitialData() { @@ -114,7 +136,7 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() {          auto queryInfos = UpsertRandom();          res.insert(res.end(), queryInfos.begin(), queryInfos.end());      } -     +      return res;  } diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h index b681d092ebd..b5e492cfddb 100644 --- a/ydb/library/workload/kv_workload.h +++ b/ydb/library/workload/kv_workload.h @@ -11,6 +11,8 @@ struct TKvWorkloadParams : public TWorkloadParams {      ui64 MinPartitions = 1;      ui64 InitRowCount = 1000;      ui64 MaxFirstKey = 5000; +    ui64 StringLen = 8; +    ui64 ColumnsCnt = 2;      bool PartitionsByLoad = true;  }; @@ -44,17 +46,18 @@ public:  private:      TQueryInfoList UpsertRandom();      TQueryInfoList SelectRandom(); -     +      TKvWorkloadGenerator(const TKvWorkloadParams* params);      TQueryInfo FillKvData() const;      std::string DbPath;      TKvWorkloadParams Params; +    TString BigString;      std::random_device Rd;      std::mt19937_64 Gen; -    std::uniform_int_distribution<ui64> UniformDistGen; +    std::uniform_int_distribution<ui64> KeyUniformDistGen;  };  } // namespace NYdbWorkload
\ No newline at end of file diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp index 1b34bc3838c..11038e697f3 100644 --- a/ydb/library/workload/workload_factory.cpp +++ b/ydb/library/workload/workload_factory.cpp @@ -5,7 +5,7 @@  namespace NYdbWorkload { -    std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params)  +    std::shared_ptr<IWorkloadQueryGenerator> TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params)      {          if (!params) {              throw yexception() << "Params not specified"; diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp index 1a0efbc07a8..c8bff4f05c1 100644 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp @@ -19,7 +19,9 @@ TCommandKvInit::TCommandKvInit()      , InitRowCount(1000)      , MinPartitions(1)      , MaxFirstKey(5000) -    , PartitionsByLoad(true)  +    , StringLen(8) +    , ColumnsCnt(2) +    , PartitionsByLoad(true)  {}  void TCommandKvInit::Config(TConfig& config) { @@ -35,6 +37,10 @@ void TCommandKvInit::Config(TConfig& config) {          .DefaultValue(true).StoreResult(&PartitionsByLoad);      config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")          .DefaultValue(5000).StoreResult(&MaxFirstKey); +    config.Opts->AddLongOption("len", "String len") +        .DefaultValue(8).StoreResult(&StringLen); +    config.Opts->AddLongOption("cols", "Number of columns") +        .DefaultValue(2).StoreResult(&ColumnsCnt);  }  void TCommandKvInit::Parse(TConfig& config) { @@ -50,34 +56,13 @@ int TCommandKvInit::Run(TConfig& config) {      params.MinPartitions = MinPartitions;      params.PartitionsByLoad = PartitionsByLoad;      params.MaxFirstKey = MaxFirstKey; +    params.StringLen = StringLen; +    params.ColumnsCnt = ColumnsCnt;      NYdbWorkload::TWorkloadFactory factory;      auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); -    auto session = GetSession(); -    auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); -    ThrowOnError(result); - -    auto queryInfoList = workloadGen->GetInitialData(); -    for (auto queryInfo : queryInfoList) { -        auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); -        if (!prepareResult.IsSuccess()) { -            Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl -                << "Query:\n" << queryInfo.Query << Endl; -            return EXIT_FAILURE; -        } - -        auto dataQuery = prepareResult.GetQuery(); -        auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), -                                        std::move(queryInfo.Params)).GetValueSync(); -        if (!result.IsSuccess()) { -            Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl -                << "Query:\n" << queryInfo.Query << Endl; -            return EXIT_FAILURE; -        } -    } - -    return EXIT_SUCCESS; +    return InitTables(workloadGen);  } @@ -102,19 +87,7 @@ int TCommandKvClean::Run(TConfig& config) {      NYdbWorkload::TWorkloadFactory factory;      auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); -    auto session = GetSession(); - -    auto query = workloadGen->GetCleanDDLQueries(); -    TStatus result(EStatus::SUCCESS, NYql::TIssues()); -    result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); - -    if (!result.IsSuccess()) { -        Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl -            << "Query:\n" << query << Endl; -        return EXIT_FAILURE; -    } - -    return EXIT_SUCCESS; +    return CleanTables(workloadGen);  }  TCommandKvRun::TCommandKvRun() @@ -134,6 +107,10 @@ void TCommandKvRunUpsertRandom::Config(TConfig& config) {      config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")          .DefaultValue(5000).StoreResult(&MaxFirstKey); +    config.Opts->AddLongOption("len", "String len") +        .DefaultValue(8).StoreResult(&StringLen); +    config.Opts->AddLongOption("cols", "Number of columns") +        .DefaultValue(2).StoreResult(&ColumnsCnt);  }  void TCommandKvRunUpsertRandom::Parse(TConfig& config) { @@ -146,6 +123,8 @@ int TCommandKvRunUpsertRandom::Run(TConfig& config) {      NYdbWorkload::TKvWorkloadParams params;      params.DbPath = config.Database;      params.MaxFirstKey = MaxFirstKey; +    params.StringLen = StringLen; +    params.ColumnsCnt = ColumnsCnt;      NYdbWorkload::TWorkloadFactory factory;      auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); @@ -163,6 +142,8 @@ void TCommandKvRunSelectRandom::Config(TConfig& config) {      config.Opts->AddLongOption("max-first-key", "maximum value of first primary key")          .DefaultValue(5000).StoreResult(&MaxFirstKey); +    config.Opts->AddLongOption("cols", "Number of columns") +        .DefaultValue(2).StoreResult(&ColumnsCnt);  }  void TCommandKvRunSelectRandom::Parse(TConfig& config) { @@ -175,6 +156,7 @@ int TCommandKvRunSelectRandom::Run(TConfig& config) {      NYdbWorkload::TKvWorkloadParams params;      params.DbPath = config.Database;      params.MaxFirstKey = MaxFirstKey; +    params.ColumnsCnt = ColumnsCnt;      NYdbWorkload::TWorkloadFactory factory;      auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h index dc7b4fe01ba..b3e55598312 100644 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.h +++ b/ydb/public/lib/ydb_cli/commands/kv_workload.h @@ -21,6 +21,8 @@ private:      ui64 InitRowCount;      ui64 MinPartitions;      ui64 MaxFirstKey; +    ui64 StringLen; +    ui64 ColumnsCnt;      bool PartitionsByLoad;  }; @@ -46,6 +48,8 @@ public:  private:      ui64 MaxFirstKey; +    ui64 StringLen; +    ui64 ColumnsCnt;  }; @@ -58,6 +62,7 @@ public:  private:      ui64 MaxFirstKey; +    ui64 ColumnsCnt;  }; diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp index 240625924e0..dae5ec87afd 100644 --- a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp @@ -67,30 +67,7 @@ int TCommandStockInit::Run(TConfig& config) {      NYdbWorkload::TWorkloadFactory factory;      auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); -    auto session = GetSession(); -    auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); -    ThrowOnError(result); - -    auto queryInfoList = workloadGen->GetInitialData(); -    for (auto queryInfo : queryInfoList) { -        auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); -        if (!prepareResult.IsSuccess()) { -            Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl -                << "Query:\n" << queryInfo.Query << Endl; -            return EXIT_FAILURE; -        } - -        auto dataQuery = prepareResult.GetQuery(); -        auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), -                                        std::move(queryInfo.Params)).GetValueSync(); -        if (!result.IsSuccess()) { -            Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl -                << "Query:\n" << queryInfo.Query << Endl; -            return EXIT_FAILURE; -        } -    } - -    return EXIT_SUCCESS; +    return InitTables(workloadGen);  }  TCommandStockClean::TCommandStockClean() @@ -114,19 +91,7 @@ int TCommandStockClean::Run(TConfig& config) {      NYdbWorkload::TWorkloadFactory factory;      auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); -    auto session = GetSession(); - -    auto query = workloadGen->GetCleanDDLQueries(); -    TStatus result(EStatus::SUCCESS, NYql::TIssues()); -    result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); - -    if (!result.IsSuccess()) { -        Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl -            << "Query:\n" << query << Endl; -        return EXIT_FAILURE; -    } - -    return EXIT_SUCCESS; +    return CleanTables(workloadGen);  }  TCommandStockRun::TCommandStockRun() diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index 89c31c8406d..559fe8171ef 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -83,7 +83,7 @@ void TWorkloadCommand::PrepareForRun(TConfig& config) {          .SetDatabase(config.Database)          .SetBalancingPolicy(EBalancingPolicy::UseAllNodes)          .SetCredentialsProviderFactory(config.CredentialsGetter(config)); -     +      if (config.EnableSsl) {          driverConfig.UseSecureConnection(config.CaCerts);      } @@ -214,4 +214,47 @@ void TWorkloadCommand::PrintWindowStats(int windowIt) {      }  } +int TWorkloadCommand::InitTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen) { +    auto session = GetSession(); +    auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); +    ThrowOnError(result); + +    auto queryInfoList = workloadGen->GetInitialData(); +    for (auto queryInfo : queryInfoList) { +        auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); +        if (!prepareResult.IsSuccess()) { +            Cerr << "Prepare failed: " << prepareResult.GetIssues().ToString() << Endl +                << "Query:\n" << queryInfo.Query << Endl; +            return EXIT_FAILURE; +        } + +        auto dataQuery = prepareResult.GetQuery(); +        auto result = dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), +                                        std::move(queryInfo.Params)).GetValueSync(); +        if (!result.IsSuccess()) { +            Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl +                << "Query:\n" << queryInfo.Query << Endl; +            return EXIT_FAILURE; +        } +    } + +    return EXIT_SUCCESS; +} + +int TWorkloadCommand::CleanTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen) { +    auto session = GetSession(); + +    auto query = workloadGen->GetCleanDDLQueries(); +    TStatus result(EStatus::SUCCESS, NYql::TIssues()); +    result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); + +    if (!result.IsSuccess()) { +        Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl +            << "Query:\n" << query << Endl; +        return EXIT_FAILURE; +    } + +    return EXIT_SUCCESS; +} +  } // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.h b/ydb/public/lib/ydb_cli/commands/ydb_workload.h index 07221c56a63..6ea3abce25d 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.h @@ -66,6 +66,12 @@ protected:      std::atomic_uint64_t WindowRetryCount;      std::atomic_uint64_t TotalErrors;      std::atomic_uint64_t WindowErrors; + +protected: +    int InitTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen); + +    int CleanTables(std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workloadGen); +  };  } | 
