diff options
author | alexnick <alexnick@ydb.tech> | 2022-09-02 20:07:56 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-09-02 20:07:56 +0300 |
commit | 1be29b7cc0683062a9229d6e6c17ef77512736fd (patch) | |
tree | 2b4615d57516c0646ff58102247c202ac9d6e2cc | |
parent | 6440a16858ae8cb24ba12c6bcfbb37d20ef0ff44 (diff) | |
download | ydb-1be29b7cc0683062a9229d6e6c17ef77512736fd.tar.gz |
option for choosing metering mode
-rw-r--r-- | ydb/core/grpc_services/service_datastreams.h | 1 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 1 | ||||
-rw-r--r-- | ydb/public/api/grpc/draft/ydb_datastreams_v1.proto | 2 | ||||
-rw-r--r-- | ydb/public/api/protos/draft/datastreams.proto | 41 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp | 40 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h | 14 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 136 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.h | 1 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 13 | ||||
-rw-r--r-- | ydb/services/datastreams/grpc_service.cpp | 1 |
10 files changed, 230 insertions, 20 deletions
diff --git a/ydb/core/grpc_services/service_datastreams.h b/ydb/core/grpc_services/service_datastreams.h index 7d8488c30e0..4f4c7233455 100644 --- a/ydb/core/grpc_services/service_datastreams.h +++ b/ydb/core/grpc_services/service_datastreams.h @@ -29,6 +29,7 @@ void DoDataStreamsDescribeStreamSummaryRequest(std::unique_ptr<IRequestOpCtx> p, void DoDataStreamsDecreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); void DoDataStreamsIncreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); void DoDataStreamsUpdateShardCountRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsUpdateStreamModeRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); void DoDataStreamsListStreamConsumersRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); void DoDataStreamsAddTagsToStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); void DoDataStreamsDisableEnhancedMonitoringRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index c41ecc96dca..9b07c0cbef0 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -563,6 +563,7 @@ namespace NKikimr::NHttpProxy { DECLARE_PROCESSOR(DecreaseStreamRetentionPeriod); DECLARE_PROCESSOR(IncreaseStreamRetentionPeriod); DECLARE_PROCESSOR(UpdateShardCount); + DECLARE_PROCESSOR(UpdateStreamMode); DECLARE_PROCESSOR(RegisterStreamConsumer); DECLARE_PROCESSOR(DeregisterStreamConsumer); DECLARE_PROCESSOR(DescribeStreamConsumer); diff --git a/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto b/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto index 069ee7ad4c9..c87857ea485 100644 --- a/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto @@ -30,6 +30,8 @@ service DataStreamsService { rpc DecreaseStreamRetentionPeriod(DecreaseStreamRetentionPeriodRequest) returns (DecreaseStreamRetentionPeriodResponse); rpc IncreaseStreamRetentionPeriod(IncreaseStreamRetentionPeriodRequest) returns (IncreaseStreamRetentionPeriodResponse); rpc UpdateShardCount(UpdateShardCountRequest) returns (UpdateShardCountResponse); + rpc UpdateStreamMode(UpdateStreamModeRequest) returns (UpdateStreamModeResponse); + // stream consumer methods rpc RegisterStreamConsumer(RegisterStreamConsumerRequest) returns (RegisterStreamConsumerResponse); rpc DeregisterStreamConsumer(DeregisterStreamConsumerRequest) returns (DeregisterStreamConsumerResponse); diff --git a/ydb/public/api/protos/draft/datastreams.proto b/ydb/public/api/protos/draft/datastreams.proto index a0791736cae..ef7f7489945 100644 --- a/ydb/public/api/protos/draft/datastreams.proto +++ b/ydb/public/api/protos/draft/datastreams.proto @@ -16,6 +16,7 @@ enum EFieldTransformationType { TRANSFORM_EMPTY_TO_NOTHING = 3; } + extend google.protobuf.FieldOptions { EFieldTransformationType FieldTransformer = 58123; } @@ -28,6 +29,13 @@ enum EncryptionType { KMS = 2; } +enum StreamMode { + STREAM_MODE_UNDEFINED = 0; + PROVISIONED = 1; + ON_DEMAND = 2; +} + + message EnhancedMetrics { // List of shard-level metrics repeated string shard_level_metrics = 1; @@ -109,6 +117,9 @@ message StreamDescription { string owner = 12; // YDS-specific Storage limit in MB of the stream int64 storage_limit_mb = 13; + + // stream metering mode + StreamModeDetails stream_mode_details = 14; } // Represents range of possible sequence numbers for the shard @@ -257,6 +268,11 @@ message Tag { string value = 2; } +// Represents stream mode details +message StreamModeDetails { + StreamMode stream_mode = 1; +} + message CreateStreamRequest { Ydb.Operations.OperationParams operation_params = 1; // Name of the stream @@ -271,6 +287,8 @@ message CreateStreamRequest { // Retention storage in megabytes int32 retention_storage_megabytes = 6; } + // stream metering mode + StreamModeDetails stream_mode_details = 7; } message CreateStreamResponse { @@ -375,6 +393,9 @@ message UpdateStreamRequest { // Retention storage in megabytes int32 retention_storage_megabytes = 6; } + // stream metering mode + StreamModeDetails stream_mode_details = 7; + } message UpdateStreamResponse { @@ -868,3 +889,23 @@ message UpdateShardCountResult { // Updated number of shards int32 target_shard_count = 3; } + + +message UpdateStreamModeRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Stream name or arn + string stream_arn = 2; + // stream metering mode + StreamModeDetails stream_mode_details = 3; + + +} + +message UpdateStreamModeResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +message UpdateStreamModeResult { +} + diff --git a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp index e84ec7ffd65..6604fbe83cf 100644 --- a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp +++ b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp @@ -84,6 +84,11 @@ namespace NYdb::NDataStreams::V1 { req.set_retention_period_hours(24); } req.set_write_quota_kb_per_sec(settings.WriteQuotaKbPerSec_); + if (settings.StreamMode_.Defined()) { + req.mutable_stream_mode_details()->set_stream_mode( + *settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED + : Ydb::DataStreams::V1::StreamMode::ON_DEMAND); + } }); } @@ -225,6 +230,20 @@ namespace NYdb::NDataStreams::V1 { }); } + TAsyncUpdateStreamModeResult UpdateStreamMode(const TString& path, TUpdateStreamModeSettings settings) { + return CallImpl<Ydb::DataStreams::V1::DataStreamsService, + Ydb::DataStreams::V1::UpdateStreamModeRequest, + Ydb::DataStreams::V1::UpdateStreamModeResponse, + Ydb::DataStreams::V1::UpdateStreamModeResult>(settings, &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateStreamMode, + [&](Ydb::DataStreams::V1::UpdateStreamModeRequest& req) { + req.set_stream_arn(path); + + req.mutable_stream_mode_details()->set_stream_mode( + settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED + : Ydb::DataStreams::V1::StreamMode::ON_DEMAND); + }); + } + TAsyncRegisterStreamConsumerResult RegisterStreamConsumer(const TString& path, const TString& consumer_name, TRegisterStreamConsumerSettings settings) { return CallImpl<Ydb::DataStreams::V1::DataStreamsService, Ydb::DataStreams::V1::RegisterStreamConsumerRequest, @@ -349,6 +368,11 @@ namespace NYdb::NDataStreams::V1 { req.set_retention_storage_megabytes(*settings.RetentionStorageMegabytes_.Get()); } req.set_write_quota_kb_per_sec(settings.WriteQuotaKbPerSec_); + if (settings.StreamMode_.Defined()) { + req.mutable_stream_mode_details()->set_stream_mode( + *settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED + : Ydb::DataStreams::V1::StreamMode::ON_DEMAND); + } }); } @@ -469,6 +493,10 @@ namespace NYdb::NDataStreams::V1 { return Impl_->UpdateShardCount(path, settings); } + TAsyncUpdateStreamModeResult TDataStreamsClient::UpdateStreamMode(const TString& path, TUpdateStreamModeSettings settings) { + return Impl_->UpdateStreamMode(path, settings); + } + TAsyncRegisterStreamConsumerResult TDataStreamsClient::RegisterStreamConsumer(const TString& path, const TString& consumer_name, const TRegisterStreamConsumerSettings settings) { return Impl_->RegisterStreamConsumer(path, consumer_name, settings); } @@ -688,6 +716,18 @@ namespace NYdb::NDataStreams::V1 { TProtoRequestSettings settings ); + template NThreading::TFuture<TProtoResultWrapper<Ydb::DataStreams::V1::UpdateStreamModeResult>> TDataStreamsClient::DoProtoRequest + < + Ydb::DataStreams::V1::UpdateStreamModeRequest, + Ydb::DataStreams::V1::UpdateStreamModeResponse, + Ydb::DataStreams::V1::UpdateStreamModeResult, + decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateStreamMode) + >( + const Ydb::DataStreams::V1::UpdateStreamModeRequest& request, + decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateStreamMode) method, + TProtoRequestSettings settings + ); + template NThreading::TFuture<TProtoResultWrapper<Ydb::DataStreams::V1::RegisterStreamConsumerResult>> TDataStreamsClient::DoProtoRequest < Ydb::DataStreams::V1::RegisterStreamConsumerRequest, diff --git a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h index 63d7fb29186..ef1f475d16a 100644 --- a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h +++ b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h @@ -28,6 +28,11 @@ namespace NYdb::NDataStreams::V1 { std::unique_ptr<TProtoResult> Result; }; + enum EStreamMode { + ESM_PROVISIONED = 1, + ESM_ON_DEMAND = 2, + }; + using TCreateStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::CreateStreamResult>; using TDeleteStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::DeleteStreamResult>; using TDescribeStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::DescribeStreamResult>; @@ -46,6 +51,7 @@ namespace NYdb::NDataStreams::V1 { using TDecreaseStreamRetentionPeriodResult = TProtoResultWrapper<Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResult>; using TIncreaseStreamRetentionPeriodResult = TProtoResultWrapper<Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResult>; using TUpdateShardCountResult = TProtoResultWrapper<Ydb::DataStreams::V1::UpdateShardCountResult>; + using TUpdateStreamModeResult = TProtoResultWrapper<Ydb::DataStreams::V1::UpdateStreamModeResult>; using TListStreamConsumersResult = TProtoResultWrapper<Ydb::DataStreams::V1::ListStreamConsumersResult>; using TAddTagsToStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::AddTagsToStreamResult>; using TDisableEnhancedMonitoringResult = TProtoResultWrapper<Ydb::DataStreams::V1::DisableEnhancedMonitoringResult>; @@ -76,6 +82,7 @@ namespace NYdb::NDataStreams::V1 { using TAsyncDecreaseStreamRetentionPeriodResult = NThreading::TFuture<TDecreaseStreamRetentionPeriodResult>; using TAsyncIncreaseStreamRetentionPeriodResult = NThreading::TFuture<TIncreaseStreamRetentionPeriodResult>; using TAsyncUpdateShardCountResult = NThreading::TFuture<TUpdateShardCountResult>; + using TAsyncUpdateStreamModeResult = NThreading::TFuture<TUpdateStreamModeResult>; using TAsyncListStreamConsumersResult = NThreading::TFuture<TListStreamConsumersResult>; using TAsyncAddTagsToStreamResult = NThreading::TFuture<TAddTagsToStreamResult>; using TAsyncDisableEnhancedMonitoringResult = NThreading::TFuture<TDisableEnhancedMonitoringResult>; @@ -99,6 +106,7 @@ namespace NYdb::NDataStreams::V1 { FLUENT_SETTING_OPTIONAL(ui32, RetentionPeriodHours); FLUENT_SETTING_OPTIONAL(ui32, RetentionStorageMegabytes); FLUENT_SETTING(ui64, WriteQuotaKbPerSec); + FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode); }; struct TListStreamsSettings : public NYdb::TOperationRequestSettings<TListStreamsSettings> { FLUENT_SETTING(ui32, Limit); @@ -137,11 +145,16 @@ namespace NYdb::NDataStreams::V1 { struct TUpdateShardCountSettings : public NYdb::TOperationRequestSettings<TUpdateShardCountSettings> { FLUENT_SETTING(ui32, TargetShardCount); }; + struct TUpdateStreamModeSettings : public NYdb::TOperationRequestSettings<TUpdateStreamModeSettings> { + FLUENT_SETTING_DEFAULT(EStreamMode, StreamMode, ESM_PROVISIONED); + }; struct TUpdateStreamSettings : public NYdb::TOperationRequestSettings<TUpdateStreamSettings> { FLUENT_SETTING(ui32, TargetShardCount); FLUENT_SETTING_OPTIONAL(ui32, RetentionPeriodHours); FLUENT_SETTING_OPTIONAL(ui32, RetentionStorageMegabytes); FLUENT_SETTING(ui64, WriteQuotaKbPerSec); + FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode); + }; struct TPutRecordSettings : public NYdb::TOperationRequestSettings<TPutRecordSettings> {}; struct TPutRecordsSettings : public NYdb::TOperationRequestSettings<TPutRecordsSettings> {}; @@ -185,6 +198,7 @@ namespace NYdb::NDataStreams::V1 { TAsyncDecreaseStreamRetentionPeriodResult DecreaseStreamRetentionPeriod(const TString& path, TDecreaseStreamRetentionPeriodSettings settings = TDecreaseStreamRetentionPeriodSettings()); TAsyncIncreaseStreamRetentionPeriodResult IncreaseStreamRetentionPeriod(const TString& path, TIncreaseStreamRetentionPeriodSettings settings = TIncreaseStreamRetentionPeriodSettings()); TAsyncUpdateShardCountResult UpdateShardCount(const TString& path, TUpdateShardCountSettings settings = TUpdateShardCountSettings()); + TAsyncUpdateStreamModeResult UpdateStreamMode(const TString& path, TUpdateStreamModeSettings settings = TUpdateStreamModeSettings()); TAsyncRegisterStreamConsumerResult RegisterStreamConsumer(const TString& path, const TString& consumer_name, TRegisterStreamConsumerSettings settings = TRegisterStreamConsumerSettings()); TAsyncDeregisterStreamConsumerResult DeregisterStreamConsumer(const TString& path, const TString& consumer_name, TDeregisterStreamConsumerSettings settings = TDeregisterStreamConsumerSettings()); TAsyncDescribeStreamConsumerResult DescribeStreamConsumer(TDescribeStreamConsumerSettings settings = TDescribeStreamConsumerSettings()); diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 1e7be043d8a..dab04f1d85d 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -10,6 +10,7 @@ #include <ydb/core/persqueue/partition.h> #include <ydb/core/persqueue/write_meta.h> +#include <ydb/public/api/protos/ydb_topic.pb.h> #include <ydb/services/lib/actors/pq_schema_actor.h> #include <ydb/services/lib/sharding/sharding.h> #include <ydb/services/persqueue_v1/actors/persqueue_utils.h> @@ -27,7 +28,7 @@ using grpc::Status; namespace NKikimr::NDataStreams::V1 { const TString YDS_SERVICE_TYPE = "data-streams"; const i32 DEFAULT_STREAM_DAY_RETENTION = TDuration::Days(1).Hours(); - // const i32 DEFAULT_STREAM_WEEK_RETENTION = TDuration::Days(7).Hours(); + const i32 DEFAULT_STREAM_WEEK_RETENTION = TDuration::Days(7).Hours(); using namespace NGRpcService; using namespace NGRpcProxy::V1; @@ -101,28 +102,52 @@ namespace NKikimr::NDataStreams::V1 { { NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme()); - Ydb::PersQueue::V1::TopicSettings topicSettings; - topicSettings.set_partitions_count(GetProtoRequest()->shard_count()); + Ydb::Topic::CreateTopicRequest topicRequest; + topicRequest.mutable_partitioning_settings()->set_min_active_partitions(GetProtoRequest()->shard_count()); switch (GetProtoRequest()->retention_case()) { case Ydb::DataStreams::V1::CreateStreamRequest::RetentionCase::kRetentionPeriodHours: - topicSettings.set_retention_period_ms( - TDuration::Hours(GetProtoRequest()->retention_period_hours()).MilliSeconds()); + topicRequest.mutable_retention_period()->set_seconds( + TDuration::Hours(GetProtoRequest()->retention_period_hours()).Seconds()); break; case Ydb::DataStreams::V1::CreateStreamRequest::RetentionCase::kRetentionStorageMegabytes: - topicSettings.set_retention_storage_bytes( - GetProtoRequest()->retention_storage_megabytes() * 1_MB); + topicRequest.set_retention_storage_mb( + GetProtoRequest()->retention_storage_megabytes()); + topicRequest.mutable_retention_period()->set_seconds( + TDuration::Hours(DEFAULT_STREAM_WEEK_RETENTION).Seconds()); break; default: - topicSettings.set_retention_period_ms( - TDuration::Hours(DEFAULT_STREAM_DAY_RETENTION).MilliSeconds()); + topicRequest.mutable_retention_period()->set_seconds( + TDuration::Hours(DEFAULT_STREAM_DAY_RETENTION).Seconds()); } - topicSettings.set_supported_format(Ydb::PersQueue::V1::TopicSettings::FORMAT_BASE); - topicSettings.add_supported_codecs(Ydb::PersQueue::V1::CODEC_RAW); - topicSettings.set_max_partition_write_speed( + topicRequest.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW); + topicRequest.set_partition_write_speed_bytes_per_second( PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec())); - topicSettings.set_max_partition_write_burst( + topicRequest.set_partition_write_burst_bytes( PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec())); + if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { + topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY); + + if (GetProtoRequest()->has_stream_mode_details()) { + switch(GetProtoRequest()->stream_mode_details().stream_mode()) { + case Ydb::DataStreams::V1::StreamMode::PROVISIONED: + topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY); + break; + case Ydb::DataStreams::V1::StreamMode::ON_DEMAND: + topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS); + break; + default: + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + "streams can't be created with unknown metering mode", ctx); + } + } + } else { + if (GetProtoRequest()->has_stream_mode_details()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + "streams can't be created with metering mode", ctx); + } + } + if (workingDir != proposal.Record.GetDatabaseName() && !proposal.Record.GetDatabaseName().empty()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, "streams can be created only at database root", ctx); @@ -138,13 +163,8 @@ namespace NKikimr::NDataStreams::V1 { modifyScheme.SetWorkingDir(workingDir); pqDescr->SetPartitionPerTablet(1); - // TODO: support StreamMode - if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { - pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); - } - TString error; - auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicSettings, modifyScheme, ctx, false, error, + auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error, workingDir, proposal.Record.GetDatabaseName()); if (status != Ydb::StatusIds::SUCCESS) { @@ -278,6 +298,57 @@ namespace NKikimr::NDataStreams::V1 { //----------------------------------------------------------------------------------------------------------- + class TUpdateStreamModeActor : public TUpdateSchemeActor<TUpdateStreamModeActor, TEvDataStreamsUpdateStreamModeRequest> { + using TBase = TUpdateSchemeActor<TUpdateStreamModeActor, TEvDataStreamsUpdateStreamModeRequest>; + using TProtoRequest = typename TBase::TProtoRequest; + public: + TUpdateStreamModeActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_arn()) + { + } + + void Bootstrap(const TActorContext& ctx); + void ModifyPersqueueConfig(const TActorContext& ctx, + NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, + const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, + const NKikimrSchemeOp::TDirEntry& selfInfo); + }; + + void TUpdateStreamModeActor::Bootstrap(const TActorContext& ctx) { + TBase::Bootstrap(ctx); + SendDescribeProposeRequest(ctx); + Become(&TBase::StateWork); + } + + void TUpdateStreamModeActor::ModifyPersqueueConfig( + const TActorContext& ctx, + NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, + const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, + const NKikimrSchemeOp::TDirEntry& selfInfo + ) { + Y_UNUSED(selfInfo); + Y_UNUSED(pqGroupDescription); + if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + "streams can't be created with metering mode", ctx); + } + + switch(GetProtoRequest()->stream_mode_details().stream_mode()) { + case Ydb::DataStreams::V1::StreamMode::PROVISIONED: + groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); + break; + case Ydb::DataStreams::V1::StreamMode::ON_DEMAND: + groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + break; + default: + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + "streams can't be created with unknown metering mode", ctx); + } + } + + + //----------------------------------------------------------------------------------------------------------- + class TUpdateStreamActor : public TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest> { using TBase = TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest>; using TProtoRequest = typename TBase::TProtoRequest; @@ -333,6 +404,25 @@ namespace NKikimr::NDataStreams::V1 { pqConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond( PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec())); + if (GetProtoRequest()->has_stream_mode_details()) { + if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + "streams can't be created with metering mode", ctx); + } + + switch(GetProtoRequest()->stream_mode_details().stream_mode()) { + case Ydb::DataStreams::V1::StreamMode::PROVISIONED: + groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY); + break; + case Ydb::DataStreams::V1::StreamMode::ON_DEMAND: + groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS); + break; + default: + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + "streams can't be created with unknown metering mode", ctx); + } + } + auto serviceTypes = GetSupportedClientServiceTypes(ctx); auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx); if (status != Ydb::StatusIds::SUCCESS) { @@ -590,6 +680,13 @@ namespace NKikimr::NDataStreams::V1 { description.set_stream_status(Ydb::DataStreams::V1::StreamDescription::CREATING); } + if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) { + description.mutable_stream_mode_details()->set_stream_mode( + pqConfig.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY ? Ydb::DataStreams::V1::StreamMode::PROVISIONED + : Ydb::DataStreams::V1::StreamMode::ON_DEMAND + ); + } + bool startShardFound = GetProtoRequest()->exclusive_start_shard_id().empty(); description.set_has_more_shards(false); @@ -1812,6 +1909,7 @@ DECLARE_RPC_NI(SubscribeToShard); DECLARE_RPC_NI(DescribeLimits); DECLARE_RPC(DescribeStreamSummary); DECLARE_RPC(UpdateShardCount); +DECLARE_RPC(UpdateStreamMode); DECLARE_RPC(ListStreamConsumers); DECLARE_RPC_NI(AddTagsToStream); DECLARE_RPC_NI(DisableEnhancedMonitoring); diff --git a/ydb/services/datastreams/datastreams_proxy.h b/ydb/services/datastreams/datastreams_proxy.h index 72b9e1ae4b4..7a3791a4d0d 100644 --- a/ydb/services/datastreams/datastreams_proxy.h +++ b/ydb/services/datastreams/datastreams_proxy.h @@ -32,6 +32,7 @@ using TEvDataStreamsDescribeStreamSummaryRequest = TGrpcRequestOperationCall<Ydb using TEvDataStreamsDecreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResponse>; using TEvDataStreamsIncreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResponse>; using TEvDataStreamsUpdateShardCountRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateShardCountRequest, Ydb::DataStreams::V1::UpdateShardCountResponse>; +using TEvDataStreamsUpdateStreamModeRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateStreamModeRequest, Ydb::DataStreams::V1::UpdateStreamModeResponse>; using TEvDataStreamsUpdateStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateStreamRequest, Ydb::DataStreams::V1::UpdateStreamResponse>; using TEvDataStreamsSetWriteQuotaRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SetWriteQuotaRequest, Ydb::DataStreams::V1::SetWriteQuotaResponse>; using TEvDataStreamsListStreamConsumersRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListStreamConsumersRequest, Ydb::DataStreams::V1::ListStreamConsumersResponse>; diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index a5112838fae..31653ed3bb6 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -292,7 +292,14 @@ Y_UNIT_TEST_SUITE(DataStreams) { { auto result = testServer.DataStreamsClient->CreateStream(streamName, NYDS_V1::TCreateStreamSettings().ShardCount(10) - .RetentionPeriodHours(20)).ExtractValueSync(); + .RetentionPeriodHours(20).StreamMode(NYdb::NDataStreams::V1::ESM_ON_DEMAND)).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->UpdateStreamMode(streamName, + NYDS_V1::TUpdateStreamModeSettings().StreamMode(NYdb::NDataStreams::V1::ESM_PROVISIONED)).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } @@ -810,6 +817,10 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().retention_period_hours(), TDuration::Days(7).Hours()); UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().storage_limit_mb(), 50_GB / 1_MB); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_mode_details().stream_mode(), + Ydb::DataStreams::V1::StreamMode::PROVISIONED); + + } } diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp index 7df7b15f819..73d350834fb 100644 --- a/ydb/services/datastreams/grpc_service.cpp +++ b/ydb/services/datastreams/grpc_service.cpp @@ -72,6 +72,7 @@ void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr) ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr) ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr) + ADD_REQUEST(UpdateStreamMode, DoDataStreamsUpdateStreamModeRequest, nullptr) ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr) ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr) ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr) |