diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-02-27 20:56:04 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-27 20:56:04 +0300 |
commit | e2e92e47ba28b4bb3ebb43d9c2cedc60ec47017f (patch) | |
tree | 041765d1738d8aa0e4ef01c6646cf9ca7b30a2df | |
parent | e0f640d679f4b722068265507a666576b913863b (diff) | |
download | ydb-e2e92e47ba28b4bb3ebb43d9c2cedc60ec47017f.tar.gz |
Fix splitted commit message for EvWrite (#15062)
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor_settings.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.cpp | 17 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_table.h | 1 | ||||
-rw-r--r-- | ydb/core/protos/table_service_config.proto | 1 |
6 files changed, 11 insertions, 12 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index eacc6bdb39..7774878700 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -457,7 +457,6 @@ private: ptr->InFlightMemoryLimitPerActorBytes = settings.GetInFlightMemoryLimitPerActorBytes(); ptr->MemoryLimitPerMessageBytes = settings.GetMemoryLimitPerMessageBytes(); - ptr->MaxBatchesPerMessage = settings.GetMaxBatchesPerMessage(); ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartRetryDelayMs()); ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxRetryDelayMs()); diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 333b65aef2..b73a01b4bf 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -239,12 +239,12 @@ public: , Counters(counters) , TableWriteActorSpan(TWilsonKqp::TableWriteActor, NWilson::TTraceId(traceId), "TKqpTableWriteActor") { + AFL_ENSURE(MessageSettings.InFlightMemoryLimitPerActorBytes >= MessageSettings.MemoryLimitPerMessageBytes); LogPrefix = TStringBuilder() << "Table: `" << TablePath << "` (" << TableId << "), " << "SessionActorId: " << sessionActorId; ShardedWriteController = CreateShardedWriteController( TShardedWriteControllerSettings { .MemoryLimitTotal = MessageSettings.InFlightMemoryLimitPerActorBytes, .MemoryLimitPerMessage = MessageSettings.MemoryLimitPerMessageBytes, - .MaxBatchesPerMessage = MessageSettings.MaxBatchesPerMessage, }, TypeEnv, Alloc); diff --git a/ydb/core/kqp/runtime/kqp_write_actor_settings.h b/ydb/core/kqp/runtime/kqp_write_actor_settings.h index 37e8bfe910..9b02d8e246 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor_settings.h +++ b/ydb/core/kqp/runtime/kqp_write_actor_settings.h @@ -10,7 +10,6 @@ namespace NKqp { struct TWriteActorSettings : TAtomicRefCount<TWriteActorSettings> { i64 InFlightMemoryLimitPerActorBytes = 64_MB; i64 MemoryLimitPerMessageBytes = 64_MB; - i64 MaxBatchesPerMessage = 1000; i64 MaxForwardedSize = 64_MB; TDuration StartRetryDelay = TDuration::Seconds(1); diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index a917a72c13..857b8d784d 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -927,20 +927,21 @@ public: return IsClosed() && IsEmpty(); } - void MakeNextBatches(i64 maxDataSize, ui64 maxCount) { + void MakeNextBatches(i64 maxDataSize, std::optional<ui64> maxCount) { YQL_ENSURE(BatchesInFlight == 0); YQL_ENSURE(!IsEmpty()); - YQL_ENSURE(maxCount != 0); i64 dataSize = 0; // For columnshard batch can be slightly larger than the limit. - while (BatchesInFlight < maxCount + while ((!maxCount || BatchesInFlight < *maxCount) && BatchesInFlight < Batches.size() && (dataSize + GetBatch(BatchesInFlight).GetMemory() <= maxDataSize || BatchesInFlight == 0)) { dataSize += GetBatch(BatchesInFlight).GetMemory(); ++BatchesInFlight; } YQL_ENSURE(BatchesInFlight != 0); - YQL_ENSURE(BatchesInFlight == Batches.size() || BatchesInFlight >= maxCount || dataSize + GetBatch(BatchesInFlight).GetMemory() > maxDataSize); + YQL_ENSURE(BatchesInFlight == Batches.size() + || (maxCount && BatchesInFlight >= *maxCount) + || dataSize + GetBatch(BatchesInFlight).GetMemory() > maxDataSize); } TBatchWithMetadata& GetBatch(size_t index) { @@ -1451,9 +1452,11 @@ private: void BuildBatchesForShard(TShardsInfo::TShardInfo& shard) { if (shard.GetBatchesInFlight() == 0) { YQL_ENSURE(IsOlap != std::nullopt); - shard.MakeNextBatches( - Settings.MemoryLimitPerMessage, - (*IsOlap) ? 1 : Settings.MaxBatchesPerMessage); + if (*IsOlap) { + shard.MakeNextBatches(Settings.MemoryLimitPerMessage, 1); + } else { + shard.MakeNextBatches(Settings.MemoryLimitPerMessage, std::nullopt); + } } } diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h index 83aa4e69f9..f947fc3ee1 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.h +++ b/ydb/core/kqp/runtime/kqp_write_table.h @@ -116,7 +116,6 @@ using IShardedWriteControllerPtr = TIntrusivePtr<IShardedWriteController>; struct TShardedWriteControllerSettings { i64 MemoryLimitTotal; i64 MemoryLimitPerMessage; - i64 MaxBatchesPerMessage; bool Inconsistent; }; diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 6aab3ffd08..f8f2874ad0 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -330,7 +330,6 @@ message TTableServiceConfig { message TWriteActorSettings { optional uint64 InFlightMemoryLimitPerActorBytes = 1 [ default = 67108864 ]; optional uint64 MemoryLimitPerMessageBytes = 2 [ default = 67108864 ]; - optional uint64 MaxBatchesPerMessage = 3 [ default = 1000 ]; optional uint64 StartRetryDelayMs = 4 [ default = 1000 ]; optional uint64 MaxRetryDelayMs = 5 [ default = 10000 ]; |