diff options
author | ssmike <ssmike@ydb.tech> | 2023-07-24 16:08:37 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-07-24 16:08:37 +0300 |
commit | 0f1b1246a9da8633ac9f9090b5e6f7d5efd00951 (patch) | |
tree | cd2ab75daf4f42012149c086ff496a41616c2871 | |
parent | a373df167843da88b64bb8dfbf7c78b7b395847a (diff) | |
download | ydb-0f1b1246a9da8633ac9f9090b5e6f7d5efd00951.tar.gz |
Allow to configure default quota settings
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 63 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.h | 1 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 6 |
4 files changed, 61 insertions, 20 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 7206342966..2b2f290a96 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -146,6 +146,9 @@ public: if (config.HasIteratorReadsRetrySettings()) { SetIteratorReadsRetrySettings(config.GetIteratorReadsRetrySettings()); } + if (config.HasIteratorReadQuotaSettings()) { + SetIteratorReadsQuotaSettings(config.GetIteratorReadQuotaSettings()); + } } void Bootstrap() { @@ -593,10 +596,18 @@ private: SetIteratorReadsRetrySettings(event.GetConfig().GetTableServiceConfig().GetIteratorReadsRetrySettings()); } + if (event.GetConfig().GetTableServiceConfig().HasIteratorReadQuotaSettings()) { + SetIteratorReadsQuotaSettings(event.GetConfig().GetTableServiceConfig().GetIteratorReadQuotaSettings()); + } + auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event); Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); } + void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) { + SetDefaultIteratorQuotaSettings(settings.GetMaxRows(), settings.GetMaxBytes()); + } + void SetIteratorReadsRetrySettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) { auto ptr = MakeIntrusive<NKikimr::NKqp::TIteratorReadBackoffSettings>(); ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartDelayMs()); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index caaf1fc52b..fd12e5e80e 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -27,35 +27,39 @@ bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog:: return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component); } -struct TDefaultRangeEvReadSettings { - NKikimrTxDataShard::TEvRead Data; +struct TEvReadSettings : public TAtomicRefCount<TEvReadSettings> { + NKikimrTxDataShard::TEvRead Read; + NKikimrTxDataShard::TEvReadAck Ack; - TDefaultRangeEvReadSettings() { - Data.SetMaxRows(32767); - Data.SetMaxBytes(5_MB); + TEvReadSettings() { + Read.SetMaxRows(32767); + Read.SetMaxBytes(5_MB); + + Ack.SetMaxRows(32767); + Ack.SetMaxBytes(5_MB); } +}; -} DefaultRangeEvReadSettings; +struct TEvReadDefaultSettings { + THotSwap<TEvReadSettings> Settings; + + TEvReadDefaultSettings() { + Settings.AtomicStore(MakeIntrusive<TEvReadSettings>()); + } + +} DefaultSettings; THolder<NKikimr::TEvDataShard::TEvRead> DefaultReadSettings() { auto result = MakeHolder<NKikimr::TEvDataShard::TEvRead>(); - result->Record.MergeFrom(DefaultRangeEvReadSettings.Data); + auto ptr = DefaultSettings.Settings.AtomicLoad(); + result->Record.MergeFrom(ptr->Read); return result; } -struct TDefaultRangeEvReadAckSettings { - NKikimrTxDataShard::TEvReadAck Data; - - TDefaultRangeEvReadAckSettings() { - Data.SetMaxRows(32767); - Data.SetMaxBytes(5_MB); - } - -} DefaultRangeEvReadAckSettings; - THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() { auto result = MakeHolder<NKikimr::TEvDataShard::TEvReadAck>(); - result->Record.MergeFrom(DefaultRangeEvReadAckSettings.Data); + auto ptr = DefaultSettings.Settings.AtomicLoad(); + result->Record.MergeFrom(ptr->Ack); return result; } @@ -1442,11 +1446,30 @@ void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<T } void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead& read) { - ::DefaultRangeEvReadSettings.Data.MergeFrom(read); + auto ptr = ::DefaultSettings.Settings.AtomicLoad(); + TEvReadSettings settings = *ptr; + settings.Read.MergeFrom(read); + ::DefaultSettings.Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings)); } void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) { - ::DefaultRangeEvReadAckSettings.Data.MergeFrom(ack); + auto ptr = ::DefaultSettings.Settings.AtomicLoad(); + TEvReadSettings settings = *ptr; + settings.Ack.MergeFrom(ack); + ::DefaultSettings.Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings)); +} + +void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes) { + auto ptr = ::DefaultSettings.Settings.AtomicLoad(); + TEvReadSettings settings = *ptr; + + settings.Read.SetMaxRows(rows); + settings.Ack.SetMaxRows(rows); + + settings.Read.SetMaxBytes(bytes); + settings.Ack.SetMaxBytes(bytes); + + ::DefaultSettings.Settings.AtomicStore(MakeIntrusive<TEvReadSettings>(settings)); } void InterceptReadActorPipeCache(NActors::TActorId id) { diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h index 12da2c844c..4849e99c58 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.h +++ b/ydb/core/kqp/runtime/kqp_read_actor.h @@ -41,6 +41,7 @@ struct TIteratorReadBackoffSettings : TAtomicRefCount<TIteratorReadBackoffSettin }; void SetReadIteratorBackoffSettings(TIntrusivePtr<TIteratorReadBackoffSettings>); +void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes); void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory&, TIntrusivePtr<TKqpCounters>); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 29cc4977f6..b00ae5d0ce 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1272,6 +1272,11 @@ message TTableServiceConfig { optional uint32 MaxTotalRetries = 7; } + message TIteratorReadQuotaSettings { + optional uint32 MaxRows = 1; + optional uint32 MaxBytes = 2; + } + optional uint32 QueryLimitBytes = 1; optional uint32 ParametersLimitBytes = 2; optional uint32 SessionsLimitPerNode = 3; @@ -1325,6 +1330,7 @@ message TTableServiceConfig { optional EBindingsMode BindingsMode = 40 [default = BM_ENABLED]; optional TIteratorReadsRetrySettings IteratorReadsRetrySettings = 41; optional bool PredicateExtract20 = 44 [default = false]; + optional TIteratorReadQuotaSettings IteratorReadQuotaSettings = 45; }; // Config describes immediate controls and allows |