aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-09-02 20:07:56 +0300
committeralexnick <alexnick@ydb.tech>2022-09-02 20:07:56 +0300
commit1be29b7cc0683062a9229d6e6c17ef77512736fd (patch)
tree2b4615d57516c0646ff58102247c202ac9d6e2cc
parent6440a16858ae8cb24ba12c6bcfbb37d20ef0ff44 (diff)
downloadydb-1be29b7cc0683062a9229d6e6c17ef77512736fd.tar.gz
option for choosing metering mode
-rw-r--r--ydb/core/grpc_services/service_datastreams.h1
-rw-r--r--ydb/core/http_proxy/http_req.cpp1
-rw-r--r--ydb/public/api/grpc/draft/ydb_datastreams_v1.proto2
-rw-r--r--ydb/public/api/protos/draft/datastreams.proto41
-rw-r--r--ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp40
-rw-r--r--ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h14
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp136
-rw-r--r--ydb/services/datastreams/datastreams_proxy.h1
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp13
-rw-r--r--ydb/services/datastreams/grpc_service.cpp1
10 files changed, 230 insertions, 20 deletions
diff --git a/ydb/core/grpc_services/service_datastreams.h b/ydb/core/grpc_services/service_datastreams.h
index 7d8488c30e0..4f4c7233455 100644
--- a/ydb/core/grpc_services/service_datastreams.h
+++ b/ydb/core/grpc_services/service_datastreams.h
@@ -29,6 +29,7 @@ void DoDataStreamsDescribeStreamSummaryRequest(std::unique_ptr<IRequestOpCtx> p,
void DoDataStreamsDecreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
void DoDataStreamsIncreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
void DoDataStreamsUpdateShardCountRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsUpdateStreamModeRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
void DoDataStreamsListStreamConsumersRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
void DoDataStreamsAddTagsToStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
void DoDataStreamsDisableEnhancedMonitoringRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index c41ecc96dca..9b07c0cbef0 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -563,6 +563,7 @@ namespace NKikimr::NHttpProxy {
DECLARE_PROCESSOR(DecreaseStreamRetentionPeriod);
DECLARE_PROCESSOR(IncreaseStreamRetentionPeriod);
DECLARE_PROCESSOR(UpdateShardCount);
+ DECLARE_PROCESSOR(UpdateStreamMode);
DECLARE_PROCESSOR(RegisterStreamConsumer);
DECLARE_PROCESSOR(DeregisterStreamConsumer);
DECLARE_PROCESSOR(DescribeStreamConsumer);
diff --git a/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto b/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto
index 069ee7ad4c9..c87857ea485 100644
--- a/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto
+++ b/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto
@@ -30,6 +30,8 @@ service DataStreamsService {
rpc DecreaseStreamRetentionPeriod(DecreaseStreamRetentionPeriodRequest) returns (DecreaseStreamRetentionPeriodResponse);
rpc IncreaseStreamRetentionPeriod(IncreaseStreamRetentionPeriodRequest) returns (IncreaseStreamRetentionPeriodResponse);
rpc UpdateShardCount(UpdateShardCountRequest) returns (UpdateShardCountResponse);
+ rpc UpdateStreamMode(UpdateStreamModeRequest) returns (UpdateStreamModeResponse);
+
// stream consumer methods
rpc RegisterStreamConsumer(RegisterStreamConsumerRequest) returns (RegisterStreamConsumerResponse);
rpc DeregisterStreamConsumer(DeregisterStreamConsumerRequest) returns (DeregisterStreamConsumerResponse);
diff --git a/ydb/public/api/protos/draft/datastreams.proto b/ydb/public/api/protos/draft/datastreams.proto
index a0791736cae..ef7f7489945 100644
--- a/ydb/public/api/protos/draft/datastreams.proto
+++ b/ydb/public/api/protos/draft/datastreams.proto
@@ -16,6 +16,7 @@ enum EFieldTransformationType {
TRANSFORM_EMPTY_TO_NOTHING = 3;
}
+
extend google.protobuf.FieldOptions {
EFieldTransformationType FieldTransformer = 58123;
}
@@ -28,6 +29,13 @@ enum EncryptionType {
KMS = 2;
}
+enum StreamMode {
+ STREAM_MODE_UNDEFINED = 0;
+ PROVISIONED = 1;
+ ON_DEMAND = 2;
+}
+
+
message EnhancedMetrics {
// List of shard-level metrics
repeated string shard_level_metrics = 1;
@@ -109,6 +117,9 @@ message StreamDescription {
string owner = 12;
// YDS-specific Storage limit in MB of the stream
int64 storage_limit_mb = 13;
+
+ // stream metering mode
+ StreamModeDetails stream_mode_details = 14;
}
// Represents range of possible sequence numbers for the shard
@@ -257,6 +268,11 @@ message Tag {
string value = 2;
}
+// Represents stream mode details
+message StreamModeDetails {
+ StreamMode stream_mode = 1;
+}
+
message CreateStreamRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Name of the stream
@@ -271,6 +287,8 @@ message CreateStreamRequest {
// Retention storage in megabytes
int32 retention_storage_megabytes = 6;
}
+ // stream metering mode
+ StreamModeDetails stream_mode_details = 7;
}
message CreateStreamResponse {
@@ -375,6 +393,9 @@ message UpdateStreamRequest {
// Retention storage in megabytes
int32 retention_storage_megabytes = 6;
}
+ // stream metering mode
+ StreamModeDetails stream_mode_details = 7;
+
}
message UpdateStreamResponse {
@@ -868,3 +889,23 @@ message UpdateShardCountResult {
// Updated number of shards
int32 target_shard_count = 3;
}
+
+
+message UpdateStreamModeRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ // Stream name or arn
+ string stream_arn = 2;
+ // stream metering mode
+ StreamModeDetails stream_mode_details = 3;
+
+
+}
+
+message UpdateStreamModeResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+message UpdateStreamModeResult {
+}
+
diff --git a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp
index e84ec7ffd65..6604fbe83cf 100644
--- a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp
@@ -84,6 +84,11 @@ namespace NYdb::NDataStreams::V1 {
req.set_retention_period_hours(24);
}
req.set_write_quota_kb_per_sec(settings.WriteQuotaKbPerSec_);
+ if (settings.StreamMode_.Defined()) {
+ req.mutable_stream_mode_details()->set_stream_mode(
+ *settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
+ : Ydb::DataStreams::V1::StreamMode::ON_DEMAND);
+ }
});
}
@@ -225,6 +230,20 @@ namespace NYdb::NDataStreams::V1 {
});
}
+ TAsyncUpdateStreamModeResult UpdateStreamMode(const TString& path, TUpdateStreamModeSettings settings) {
+ return CallImpl<Ydb::DataStreams::V1::DataStreamsService,
+ Ydb::DataStreams::V1::UpdateStreamModeRequest,
+ Ydb::DataStreams::V1::UpdateStreamModeResponse,
+ Ydb::DataStreams::V1::UpdateStreamModeResult>(settings, &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateStreamMode,
+ [&](Ydb::DataStreams::V1::UpdateStreamModeRequest& req) {
+ req.set_stream_arn(path);
+
+ req.mutable_stream_mode_details()->set_stream_mode(
+ settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
+ : Ydb::DataStreams::V1::StreamMode::ON_DEMAND);
+ });
+ }
+
TAsyncRegisterStreamConsumerResult RegisterStreamConsumer(const TString& path, const TString& consumer_name, TRegisterStreamConsumerSettings settings) {
return CallImpl<Ydb::DataStreams::V1::DataStreamsService,
Ydb::DataStreams::V1::RegisterStreamConsumerRequest,
@@ -349,6 +368,11 @@ namespace NYdb::NDataStreams::V1 {
req.set_retention_storage_megabytes(*settings.RetentionStorageMegabytes_.Get());
}
req.set_write_quota_kb_per_sec(settings.WriteQuotaKbPerSec_);
+ if (settings.StreamMode_.Defined()) {
+ req.mutable_stream_mode_details()->set_stream_mode(
+ *settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
+ : Ydb::DataStreams::V1::StreamMode::ON_DEMAND);
+ }
});
}
@@ -469,6 +493,10 @@ namespace NYdb::NDataStreams::V1 {
return Impl_->UpdateShardCount(path, settings);
}
+ TAsyncUpdateStreamModeResult TDataStreamsClient::UpdateStreamMode(const TString& path, TUpdateStreamModeSettings settings) {
+ return Impl_->UpdateStreamMode(path, settings);
+ }
+
TAsyncRegisterStreamConsumerResult TDataStreamsClient::RegisterStreamConsumer(const TString& path, const TString& consumer_name, const TRegisterStreamConsumerSettings settings) {
return Impl_->RegisterStreamConsumer(path, consumer_name, settings);
}
@@ -688,6 +716,18 @@ namespace NYdb::NDataStreams::V1 {
TProtoRequestSettings settings
);
+ template NThreading::TFuture<TProtoResultWrapper<Ydb::DataStreams::V1::UpdateStreamModeResult>> TDataStreamsClient::DoProtoRequest
+ <
+ Ydb::DataStreams::V1::UpdateStreamModeRequest,
+ Ydb::DataStreams::V1::UpdateStreamModeResponse,
+ Ydb::DataStreams::V1::UpdateStreamModeResult,
+ decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateStreamMode)
+ >(
+ const Ydb::DataStreams::V1::UpdateStreamModeRequest& request,
+ decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateStreamMode) method,
+ TProtoRequestSettings settings
+ );
+
template NThreading::TFuture<TProtoResultWrapper<Ydb::DataStreams::V1::RegisterStreamConsumerResult>> TDataStreamsClient::DoProtoRequest
<
Ydb::DataStreams::V1::RegisterStreamConsumerRequest,
diff --git a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h
index 63d7fb29186..ef1f475d16a 100644
--- a/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h
+++ b/ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h
@@ -28,6 +28,11 @@ namespace NYdb::NDataStreams::V1 {
std::unique_ptr<TProtoResult> Result;
};
+ enum EStreamMode {
+ ESM_PROVISIONED = 1,
+ ESM_ON_DEMAND = 2,
+ };
+
using TCreateStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::CreateStreamResult>;
using TDeleteStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::DeleteStreamResult>;
using TDescribeStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::DescribeStreamResult>;
@@ -46,6 +51,7 @@ namespace NYdb::NDataStreams::V1 {
using TDecreaseStreamRetentionPeriodResult = TProtoResultWrapper<Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResult>;
using TIncreaseStreamRetentionPeriodResult = TProtoResultWrapper<Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResult>;
using TUpdateShardCountResult = TProtoResultWrapper<Ydb::DataStreams::V1::UpdateShardCountResult>;
+ using TUpdateStreamModeResult = TProtoResultWrapper<Ydb::DataStreams::V1::UpdateStreamModeResult>;
using TListStreamConsumersResult = TProtoResultWrapper<Ydb::DataStreams::V1::ListStreamConsumersResult>;
using TAddTagsToStreamResult = TProtoResultWrapper<Ydb::DataStreams::V1::AddTagsToStreamResult>;
using TDisableEnhancedMonitoringResult = TProtoResultWrapper<Ydb::DataStreams::V1::DisableEnhancedMonitoringResult>;
@@ -76,6 +82,7 @@ namespace NYdb::NDataStreams::V1 {
using TAsyncDecreaseStreamRetentionPeriodResult = NThreading::TFuture<TDecreaseStreamRetentionPeriodResult>;
using TAsyncIncreaseStreamRetentionPeriodResult = NThreading::TFuture<TIncreaseStreamRetentionPeriodResult>;
using TAsyncUpdateShardCountResult = NThreading::TFuture<TUpdateShardCountResult>;
+ using TAsyncUpdateStreamModeResult = NThreading::TFuture<TUpdateStreamModeResult>;
using TAsyncListStreamConsumersResult = NThreading::TFuture<TListStreamConsumersResult>;
using TAsyncAddTagsToStreamResult = NThreading::TFuture<TAddTagsToStreamResult>;
using TAsyncDisableEnhancedMonitoringResult = NThreading::TFuture<TDisableEnhancedMonitoringResult>;
@@ -99,6 +106,7 @@ namespace NYdb::NDataStreams::V1 {
FLUENT_SETTING_OPTIONAL(ui32, RetentionPeriodHours);
FLUENT_SETTING_OPTIONAL(ui32, RetentionStorageMegabytes);
FLUENT_SETTING(ui64, WriteQuotaKbPerSec);
+ FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);
};
struct TListStreamsSettings : public NYdb::TOperationRequestSettings<TListStreamsSettings> {
FLUENT_SETTING(ui32, Limit);
@@ -137,11 +145,16 @@ namespace NYdb::NDataStreams::V1 {
struct TUpdateShardCountSettings : public NYdb::TOperationRequestSettings<TUpdateShardCountSettings> {
FLUENT_SETTING(ui32, TargetShardCount);
};
+ struct TUpdateStreamModeSettings : public NYdb::TOperationRequestSettings<TUpdateStreamModeSettings> {
+ FLUENT_SETTING_DEFAULT(EStreamMode, StreamMode, ESM_PROVISIONED);
+ };
struct TUpdateStreamSettings : public NYdb::TOperationRequestSettings<TUpdateStreamSettings> {
FLUENT_SETTING(ui32, TargetShardCount);
FLUENT_SETTING_OPTIONAL(ui32, RetentionPeriodHours);
FLUENT_SETTING_OPTIONAL(ui32, RetentionStorageMegabytes);
FLUENT_SETTING(ui64, WriteQuotaKbPerSec);
+ FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);
+
};
struct TPutRecordSettings : public NYdb::TOperationRequestSettings<TPutRecordSettings> {};
struct TPutRecordsSettings : public NYdb::TOperationRequestSettings<TPutRecordsSettings> {};
@@ -185,6 +198,7 @@ namespace NYdb::NDataStreams::V1 {
TAsyncDecreaseStreamRetentionPeriodResult DecreaseStreamRetentionPeriod(const TString& path, TDecreaseStreamRetentionPeriodSettings settings = TDecreaseStreamRetentionPeriodSettings());
TAsyncIncreaseStreamRetentionPeriodResult IncreaseStreamRetentionPeriod(const TString& path, TIncreaseStreamRetentionPeriodSettings settings = TIncreaseStreamRetentionPeriodSettings());
TAsyncUpdateShardCountResult UpdateShardCount(const TString& path, TUpdateShardCountSettings settings = TUpdateShardCountSettings());
+ TAsyncUpdateStreamModeResult UpdateStreamMode(const TString& path, TUpdateStreamModeSettings settings = TUpdateStreamModeSettings());
TAsyncRegisterStreamConsumerResult RegisterStreamConsumer(const TString& path, const TString& consumer_name, TRegisterStreamConsumerSettings settings = TRegisterStreamConsumerSettings());
TAsyncDeregisterStreamConsumerResult DeregisterStreamConsumer(const TString& path, const TString& consumer_name, TDeregisterStreamConsumerSettings settings = TDeregisterStreamConsumerSettings());
TAsyncDescribeStreamConsumerResult DescribeStreamConsumer(TDescribeStreamConsumerSettings settings = TDescribeStreamConsumerSettings());
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 1e7be043d8a..dab04f1d85d 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/persqueue/partition.h>
#include <ydb/core/persqueue/write_meta.h>
+#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <ydb/services/persqueue_v1/actors/persqueue_utils.h>
@@ -27,7 +28,7 @@ using grpc::Status;
namespace NKikimr::NDataStreams::V1 {
const TString YDS_SERVICE_TYPE = "data-streams";
const i32 DEFAULT_STREAM_DAY_RETENTION = TDuration::Days(1).Hours();
- // const i32 DEFAULT_STREAM_WEEK_RETENTION = TDuration::Days(7).Hours();
+ const i32 DEFAULT_STREAM_WEEK_RETENTION = TDuration::Days(7).Hours();
using namespace NGRpcService;
using namespace NGRpcProxy::V1;
@@ -101,28 +102,52 @@ namespace NKikimr::NDataStreams::V1 {
{
NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
- Ydb::PersQueue::V1::TopicSettings topicSettings;
- topicSettings.set_partitions_count(GetProtoRequest()->shard_count());
+ Ydb::Topic::CreateTopicRequest topicRequest;
+ topicRequest.mutable_partitioning_settings()->set_min_active_partitions(GetProtoRequest()->shard_count());
switch (GetProtoRequest()->retention_case()) {
case Ydb::DataStreams::V1::CreateStreamRequest::RetentionCase::kRetentionPeriodHours:
- topicSettings.set_retention_period_ms(
- TDuration::Hours(GetProtoRequest()->retention_period_hours()).MilliSeconds());
+ topicRequest.mutable_retention_period()->set_seconds(
+ TDuration::Hours(GetProtoRequest()->retention_period_hours()).Seconds());
break;
case Ydb::DataStreams::V1::CreateStreamRequest::RetentionCase::kRetentionStorageMegabytes:
- topicSettings.set_retention_storage_bytes(
- GetProtoRequest()->retention_storage_megabytes() * 1_MB);
+ topicRequest.set_retention_storage_mb(
+ GetProtoRequest()->retention_storage_megabytes());
+ topicRequest.mutable_retention_period()->set_seconds(
+ TDuration::Hours(DEFAULT_STREAM_WEEK_RETENTION).Seconds());
break;
default:
- topicSettings.set_retention_period_ms(
- TDuration::Hours(DEFAULT_STREAM_DAY_RETENTION).MilliSeconds());
+ topicRequest.mutable_retention_period()->set_seconds(
+ TDuration::Hours(DEFAULT_STREAM_DAY_RETENTION).Seconds());
}
- topicSettings.set_supported_format(Ydb::PersQueue::V1::TopicSettings::FORMAT_BASE);
- topicSettings.add_supported_codecs(Ydb::PersQueue::V1::CODEC_RAW);
- topicSettings.set_max_partition_write_speed(
+ topicRequest.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW);
+ topicRequest.set_partition_write_speed_bytes_per_second(
PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec()));
- topicSettings.set_max_partition_write_burst(
+ topicRequest.set_partition_write_burst_bytes(
PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec()));
+ if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
+ topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
+
+ if (GetProtoRequest()->has_stream_mode_details()) {
+ switch(GetProtoRequest()->stream_mode_details().stream_mode()) {
+ case Ydb::DataStreams::V1::StreamMode::PROVISIONED:
+ topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
+ break;
+ case Ydb::DataStreams::V1::StreamMode::ON_DEMAND:
+ topicRequest.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS);
+ break;
+ default:
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ "streams can't be created with unknown metering mode", ctx);
+ }
+ }
+ } else {
+ if (GetProtoRequest()->has_stream_mode_details()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ "streams can't be created with metering mode", ctx);
+ }
+ }
+
if (workingDir != proposal.Record.GetDatabaseName() && !proposal.Record.GetDatabaseName().empty()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
"streams can be created only at database root", ctx);
@@ -138,13 +163,8 @@ namespace NKikimr::NDataStreams::V1 {
modifyScheme.SetWorkingDir(workingDir);
pqDescr->SetPartitionPerTablet(1);
- // TODO: support StreamMode
- if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
- pqDescr->MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
- }
-
TString error;
- auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicSettings, modifyScheme, ctx, false, error,
+ auto status = NKikimr::NGRpcProxy::V1::FillProposeRequestImpl(name, topicRequest, modifyScheme, ctx, error,
workingDir, proposal.Record.GetDatabaseName());
if (status != Ydb::StatusIds::SUCCESS) {
@@ -278,6 +298,57 @@ namespace NKikimr::NDataStreams::V1 {
//-----------------------------------------------------------------------------------------------------------
+ class TUpdateStreamModeActor : public TUpdateSchemeActor<TUpdateStreamModeActor, TEvDataStreamsUpdateStreamModeRequest> {
+ using TBase = TUpdateSchemeActor<TUpdateStreamModeActor, TEvDataStreamsUpdateStreamModeRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
+ public:
+ TUpdateStreamModeActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_arn())
+ {
+ }
+
+ void Bootstrap(const TActorContext& ctx);
+ void ModifyPersqueueConfig(const TActorContext& ctx,
+ NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
+ const NKikimrSchemeOp::TDirEntry& selfInfo);
+ };
+
+ void TUpdateStreamModeActor::Bootstrap(const TActorContext& ctx) {
+ TBase::Bootstrap(ctx);
+ SendDescribeProposeRequest(ctx);
+ Become(&TBase::StateWork);
+ }
+
+ void TUpdateStreamModeActor::ModifyPersqueueConfig(
+ const TActorContext& ctx,
+ NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
+ const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
+ const NKikimrSchemeOp::TDirEntry& selfInfo
+ ) {
+ Y_UNUSED(selfInfo);
+ Y_UNUSED(pqGroupDescription);
+ if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ "streams can't be created with metering mode", ctx);
+ }
+
+ switch(GetProtoRequest()->stream_mode_details().stream_mode()) {
+ case Ydb::DataStreams::V1::StreamMode::PROVISIONED:
+ groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
+ break;
+ case Ydb::DataStreams::V1::StreamMode::ON_DEMAND:
+ groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
+ break;
+ default:
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ "streams can't be created with unknown metering mode", ctx);
+ }
+ }
+
+
+ //-----------------------------------------------------------------------------------------------------------
+
class TUpdateStreamActor : public TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest> {
using TBase = TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest>;
using TProtoRequest = typename TBase::TProtoRequest;
@@ -333,6 +404,25 @@ namespace NKikimr::NDataStreams::V1 {
pqConfig->MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(
PartitionWriteSpeedInBytesPerSec(GetProtoRequest()->write_quota_kb_per_sec()));
+ if (GetProtoRequest()->has_stream_mode_details()) {
+ if (!AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ "streams can't be created with metering mode", ctx);
+ }
+
+ switch(GetProtoRequest()->stream_mode_details().stream_mode()) {
+ case Ydb::DataStreams::V1::StreamMode::PROVISIONED:
+ groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY);
+ break;
+ case Ydb::DataStreams::V1::StreamMode::ON_DEMAND:
+ groupConfig.MutablePQTabletConfig()->SetMeteringMode(NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS);
+ break;
+ default:
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ "streams can't be created with unknown metering mode", ctx);
+ }
+ }
+
auto serviceTypes = GetSupportedClientServiceTypes(ctx);
auto status = CheckConfig(*pqConfig, serviceTypes, error, ctx);
if (status != Ydb::StatusIds::SUCCESS) {
@@ -590,6 +680,13 @@ namespace NKikimr::NDataStreams::V1 {
description.set_stream_status(Ydb::DataStreams::V1::StreamDescription::CREATING);
}
+ if (AppData(ctx)->PQConfig.GetBillingMeteringConfig().GetEnabled()) {
+ description.mutable_stream_mode_details()->set_stream_mode(
+ pqConfig.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
+ : Ydb::DataStreams::V1::StreamMode::ON_DEMAND
+ );
+ }
+
bool startShardFound = GetProtoRequest()->exclusive_start_shard_id().empty();
description.set_has_more_shards(false);
@@ -1812,6 +1909,7 @@ DECLARE_RPC_NI(SubscribeToShard);
DECLARE_RPC_NI(DescribeLimits);
DECLARE_RPC(DescribeStreamSummary);
DECLARE_RPC(UpdateShardCount);
+DECLARE_RPC(UpdateStreamMode);
DECLARE_RPC(ListStreamConsumers);
DECLARE_RPC_NI(AddTagsToStream);
DECLARE_RPC_NI(DisableEnhancedMonitoring);
diff --git a/ydb/services/datastreams/datastreams_proxy.h b/ydb/services/datastreams/datastreams_proxy.h
index 72b9e1ae4b4..7a3791a4d0d 100644
--- a/ydb/services/datastreams/datastreams_proxy.h
+++ b/ydb/services/datastreams/datastreams_proxy.h
@@ -32,6 +32,7 @@ using TEvDataStreamsDescribeStreamSummaryRequest = TGrpcRequestOperationCall<Ydb
using TEvDataStreamsDecreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResponse>;
using TEvDataStreamsIncreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResponse>;
using TEvDataStreamsUpdateShardCountRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateShardCountRequest, Ydb::DataStreams::V1::UpdateShardCountResponse>;
+using TEvDataStreamsUpdateStreamModeRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateStreamModeRequest, Ydb::DataStreams::V1::UpdateStreamModeResponse>;
using TEvDataStreamsUpdateStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateStreamRequest, Ydb::DataStreams::V1::UpdateStreamResponse>;
using TEvDataStreamsSetWriteQuotaRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SetWriteQuotaRequest, Ydb::DataStreams::V1::SetWriteQuotaResponse>;
using TEvDataStreamsListStreamConsumersRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListStreamConsumersRequest, Ydb::DataStreams::V1::ListStreamConsumersResponse>;
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index a5112838fae..31653ed3bb6 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -292,7 +292,14 @@ Y_UNIT_TEST_SUITE(DataStreams) {
{
auto result = testServer.DataStreamsClient->CreateStream(streamName,
NYDS_V1::TCreateStreamSettings().ShardCount(10)
- .RetentionPeriodHours(20)).ExtractValueSync();
+ .RetentionPeriodHours(20).StreamMode(NYdb::NDataStreams::V1::ESM_ON_DEMAND)).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = testServer.DataStreamsClient->UpdateStreamMode(streamName,
+ NYDS_V1::TUpdateStreamModeSettings().StreamMode(NYdb::NDataStreams::V1::ESM_PROVISIONED)).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
@@ -810,6 +817,10 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().retention_period_hours(),
TDuration::Days(7).Hours());
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().storage_limit_mb(), 50_GB / 1_MB);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().stream_description().stream_mode_details().stream_mode(),
+ Ydb::DataStreams::V1::StreamMode::PROVISIONED);
+
+
}
}
diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp
index 7df7b15f819..73d350834fb 100644
--- a/ydb/services/datastreams/grpc_service.cpp
+++ b/ydb/services/datastreams/grpc_service.cpp
@@ -72,6 +72,7 @@ void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger)
ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr)
ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr)
ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr)
+ ADD_REQUEST(UpdateStreamMode, DoDataStreamsUpdateStreamModeRequest, nullptr)
ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr)
ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr)
ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr)