aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-02-27 20:56:04 +0300
committerGitHub <noreply@github.com>2025-02-27 20:56:04 +0300
commite2e92e47ba28b4bb3ebb43d9c2cedc60ec47017f (patch)
tree041765d1738d8aa0e4ef01c6646cf9ca7b30a2df
parente0f640d679f4b722068265507a666576b913863b (diff)
downloadydb-e2e92e47ba28b4bb3ebb43d9c2cedc60ec47017f.tar.gz
Fix splitted commit message for EvWrite (#15062)
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp1
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor_settings.h1
-rw-r--r--ydb/core/kqp/runtime/kqp_write_table.cpp17
-rw-r--r--ydb/core/kqp/runtime/kqp_write_table.h1
-rw-r--r--ydb/core/protos/table_service_config.proto1
6 files changed, 11 insertions, 12 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index eacc6bdb39..7774878700 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -457,7 +457,6 @@ private:
ptr->InFlightMemoryLimitPerActorBytes = settings.GetInFlightMemoryLimitPerActorBytes();
ptr->MemoryLimitPerMessageBytes = settings.GetMemoryLimitPerMessageBytes();
- ptr->MaxBatchesPerMessage = settings.GetMaxBatchesPerMessage();
ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartRetryDelayMs());
ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxRetryDelayMs());
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 333b65aef2..b73a01b4bf 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -239,12 +239,12 @@ public:
, Counters(counters)
, TableWriteActorSpan(TWilsonKqp::TableWriteActor, NWilson::TTraceId(traceId), "TKqpTableWriteActor")
{
+ AFL_ENSURE(MessageSettings.InFlightMemoryLimitPerActorBytes >= MessageSettings.MemoryLimitPerMessageBytes);
LogPrefix = TStringBuilder() << "Table: `" << TablePath << "` (" << TableId << "), " << "SessionActorId: " << sessionActorId;
ShardedWriteController = CreateShardedWriteController(
TShardedWriteControllerSettings {
.MemoryLimitTotal = MessageSettings.InFlightMemoryLimitPerActorBytes,
.MemoryLimitPerMessage = MessageSettings.MemoryLimitPerMessageBytes,
- .MaxBatchesPerMessage = MessageSettings.MaxBatchesPerMessage,
},
TypeEnv,
Alloc);
diff --git a/ydb/core/kqp/runtime/kqp_write_actor_settings.h b/ydb/core/kqp/runtime/kqp_write_actor_settings.h
index 37e8bfe910..9b02d8e246 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor_settings.h
+++ b/ydb/core/kqp/runtime/kqp_write_actor_settings.h
@@ -10,7 +10,6 @@ namespace NKqp {
struct TWriteActorSettings : TAtomicRefCount<TWriteActorSettings> {
i64 InFlightMemoryLimitPerActorBytes = 64_MB;
i64 MemoryLimitPerMessageBytes = 64_MB;
- i64 MaxBatchesPerMessage = 1000;
i64 MaxForwardedSize = 64_MB;
TDuration StartRetryDelay = TDuration::Seconds(1);
diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp
index a917a72c13..857b8d784d 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_table.cpp
@@ -927,20 +927,21 @@ public:
return IsClosed() && IsEmpty();
}
- void MakeNextBatches(i64 maxDataSize, ui64 maxCount) {
+ void MakeNextBatches(i64 maxDataSize, std::optional<ui64> maxCount) {
YQL_ENSURE(BatchesInFlight == 0);
YQL_ENSURE(!IsEmpty());
- YQL_ENSURE(maxCount != 0);
i64 dataSize = 0;
// For columnshard batch can be slightly larger than the limit.
- while (BatchesInFlight < maxCount
+ while ((!maxCount || BatchesInFlight < *maxCount)
&& BatchesInFlight < Batches.size()
&& (dataSize + GetBatch(BatchesInFlight).GetMemory() <= maxDataSize || BatchesInFlight == 0)) {
dataSize += GetBatch(BatchesInFlight).GetMemory();
++BatchesInFlight;
}
YQL_ENSURE(BatchesInFlight != 0);
- YQL_ENSURE(BatchesInFlight == Batches.size() || BatchesInFlight >= maxCount || dataSize + GetBatch(BatchesInFlight).GetMemory() > maxDataSize);
+ YQL_ENSURE(BatchesInFlight == Batches.size()
+ || (maxCount && BatchesInFlight >= *maxCount)
+ || dataSize + GetBatch(BatchesInFlight).GetMemory() > maxDataSize);
}
TBatchWithMetadata& GetBatch(size_t index) {
@@ -1451,9 +1452,11 @@ private:
void BuildBatchesForShard(TShardsInfo::TShardInfo& shard) {
if (shard.GetBatchesInFlight() == 0) {
YQL_ENSURE(IsOlap != std::nullopt);
- shard.MakeNextBatches(
- Settings.MemoryLimitPerMessage,
- (*IsOlap) ? 1 : Settings.MaxBatchesPerMessage);
+ if (*IsOlap) {
+ shard.MakeNextBatches(Settings.MemoryLimitPerMessage, 1);
+ } else {
+ shard.MakeNextBatches(Settings.MemoryLimitPerMessage, std::nullopt);
+ }
}
}
diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h
index 83aa4e69f9..f947fc3ee1 100644
--- a/ydb/core/kqp/runtime/kqp_write_table.h
+++ b/ydb/core/kqp/runtime/kqp_write_table.h
@@ -116,7 +116,6 @@ using IShardedWriteControllerPtr = TIntrusivePtr<IShardedWriteController>;
struct TShardedWriteControllerSettings {
i64 MemoryLimitTotal;
i64 MemoryLimitPerMessage;
- i64 MaxBatchesPerMessage;
bool Inconsistent;
};
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index 6aab3ffd08..f8f2874ad0 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -330,7 +330,6 @@ message TTableServiceConfig {
message TWriteActorSettings {
optional uint64 InFlightMemoryLimitPerActorBytes = 1 [ default = 67108864 ];
optional uint64 MemoryLimitPerMessageBytes = 2 [ default = 67108864 ];
- optional uint64 MaxBatchesPerMessage = 3 [ default = 1000 ];
optional uint64 StartRetryDelayMs = 4 [ default = 1000 ];
optional uint64 MaxRetryDelayMs = 5 [ default = 10000 ];