aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-06-02 12:36:29 +0500
committerGitHub <noreply@github.com>2025-06-02 10:36:29 +0300
commitd70a7cc0fc49da471a08b7ea2fc8c3c12f76041c (patch)
treed15c0c6dadf0579585467e89ec23f838d0c9b87d
parent5d9c33f56d1f684ac079aaf428dbb79f01e95690 (diff)
downloadydb-d70a7cc0fc49da471a08b7ea2fc8c3c12f76041c.tar.gz
Supported topic autopartitioning for the transfer (#19043)
-rw-r--r--ydb/core/persqueue/pq_impl.cpp2
-rw-r--r--ydb/core/persqueue/read_balancer.cpp4
-rw-r--r--ydb/core/transfer/transfer_writer.cpp4
-rw-r--r--ydb/core/transfer/ut/common/utils.h60
-rw-r--r--ydb/core/transfer/ut/large/transfer_ut.cpp142
-rw-r--r--ydb/core/transfer/ut/large/ya.make2
-rw-r--r--ydb/core/tx/replication/service/topic_reader.cpp27
-rw-r--r--ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h6
-rw-r--r--ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp14
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp9
10 files changed, 194 insertions, 76 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 2e7fa26ac2e..d611675c097 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1802,8 +1802,6 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
{
PQ_LOG_T("Handle TEvPersQueue::TEvStatus");
- ReadBalancerActorId = ev->Sender;
-
if (!ConfigInited || !AllOriginalPartitionsInited()) {
PQ_LOG_D("Postpone the request." <<
" ConfigInited " << static_cast<int>(ConfigInited) <<
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index e7d0011f8de..2bd3c5c6950 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -534,6 +534,8 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
} else {
TActorId pipeClient = GetPipeClient(tabletId, ctx);
+ NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
+
auto it = AggregatedStats.Cookies.find(tabletId);
if (!pipeReconnected || it != AggregatedStats.Cookies.end()) {
ui64 cookie;
@@ -548,8 +550,6 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
TStringBuilder() << "Send TEvPersQueue::TEvStatus TabletId: " << tabletId << " Cookie: " << cookie);
NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus("", true), cookie);
}
-
- NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
}
}
diff --git a/ydb/core/transfer/transfer_writer.cpp b/ydb/core/transfer/transfer_writer.cpp
index 272418c6e42..6cea16fbb4f 100644
--- a/ydb/core/transfer/transfer_writer.cpp
+++ b/ydb/core/transfer/transfer_writer.cpp
@@ -818,6 +818,10 @@ private:
Send(Worker, new TEvWorker::TEvPoll());
}
+ if (LastWriteTime) {
+ LastWriteTime = TInstant::Now();
+ }
+
return StartWork();
}
diff --git a/ydb/core/transfer/ut/common/utils.h b/ydb/core/transfer/ut/common/utils.h
index b705a324416..7a603228b0e 100644
--- a/ydb/core/transfer/ut/common/utils.h
+++ b/ydb/core/transfer/ut/common/utils.h
@@ -168,7 +168,7 @@ struct MainTestCase {
if (user) {
config.SetAuthToken(TStringBuilder() << user.value() << "@builtin");
}
- // config.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_INFO).Release()))
+ // config.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release()));
return config;
}
@@ -184,7 +184,6 @@ struct MainTestCase {
, TransferName(TStringBuilder() << "Transfer_" << Id)
, Driver(CreateDriverConfig(ConnectionString, user))
, TableClient(Driver)
- , Session(TableClient.GetSession().GetValueSync().GetSession())
, TopicClient(Driver)
{
}
@@ -193,9 +192,13 @@ struct MainTestCase {
Driver.Stop(true);
}
+ auto Session() {
+ return TableClient.GetSession().GetValueSync().GetSession();
+ }
+
void ExecuteDDL(const std::string& ddl, bool checkResult = true, const std::optional<std::string> expectedMessage = std::nullopt) {
Cerr << "DDL: " << ddl << Endl << Flush;
- auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
+ auto res = Session().ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync();
if (checkResult) {
if (expectedMessage) {
UNIT_ASSERT(!res.IsSuccess());
@@ -211,7 +214,7 @@ struct MainTestCase {
auto ExecuteQuery(const std::string& query, bool retry = true) {
for (size_t i = 10; i--;) {
Cerr << ">>>>> Query: " << query << Endl << Flush;
- auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ auto res = Session().ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
if (!res.IsSuccess()) {
Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush;
}
@@ -282,13 +285,41 @@ struct MainTestCase {
)", SourceTableName.data(), ChangefeedName.data()));
}
+ struct CreatTopicSettings {
+ size_t MinPartitionCount = 1;
+ size_t MaxPartitionCount = 100;
+ bool AutoPartitioningEnabled = false;
+ };
+
void CreateTopic(size_t partitionCount = 10) {
- ExecuteDDL(Sprintf(R"(
- CREATE TOPIC `%s`
- WITH (
- min_active_partitions = %d
- );
- )", TopicName.data(), partitionCount));
+ CreateTopic({
+ .MinPartitionCount = partitionCount
+ });
+ }
+
+ void CreateTopic(const CreatTopicSettings& settings) {
+ if (settings.AutoPartitioningEnabled) {
+ ExecuteDDL(Sprintf(R"(
+ CREATE TOPIC `%s`
+ WITH (
+ MIN_ACTIVE_PARTITIONS = %d,
+ MAX_ACTIVE_PARTITIONS = %d,
+ AUTO_PARTITIONING_STRATEGY = 'UP',
+ auto_partitioning_down_utilization_percent = 1,
+ auto_partitioning_up_utilization_percent=2,
+ auto_partitioning_stabilization_window = Interval('PT1S'),
+ partition_write_speed_bytes_per_second = 3
+ );
+ )", TopicName.data(), settings.MinPartitionCount, settings.MaxPartitionCount));
+ } else {
+ ExecuteDDL(Sprintf(R"(
+ CREATE TOPIC `%s`
+ WITH (
+ MIN_ACTIVE_PARTITIONS = %d
+
+ );
+ )", TopicName.data(), settings.MinPartitionCount));
+ }
}
void DropTopic() {
@@ -427,7 +458,7 @@ struct MainTestCase {
setOptions = TStringBuilder() << "SET (" << sb << " )";
}
- auto res = Session.ExecuteQuery(Sprintf(R"(
+ auto res = Session().ExecuteQuery(Sprintf(R"(
%s;
ALTER TRANSFER `%s`
@@ -474,7 +505,7 @@ struct MainTestCase {
settings.IncludeStats(true);
auto c = TopicClient.DescribeConsumer(TopicName, consumerName, settings).GetValueSync();
- UNIT_ASSERT(c.IsSuccess());
+ UNIT_ASSERT_C(c.IsSuccess(), c.GetIssues().ToOneLineString());
return c;
}
@@ -554,7 +585,9 @@ struct MainTestCase {
settings.IncludeLocation(true);
settings.IncludeStats(true);
- return TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync();
+ auto result = TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
+ return result;
}
void CreateUser(const std::string& username) {
@@ -704,7 +737,6 @@ struct MainTestCase {
TDriver Driver;
TQueryClient TableClient;
- TSession Session;
TTopicClient TopicClient;
};
diff --git a/ydb/core/transfer/ut/large/transfer_ut.cpp b/ydb/core/transfer/ut/large/transfer_ut.cpp
index 03b23d65624..1068c64ad30 100644
--- a/ydb/core/transfer/ut/large/transfer_ut.cpp
+++ b/ydb/core/transfer/ut/large/transfer_ut.cpp
@@ -1,5 +1,6 @@
#include <thread>
+#include <util/generic/guid.h>
#include <ydb/core/transfer/ut/common/utils.h>
using namespace NReplicationTest;
@@ -7,14 +8,13 @@ using namespace NReplicationTest;
Y_UNIT_TEST_SUITE(TransferLarge)
{
- auto CreateWriter(MainTestCase& setup, const size_t partitionId) {
- Cerr << "CREATE PARTITION WRITER " << partitionId << Endl << Flush;
+ auto CreateWriter(MainTestCase& setup, const size_t writerId) {
+ Cerr << "CREATE PARTITION WRITER " << writerId << Endl << Flush;
- TString producerId = TStringBuilder() << "producer-" << partitionId;
+ TString producerId = TStringBuilder() << "writer-" << writerId << "-" << CreateGuidAsString();
TWriteSessionSettings writeSettings;
writeSettings.Path(setup.TopicName);
- writeSettings.PartitionId(partitionId);
writeSettings.DeduplicationEnabled(true);
writeSettings.ProducerId(producerId);
writeSettings.MessageGroupId(producerId);
@@ -23,10 +23,10 @@ Y_UNIT_TEST_SUITE(TransferLarge)
return client.CreateSimpleBlockingWriteSession(writeSettings);
}
- void Write(MainTestCase& setup, const size_t partitionId, const size_t messageCount, const size_t messageSize) {
- auto writer = CreateWriter(setup, partitionId);
+ void Write(MainTestCase& setup, const size_t writerId, const size_t messageCount, const size_t messageSize) {
+ auto writer = CreateWriter(setup, writerId);
- Cerr << "PARTITION " << partitionId << " START WRITE " << messageCount << " MESSAGES" << Endl << Flush;
+ Cerr << "PARTITION " << writerId << " START WRITE " << messageCount << " MESSAGES" << Endl << Flush;
TString msg(messageSize, '*');
for (size_t i = 0; i < messageCount;) {
@@ -40,30 +40,42 @@ Y_UNIT_TEST_SUITE(TransferLarge)
}
}
- writer->Close(TDuration::Minutes(1));
- Cerr << "PARTITION " << partitionId << " ALL MESSAGES HAVE BEEN WRITEN" << Endl << Flush;
+ //writer->Close(TDuration::Minutes(1));
+ writer->Close();
+ Cerr << "PARTITION " << writerId << " ALL MESSAGES HAVE BEEN WRITTEN" << Endl << Flush;
}
- void CheckAllMessagesHaveBeenWritten(MainTestCase& setup, const size_t expected) {
- // Check all messages have been writen
- auto d = setup.DescribeTopic();
- for (auto& p : d.GetTopicDescription().GetPartitions()) {
- UNIT_ASSERT_VALUES_EQUAL_C(expected, p.GetPartitionStats()->GetEndOffset(), "Partition " << p.GetPartitionId());
- }
- }
-
- void WaitAllMessagesHaveBeenCommitted(MainTestCase& setup, const size_t expected, const TDuration timeout = TDuration::Seconds(10)) {
+ void WaitAllMessagesHaveBeenCommitted(MainTestCase& setup, size_t expected, const TDuration timeout = TDuration::Seconds(10)) {
TInstant endTime = TInstant::Now() + timeout;
+
bool allPartitionsHaveBeenCommitted = false;
while(TInstant::Now() < endTime) {
+ std::map<size_t, size_t> offsets;
+ {
+ auto d = setup.DescribeTopic();
+ for (auto& p : d.GetTopicDescription().GetPartitions()) {
+ offsets[p.GetPartitionId()] = p.GetPartitionStats()->GetEndOffset();
+ }
+ }
+
+ size_t messages = 0;
+ for (auto& [partitionId, count] : offsets) {
+ Cerr << "PARTITION " << partitionId << " END OFFSET " << count << Endl << Flush;
+ messages += count;
+ }
+
+ Cerr << "ALL MESSAGES " << messages << " EXPECTED " << expected << Endl << Flush;
+ UNIT_ASSERT_VALUES_EQUAL(expected, messages);
+
auto d = setup.DescribeConsumer();
auto& p = d.GetConsumerDescription().GetPartitions();
allPartitionsHaveBeenCommitted = AllOf(p.begin(), p.end(), [&](auto& x) {
- Cerr << "WAIT COMMITTED expected=" << expected
+ Cerr << "WAIT COMMITTED partition=" << x.GetPartitionId()
+ << " expected=" << offsets[x.GetPartitionId()]
<< " read=" << x.GetPartitionConsumerStats()->GetLastReadOffset()
<< " committed=" << x.GetPartitionConsumerStats()->GetCommittedOffset() << Endl << Flush;
- return x.GetPartitionConsumerStats()->GetCommittedOffset() == expected;
+ return x.GetPartitionConsumerStats()->GetCommittedOffset() == offsets[x.GetPartitionId()];
});
if (allPartitionsHaveBeenCommitted) {
@@ -76,7 +88,13 @@ Y_UNIT_TEST_SUITE(TransferLarge)
UNIT_ASSERT_C(allPartitionsHaveBeenCommitted, "Partitions haven`t been commited to end");
}
- void CheckSourceTableIsValid(MainTestCase& setup, const size_t partitionCount, size_t expectedOffset) {
+ void CheckSourceTableIsValid(MainTestCase& setup) {
+ std::map<size_t, size_t> offsets;
+ auto d = setup.DescribeTopic();
+ for (auto& p : d.GetTopicDescription().GetPartitions()) {
+ offsets[p.GetPartitionId()] = p.GetPartitionStats()->GetEndOffset();
+ }
+
auto r = setup.ExecuteQuery(Sprintf(R"(
SELECT a.Partition, a.Offset, b.Offset
FROM %s AS a
@@ -89,20 +107,17 @@ Y_UNIT_TEST_SUITE(TransferLarge)
)", setup.TableName.data(), setup.TableName.data()));
const auto proto = NYdb::TProtoAccessor::GetProto(r.GetResultSet(0));
- UNIT_ASSERT_VALUES_EQUAL(partitionCount, proto.rows_size());
for (size_t i = 0; i < (size_t)proto.rows_size(); ++i) {
auto& row = proto.rows(i);
auto partition = row.items(0).uint32_value();
auto offset = row.items(1).uint64_value();
Cerr << "RESULT PARTITION=" << partition << " OFFSET=" << offset << Endl << Flush;
-
- UNIT_ASSERT_VALUES_EQUAL(i, partition);
- UNIT_ASSERT_VALUES_EQUAL_C(expectedOffset - 1, offset, "Partition " << i);
+ UNIT_ASSERT_VALUES_EQUAL_C(offsets[partition] - 1, offset, "Partition " << i);
}
}
- void BigTransfer(const std::string tableType, const size_t partitionCount, const size_t messageCount, const size_t messageSize) {
+ void BigTransfer(const std::string tableType, const size_t threadsCount, const size_t messageCount, const size_t messageSize, bool autopartitioning) {
MainTestCase testCase(std::nullopt, tableType);
testCase.CreateTable(R"(
CREATE TABLE `%s` (
@@ -114,7 +129,16 @@ Y_UNIT_TEST_SUITE(TransferLarge)
STORE = %s
);
)");
- testCase.CreateTopic(partitionCount);
+ if (autopartitioning) {
+ testCase.CreateTopic({
+ .MinPartitionCount = std::max<ui64>(1, threadsCount >> 4),
+ .MaxPartitionCount = threadsCount,
+ .AutoPartitioningEnabled = true
+ });
+ } else {
+ testCase.CreateTopic(threadsCount);
+
+ }
testCase.CreateTransfer(R"(
$l = ($x) -> {
return [
@@ -128,60 +152,100 @@ Y_UNIT_TEST_SUITE(TransferLarge)
)", MainTestCase::CreateTransferSettings::WithBatching(TDuration::Seconds(1), 8_MB));
std::vector<std::thread> writerThreads;
- writerThreads.reserve(partitionCount);
- for (size_t i = 0; i < partitionCount; ++i) {
+ writerThreads.reserve(threadsCount);
+ for (size_t i = 0; i < threadsCount; ++i) {
writerThreads.emplace_back([&, i = i]() {
Write(testCase, i, messageCount, messageSize);
});
Sleep(TDuration::MilliSeconds(25));
}
- for (size_t i = 0; i < partitionCount; ++i) {
+ for (size_t i = 0; i < threadsCount; ++i) {
Cerr << "WAIT THREAD " << i << Endl << Flush;
writerThreads[i].join();
}
Cerr << "WAIT REPLICATION FINISHED" << Endl << Flush;
- CheckAllMessagesHaveBeenWritten(testCase, messageCount);
+ Sleep(TDuration::Seconds(3));
+
testCase.CheckReplicationState(TReplicationDescription::EState::Running);
- WaitAllMessagesHaveBeenCommitted(testCase, messageCount);
+ Cerr << "WaitAllMessagesHaveBeenCommitted" << Endl << Flush;
+ WaitAllMessagesHaveBeenCommitted(testCase, messageCount * threadsCount);
- CheckSourceTableIsValid(testCase, partitionCount, messageCount);
+ CheckSourceTableIsValid(testCase);
testCase.DropTransfer();
testCase.DropTable();
testCase.DropTopic();
}
+ //
+ // Topic autopartitioning is disabled
+ //
+
Y_UNIT_TEST(Transfer1KM_1P_ColumnTable)
{
- BigTransfer("COLUMN", 1, 1000, 64);
+ BigTransfer("COLUMN", 1, 1000, 64, false);
}
Y_UNIT_TEST(Transfer1KM_1KP_ColumnTable)
{
- BigTransfer("COLUMN", 1000, 1000, 64);
+ BigTransfer("COLUMN", 1000, 1000, 64, false);
}
Y_UNIT_TEST(Transfer100KM_10P_ColumnTable)
{
- BigTransfer("COLUMN", 10, 100000, 64);
+ BigTransfer("COLUMN", 10, 100000, 64, false);
}
Y_UNIT_TEST(Transfer1KM_1P_RowTable)
{
- BigTransfer("ROW", 1, 1000, 64);
+ BigTransfer("ROW", 1, 1000, 64, false);
}
Y_UNIT_TEST(Transfer1KM_1KP_RowTable)
{
- BigTransfer("ROW", 1000, 1000, 64);
+ BigTransfer("ROW", 1000, 1000, 64, false);
}
Y_UNIT_TEST(Transfer100KM_10P_RowTable)
{
- BigTransfer("ROW", 10, 100000, 64);
+ BigTransfer("ROW", 10, 100000, 64, false);
+ }
+
+ //
+ // Topic autopartitioning is enabled
+ //
+
+ Y_UNIT_TEST(Transfer1KM_1P_ColumnTable_TopicAutoPartitioning)
+ {
+ BigTransfer("COLUMN", 1, 1000, 64, true);
+ }
+
+ Y_UNIT_TEST(Transfer1KM_1KP_ColumnTable_TopicAutoPartitioning)
+ {
+ BigTransfer("COLUMN", 1000, 1000, 64, true);
+ }
+
+ Y_UNIT_TEST(Transfer100KM_10P_ColumnTable_TopicAutoPartitioning)
+ {
+ BigTransfer("COLUMN", 10, 100000, 64, true);
+ }
+
+ Y_UNIT_TEST(Transfer1KM_1P_RowTable_TopicAutoPartitioning)
+ {
+ BigTransfer("ROW", 1, 1000, 64, true);
+ }
+
+ Y_UNIT_TEST(Transfer1KM_1KP_RowTable_TopicAutoPartitioning)
+ {
+ BigTransfer("ROW", 1000, 1000, 64, true);
+ }
+
+ Y_UNIT_TEST(Transfer100KM_10P_RowTable_TopicAutoPartitioning)
+ {
+ BigTransfer("ROW", 10, 100000, 64, true);
}
}
diff --git a/ydb/core/transfer/ut/large/ya.make b/ydb/core/transfer/ut/large/ya.make
index fad77beec4a..480ebc7dfed 100644
--- a/ydb/core/transfer/ut/large/ya.make
+++ b/ydb/core/transfer/ut/large/ya.make
@@ -17,6 +17,8 @@ SRCS(
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
+#SIZE(MEDIUM)
+#TIMEOUT(600)
SIZE(LARGE)
TAG(ya:fat)
diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp
index e88d0ea976b..fc7a867eb37 100644
--- a/ydb/core/tx/replication/service/topic_reader.cpp
+++ b/ydb/core/tx/replication/service/topic_reader.cpp
@@ -88,14 +88,8 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
return Leave(TEvWorker::TEvGone::UNAVAILABLE);
}
- auto settings = NYdb::NTopic::TCommitOffsetSettings()
- .ReadSessionId(ReadSessionId);
-
- const auto& topicName = Settings.GetBase().Topics_.at(0).Path_;
- const auto partitionId = Settings.GetBase().Topics_.at(0).PartitionIds_.at(0);
- const auto& consumerName = Settings.GetBase().ConsumerName_;
-
- Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(topicName, partitionId, consumerName, ev->Get()->Offset, std::move(settings)));
+ CommittedOffset = ev->Get()->Offset;
+ Send(YdbProxy, CreateCommitOffsetRequest().release());
}
void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
@@ -103,10 +97,24 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
LOG_W("Handle " << ev->Get()->ToString());
return Leave(TEvWorker::TEvGone::UNAVAILABLE);
} else {
- LOG_D("Handle " << ev->Get()->ToString());
+ LOG_D("Handle " << CommittedOffset << " " << ev->Get()->ToString());
+ if (CommittedOffset) {
+ Send(ReadSession, CreateCommitOffsetRequest().release());
+ }
}
}
+ std::unique_ptr<TEvYdbProxy::TEvCommitOffsetRequest> CreateCommitOffsetRequest() {
+ auto settings = NYdb::NTopic::TCommitOffsetSettings()
+ .ReadSessionId(ReadSessionId);
+
+ const auto& topicName = Settings.GetBase().Topics_.at(0).Path_;
+ const auto partitionId = Settings.GetBase().Topics_.at(0).PartitionIds_.at(0);
+ const auto& consumerName = Settings.GetBase().ConsumerName_;
+
+ return std::make_unique<TEvYdbProxy::TEvCommitOffsetRequest>(topicName, partitionId, consumerName, CommittedOffset, std::move(settings));
+ }
+
void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
@@ -172,6 +180,7 @@ private:
TActorId Worker;
TActorId ReadSession;
TString ReadSessionId;
+ ui64 CommittedOffset = 0;
}; // TRemoteTopicReader
diff --git a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h
index 180165afe9c..f263ebab5e6 100644
--- a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h
+++ b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h
@@ -43,9 +43,9 @@ public:
MaybeSendPartitionEnd(client);
}
- inline void Clear() {
- PendingCommittedOffset = 0;
- CommittedOffset = 0;
+ inline void Clear(ui64 committedOffset) {
+ PendingCommittedOffset = committedOffset;
+ CommittedOffset = committedOffset;
EndPartitionSessionEvent.Clear();
}
diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
index 8a32687788a..29a9fac2312 100644
--- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
+++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
@@ -191,6 +191,17 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
WaitEvent(ev->Sender, ev->Cookie);
}
+ void Handle(TEvYdbProxy::TEvCommitOffsetRequest::TPtr& ev) {
+ auto [path, partitionId, consumerName, offset, settings] = std::move(ev->Get()->GetArgs());
+
+ Y_UNUSED(path);
+ Y_UNUSED(partitionId);
+ Y_UNUSED(consumerName);
+ Y_UNUSED(settings);
+
+ PartitionEndWatcher.SetCommittedOffset(offset - 1, ev->Sender);
+ }
+
void WaitEvent(const TActorId& sender, ui64 cookie) {
auto request = MakeRequest(SelfId());
auto cb = [request, sender, cookie](const NThreading::TFuture<void>&) {
@@ -209,7 +220,7 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
}
if (auto* x = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
- PartitionEndWatcher.Clear();
+ PartitionEndWatcher.Clear(x->GetCommittedOffset());
x->Confirm();
Send(ev->Get()->Sender, new TEvYdbProxy::TEvStartTopicReadingSession(*x), 0, ev->Get()->Cookie);
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
@@ -263,6 +274,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvYdbProxy::TEvReadTopicRequest, Handle);
hFunc(TEvPrivate::TEvTopicEventReady, Handle);
+ hFunc(TEvYdbProxy::TEvCommitOffsetRequest, Handle);
default:
return StateBase(ev);
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index fd1ac8a47a3..af93589534c 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -661,7 +661,7 @@ void TDescribeTopicActorImpl::RequestPartitionsLocation(const TActorContext& ctx
for (auto p : Settings.Partitions) {
if (p >= TotalPartitions) {
return RaiseError(
- TStringBuilder() << "No partition " << Settings.Partitions[0] << " in topic",
+ TStringBuilder() << "No partition " << p << " in topic",
Ydb::PersQueue::ErrorCode::BAD_REQUEST, Ydb::StatusIds::BAD_REQUEST, ctx
);
}
@@ -900,14 +900,12 @@ bool TDescribeTopicActor::ApplyResponse(
TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext&
) {
const auto& record = ev->Get()->Record;
- Y_ABORT_UNLESS(record.LocationsSize() == TotalPartitions);
Y_ABORT_UNLESS(Settings.RequireLocation);
- for (auto i = 0u; i < TotalPartitions; ++i) {
+ for (auto i = 0u; i < std::min<ui64>(record.LocationsSize(), TotalPartitions); ++i) {
const auto& location = record.GetLocations(i);
auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location();
SetPartitionLocation(location, locationResult);
-
}
return true;
}
@@ -1017,9 +1015,8 @@ bool TDescribeConsumerActor::ApplyResponse(
TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext&
) {
const auto& record = ev->Get()->Record;
- Y_ABORT_UNLESS(record.LocationsSize() == TotalPartitions);
Y_ABORT_UNLESS(Settings.RequireLocation);
- for (auto i = 0u; i < TotalPartitions; ++i) {
+ for (auto i = 0u; i < std::min<ui64>(record.LocationsSize(), TotalPartitions); ++i) {
const auto& location = record.GetLocations(i);
auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location();
SetPartitionLocation(location, locationResult);