diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-06-02 12:36:29 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-06-02 10:36:29 +0300 |
commit | d70a7cc0fc49da471a08b7ea2fc8c3c12f76041c (patch) | |
tree | d15c0c6dadf0579585467e89ec23f838d0c9b87d | |
parent | 5d9c33f56d1f684ac079aaf428dbb79f01e95690 (diff) | |
download | ydb-d70a7cc0fc49da471a08b7ea2fc8c3c12f76041c.tar.gz |
Supported topic autopartitioning for the transfer (#19043)
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 4 | ||||
-rw-r--r-- | ydb/core/transfer/transfer_writer.cpp | 4 | ||||
-rw-r--r-- | ydb/core/transfer/ut/common/utils.h | 60 | ||||
-rw-r--r-- | ydb/core/transfer/ut/large/transfer_ut.cpp | 142 | ||||
-rw-r--r-- | ydb/core/transfer/ut/large/ya.make | 2 | ||||
-rw-r--r-- | ydb/core/tx/replication/service/topic_reader.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp | 14 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.cpp | 9 |
10 files changed, 194 insertions, 76 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 2e7fa26ac2e..d611675c097 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1802,8 +1802,6 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& { PQ_LOG_T("Handle TEvPersQueue::TEvStatus"); - ReadBalancerActorId = ev->Sender; - if (!ConfigInited || !AllOriginalPartitionsInited()) { PQ_LOG_D("Postpone the request." << " ConfigInited " << static_cast<int>(ConfigInited) << diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index e7d0011f8de..2bd3c5c6950 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -534,6 +534,8 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA } else { TActorId pipeClient = GetPipeClient(tabletId, ctx); + NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace)); + auto it = AggregatedStats.Cookies.find(tabletId); if (!pipeReconnected || it != AggregatedStats.Cookies.end()) { ui64 cookie; @@ -548,8 +550,6 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA TStringBuilder() << "Send TEvPersQueue::TEvStatus TabletId: " << tabletId << " Cookie: " << cookie); NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus("", true), cookie); } - - NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace)); } } diff --git a/ydb/core/transfer/transfer_writer.cpp b/ydb/core/transfer/transfer_writer.cpp index 272418c6e42..6cea16fbb4f 100644 --- a/ydb/core/transfer/transfer_writer.cpp +++ b/ydb/core/transfer/transfer_writer.cpp @@ -818,6 +818,10 @@ private: Send(Worker, new TEvWorker::TEvPoll()); } + if (LastWriteTime) { + LastWriteTime = TInstant::Now(); + } + return StartWork(); } diff --git a/ydb/core/transfer/ut/common/utils.h b/ydb/core/transfer/ut/common/utils.h index b705a324416..7a603228b0e 100644 --- a/ydb/core/transfer/ut/common/utils.h +++ b/ydb/core/transfer/ut/common/utils.h @@ -168,7 +168,7 @@ struct MainTestCase { if (user) { config.SetAuthToken(TStringBuilder() << user.value() << "@builtin"); } - // config.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_INFO).Release())) + // config.SetLog(std::unique_ptr<TLogBackend>(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG).Release())); return config; } @@ -184,7 +184,6 @@ struct MainTestCase { , TransferName(TStringBuilder() << "Transfer_" << Id) , Driver(CreateDriverConfig(ConnectionString, user)) , TableClient(Driver) - , Session(TableClient.GetSession().GetValueSync().GetSession()) , TopicClient(Driver) { } @@ -193,9 +192,13 @@ struct MainTestCase { Driver.Stop(true); } + auto Session() { + return TableClient.GetSession().GetValueSync().GetSession(); + } + void ExecuteDDL(const std::string& ddl, bool checkResult = true, const std::optional<std::string> expectedMessage = std::nullopt) { Cerr << "DDL: " << ddl << Endl << Flush; - auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync(); + auto res = Session().ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync(); if (checkResult) { if (expectedMessage) { UNIT_ASSERT(!res.IsSuccess()); @@ -211,7 +214,7 @@ struct MainTestCase { auto ExecuteQuery(const std::string& query, bool retry = true) { for (size_t i = 10; i--;) { Cerr << ">>>>> Query: " << query << Endl << Flush; - auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + auto res = Session().ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); if (!res.IsSuccess()) { Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush; } @@ -282,13 +285,41 @@ struct MainTestCase { )", SourceTableName.data(), ChangefeedName.data())); } + struct CreatTopicSettings { + size_t MinPartitionCount = 1; + size_t MaxPartitionCount = 100; + bool AutoPartitioningEnabled = false; + }; + void CreateTopic(size_t partitionCount = 10) { - ExecuteDDL(Sprintf(R"( - CREATE TOPIC `%s` - WITH ( - min_active_partitions = %d - ); - )", TopicName.data(), partitionCount)); + CreateTopic({ + .MinPartitionCount = partitionCount + }); + } + + void CreateTopic(const CreatTopicSettings& settings) { + if (settings.AutoPartitioningEnabled) { + ExecuteDDL(Sprintf(R"( + CREATE TOPIC `%s` + WITH ( + MIN_ACTIVE_PARTITIONS = %d, + MAX_ACTIVE_PARTITIONS = %d, + AUTO_PARTITIONING_STRATEGY = 'UP', + auto_partitioning_down_utilization_percent = 1, + auto_partitioning_up_utilization_percent=2, + auto_partitioning_stabilization_window = Interval('PT1S'), + partition_write_speed_bytes_per_second = 3 + ); + )", TopicName.data(), settings.MinPartitionCount, settings.MaxPartitionCount)); + } else { + ExecuteDDL(Sprintf(R"( + CREATE TOPIC `%s` + WITH ( + MIN_ACTIVE_PARTITIONS = %d + + ); + )", TopicName.data(), settings.MinPartitionCount)); + } } void DropTopic() { @@ -427,7 +458,7 @@ struct MainTestCase { setOptions = TStringBuilder() << "SET (" << sb << " )"; } - auto res = Session.ExecuteQuery(Sprintf(R"( + auto res = Session().ExecuteQuery(Sprintf(R"( %s; ALTER TRANSFER `%s` @@ -474,7 +505,7 @@ struct MainTestCase { settings.IncludeStats(true); auto c = TopicClient.DescribeConsumer(TopicName, consumerName, settings).GetValueSync(); - UNIT_ASSERT(c.IsSuccess()); + UNIT_ASSERT_C(c.IsSuccess(), c.GetIssues().ToOneLineString()); return c; } @@ -554,7 +585,9 @@ struct MainTestCase { settings.IncludeLocation(true); settings.IncludeStats(true); - return TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync(); + auto result = TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString()); + return result; } void CreateUser(const std::string& username) { @@ -704,7 +737,6 @@ struct MainTestCase { TDriver Driver; TQueryClient TableClient; - TSession Session; TTopicClient TopicClient; }; diff --git a/ydb/core/transfer/ut/large/transfer_ut.cpp b/ydb/core/transfer/ut/large/transfer_ut.cpp index 03b23d65624..1068c64ad30 100644 --- a/ydb/core/transfer/ut/large/transfer_ut.cpp +++ b/ydb/core/transfer/ut/large/transfer_ut.cpp @@ -1,5 +1,6 @@ #include <thread> +#include <util/generic/guid.h> #include <ydb/core/transfer/ut/common/utils.h> using namespace NReplicationTest; @@ -7,14 +8,13 @@ using namespace NReplicationTest; Y_UNIT_TEST_SUITE(TransferLarge) { - auto CreateWriter(MainTestCase& setup, const size_t partitionId) { - Cerr << "CREATE PARTITION WRITER " << partitionId << Endl << Flush; + auto CreateWriter(MainTestCase& setup, const size_t writerId) { + Cerr << "CREATE PARTITION WRITER " << writerId << Endl << Flush; - TString producerId = TStringBuilder() << "producer-" << partitionId; + TString producerId = TStringBuilder() << "writer-" << writerId << "-" << CreateGuidAsString(); TWriteSessionSettings writeSettings; writeSettings.Path(setup.TopicName); - writeSettings.PartitionId(partitionId); writeSettings.DeduplicationEnabled(true); writeSettings.ProducerId(producerId); writeSettings.MessageGroupId(producerId); @@ -23,10 +23,10 @@ Y_UNIT_TEST_SUITE(TransferLarge) return client.CreateSimpleBlockingWriteSession(writeSettings); } - void Write(MainTestCase& setup, const size_t partitionId, const size_t messageCount, const size_t messageSize) { - auto writer = CreateWriter(setup, partitionId); + void Write(MainTestCase& setup, const size_t writerId, const size_t messageCount, const size_t messageSize) { + auto writer = CreateWriter(setup, writerId); - Cerr << "PARTITION " << partitionId << " START WRITE " << messageCount << " MESSAGES" << Endl << Flush; + Cerr << "PARTITION " << writerId << " START WRITE " << messageCount << " MESSAGES" << Endl << Flush; TString msg(messageSize, '*'); for (size_t i = 0; i < messageCount;) { @@ -40,30 +40,42 @@ Y_UNIT_TEST_SUITE(TransferLarge) } } - writer->Close(TDuration::Minutes(1)); - Cerr << "PARTITION " << partitionId << " ALL MESSAGES HAVE BEEN WRITEN" << Endl << Flush; + //writer->Close(TDuration::Minutes(1)); + writer->Close(); + Cerr << "PARTITION " << writerId << " ALL MESSAGES HAVE BEEN WRITTEN" << Endl << Flush; } - void CheckAllMessagesHaveBeenWritten(MainTestCase& setup, const size_t expected) { - // Check all messages have been writen - auto d = setup.DescribeTopic(); - for (auto& p : d.GetTopicDescription().GetPartitions()) { - UNIT_ASSERT_VALUES_EQUAL_C(expected, p.GetPartitionStats()->GetEndOffset(), "Partition " << p.GetPartitionId()); - } - } - - void WaitAllMessagesHaveBeenCommitted(MainTestCase& setup, const size_t expected, const TDuration timeout = TDuration::Seconds(10)) { + void WaitAllMessagesHaveBeenCommitted(MainTestCase& setup, size_t expected, const TDuration timeout = TDuration::Seconds(10)) { TInstant endTime = TInstant::Now() + timeout; + bool allPartitionsHaveBeenCommitted = false; while(TInstant::Now() < endTime) { + std::map<size_t, size_t> offsets; + { + auto d = setup.DescribeTopic(); + for (auto& p : d.GetTopicDescription().GetPartitions()) { + offsets[p.GetPartitionId()] = p.GetPartitionStats()->GetEndOffset(); + } + } + + size_t messages = 0; + for (auto& [partitionId, count] : offsets) { + Cerr << "PARTITION " << partitionId << " END OFFSET " << count << Endl << Flush; + messages += count; + } + + Cerr << "ALL MESSAGES " << messages << " EXPECTED " << expected << Endl << Flush; + UNIT_ASSERT_VALUES_EQUAL(expected, messages); + auto d = setup.DescribeConsumer(); auto& p = d.GetConsumerDescription().GetPartitions(); allPartitionsHaveBeenCommitted = AllOf(p.begin(), p.end(), [&](auto& x) { - Cerr << "WAIT COMMITTED expected=" << expected + Cerr << "WAIT COMMITTED partition=" << x.GetPartitionId() + << " expected=" << offsets[x.GetPartitionId()] << " read=" << x.GetPartitionConsumerStats()->GetLastReadOffset() << " committed=" << x.GetPartitionConsumerStats()->GetCommittedOffset() << Endl << Flush; - return x.GetPartitionConsumerStats()->GetCommittedOffset() == expected; + return x.GetPartitionConsumerStats()->GetCommittedOffset() == offsets[x.GetPartitionId()]; }); if (allPartitionsHaveBeenCommitted) { @@ -76,7 +88,13 @@ Y_UNIT_TEST_SUITE(TransferLarge) UNIT_ASSERT_C(allPartitionsHaveBeenCommitted, "Partitions haven`t been commited to end"); } - void CheckSourceTableIsValid(MainTestCase& setup, const size_t partitionCount, size_t expectedOffset) { + void CheckSourceTableIsValid(MainTestCase& setup) { + std::map<size_t, size_t> offsets; + auto d = setup.DescribeTopic(); + for (auto& p : d.GetTopicDescription().GetPartitions()) { + offsets[p.GetPartitionId()] = p.GetPartitionStats()->GetEndOffset(); + } + auto r = setup.ExecuteQuery(Sprintf(R"( SELECT a.Partition, a.Offset, b.Offset FROM %s AS a @@ -89,20 +107,17 @@ Y_UNIT_TEST_SUITE(TransferLarge) )", setup.TableName.data(), setup.TableName.data())); const auto proto = NYdb::TProtoAccessor::GetProto(r.GetResultSet(0)); - UNIT_ASSERT_VALUES_EQUAL(partitionCount, proto.rows_size()); for (size_t i = 0; i < (size_t)proto.rows_size(); ++i) { auto& row = proto.rows(i); auto partition = row.items(0).uint32_value(); auto offset = row.items(1).uint64_value(); Cerr << "RESULT PARTITION=" << partition << " OFFSET=" << offset << Endl << Flush; - - UNIT_ASSERT_VALUES_EQUAL(i, partition); - UNIT_ASSERT_VALUES_EQUAL_C(expectedOffset - 1, offset, "Partition " << i); + UNIT_ASSERT_VALUES_EQUAL_C(offsets[partition] - 1, offset, "Partition " << i); } } - void BigTransfer(const std::string tableType, const size_t partitionCount, const size_t messageCount, const size_t messageSize) { + void BigTransfer(const std::string tableType, const size_t threadsCount, const size_t messageCount, const size_t messageSize, bool autopartitioning) { MainTestCase testCase(std::nullopt, tableType); testCase.CreateTable(R"( CREATE TABLE `%s` ( @@ -114,7 +129,16 @@ Y_UNIT_TEST_SUITE(TransferLarge) STORE = %s ); )"); - testCase.CreateTopic(partitionCount); + if (autopartitioning) { + testCase.CreateTopic({ + .MinPartitionCount = std::max<ui64>(1, threadsCount >> 4), + .MaxPartitionCount = threadsCount, + .AutoPartitioningEnabled = true + }); + } else { + testCase.CreateTopic(threadsCount); + + } testCase.CreateTransfer(R"( $l = ($x) -> { return [ @@ -128,60 +152,100 @@ Y_UNIT_TEST_SUITE(TransferLarge) )", MainTestCase::CreateTransferSettings::WithBatching(TDuration::Seconds(1), 8_MB)); std::vector<std::thread> writerThreads; - writerThreads.reserve(partitionCount); - for (size_t i = 0; i < partitionCount; ++i) { + writerThreads.reserve(threadsCount); + for (size_t i = 0; i < threadsCount; ++i) { writerThreads.emplace_back([&, i = i]() { Write(testCase, i, messageCount, messageSize); }); Sleep(TDuration::MilliSeconds(25)); } - for (size_t i = 0; i < partitionCount; ++i) { + for (size_t i = 0; i < threadsCount; ++i) { Cerr << "WAIT THREAD " << i << Endl << Flush; writerThreads[i].join(); } Cerr << "WAIT REPLICATION FINISHED" << Endl << Flush; - CheckAllMessagesHaveBeenWritten(testCase, messageCount); + Sleep(TDuration::Seconds(3)); + testCase.CheckReplicationState(TReplicationDescription::EState::Running); - WaitAllMessagesHaveBeenCommitted(testCase, messageCount); + Cerr << "WaitAllMessagesHaveBeenCommitted" << Endl << Flush; + WaitAllMessagesHaveBeenCommitted(testCase, messageCount * threadsCount); - CheckSourceTableIsValid(testCase, partitionCount, messageCount); + CheckSourceTableIsValid(testCase); testCase.DropTransfer(); testCase.DropTable(); testCase.DropTopic(); } + // + // Topic autopartitioning is disabled + // + Y_UNIT_TEST(Transfer1KM_1P_ColumnTable) { - BigTransfer("COLUMN", 1, 1000, 64); + BigTransfer("COLUMN", 1, 1000, 64, false); } Y_UNIT_TEST(Transfer1KM_1KP_ColumnTable) { - BigTransfer("COLUMN", 1000, 1000, 64); + BigTransfer("COLUMN", 1000, 1000, 64, false); } Y_UNIT_TEST(Transfer100KM_10P_ColumnTable) { - BigTransfer("COLUMN", 10, 100000, 64); + BigTransfer("COLUMN", 10, 100000, 64, false); } Y_UNIT_TEST(Transfer1KM_1P_RowTable) { - BigTransfer("ROW", 1, 1000, 64); + BigTransfer("ROW", 1, 1000, 64, false); } Y_UNIT_TEST(Transfer1KM_1KP_RowTable) { - BigTransfer("ROW", 1000, 1000, 64); + BigTransfer("ROW", 1000, 1000, 64, false); } Y_UNIT_TEST(Transfer100KM_10P_RowTable) { - BigTransfer("ROW", 10, 100000, 64); + BigTransfer("ROW", 10, 100000, 64, false); + } + + // + // Topic autopartitioning is enabled + // + + Y_UNIT_TEST(Transfer1KM_1P_ColumnTable_TopicAutoPartitioning) + { + BigTransfer("COLUMN", 1, 1000, 64, true); + } + + Y_UNIT_TEST(Transfer1KM_1KP_ColumnTable_TopicAutoPartitioning) + { + BigTransfer("COLUMN", 1000, 1000, 64, true); + } + + Y_UNIT_TEST(Transfer100KM_10P_ColumnTable_TopicAutoPartitioning) + { + BigTransfer("COLUMN", 10, 100000, 64, true); + } + + Y_UNIT_TEST(Transfer1KM_1P_RowTable_TopicAutoPartitioning) + { + BigTransfer("ROW", 1, 1000, 64, true); + } + + Y_UNIT_TEST(Transfer1KM_1KP_RowTable_TopicAutoPartitioning) + { + BigTransfer("ROW", 1000, 1000, 64, true); + } + + Y_UNIT_TEST(Transfer100KM_10P_RowTable_TopicAutoPartitioning) + { + BigTransfer("ROW", 10, 100000, 64, true); } } diff --git a/ydb/core/transfer/ut/large/ya.make b/ydb/core/transfer/ut/large/ya.make index fad77beec4a..480ebc7dfed 100644 --- a/ydb/core/transfer/ut/large/ya.make +++ b/ydb/core/transfer/ut/large/ya.make @@ -17,6 +17,8 @@ SRCS( INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc) +#SIZE(MEDIUM) +#TIMEOUT(600) SIZE(LARGE) TAG(ya:fat) diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index e88d0ea976b..fc7a867eb37 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -88,14 +88,8 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { return Leave(TEvWorker::TEvGone::UNAVAILABLE); } - auto settings = NYdb::NTopic::TCommitOffsetSettings() - .ReadSessionId(ReadSessionId); - - const auto& topicName = Settings.GetBase().Topics_.at(0).Path_; - const auto partitionId = Settings.GetBase().Topics_.at(0).PartitionIds_.at(0); - const auto& consumerName = Settings.GetBase().ConsumerName_; - - Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(topicName, partitionId, consumerName, ev->Get()->Offset, std::move(settings))); + CommittedOffset = ev->Get()->Offset; + Send(YdbProxy, CreateCommitOffsetRequest().release()); } void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) { @@ -103,10 +97,24 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> { LOG_W("Handle " << ev->Get()->ToString()); return Leave(TEvWorker::TEvGone::UNAVAILABLE); } else { - LOG_D("Handle " << ev->Get()->ToString()); + LOG_D("Handle " << CommittedOffset << " " << ev->Get()->ToString()); + if (CommittedOffset) { + Send(ReadSession, CreateCommitOffsetRequest().release()); + } } } + std::unique_ptr<TEvYdbProxy::TEvCommitOffsetRequest> CreateCommitOffsetRequest() { + auto settings = NYdb::NTopic::TCommitOffsetSettings() + .ReadSessionId(ReadSessionId); + + const auto& topicName = Settings.GetBase().Topics_.at(0).Path_; + const auto partitionId = Settings.GetBase().Topics_.at(0).PartitionIds_.at(0); + const auto& consumerName = Settings.GetBase().ConsumerName_; + + return std::make_unique<TEvYdbProxy::TEvCommitOffsetRequest>(topicName, partitionId, consumerName, CommittedOffset, std::move(settings)); + } + void Handle(TEvYdbProxy::TEvTopicReaderGone::TPtr& ev) { LOG_D("Handle " << ev->Get()->ToString()); @@ -172,6 +180,7 @@ private: TActorId Worker; TActorId ReadSession; TString ReadSessionId; + ui64 CommittedOffset = 0; }; // TRemoteTopicReader diff --git a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h index 180165afe9c..f263ebab5e6 100644 --- a/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h +++ b/ydb/core/tx/replication/ydb_proxy/partition_end_watcher.h @@ -43,9 +43,9 @@ public: MaybeSendPartitionEnd(client); } - inline void Clear() { - PendingCommittedOffset = 0; - CommittedOffset = 0; + inline void Clear(ui64 committedOffset) { + PendingCommittedOffset = committedOffset; + CommittedOffset = committedOffset; EndPartitionSessionEvent.Clear(); } diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp index 8a32687788a..29a9fac2312 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp @@ -191,6 +191,17 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> { WaitEvent(ev->Sender, ev->Cookie); } + void Handle(TEvYdbProxy::TEvCommitOffsetRequest::TPtr& ev) { + auto [path, partitionId, consumerName, offset, settings] = std::move(ev->Get()->GetArgs()); + + Y_UNUSED(path); + Y_UNUSED(partitionId); + Y_UNUSED(consumerName); + Y_UNUSED(settings); + + PartitionEndWatcher.SetCommittedOffset(offset - 1, ev->Sender); + } + void WaitEvent(const TActorId& sender, ui64 cookie) { auto request = MakeRequest(SelfId()); auto cb = [request, sender, cookie](const NThreading::TFuture<void>&) { @@ -209,7 +220,7 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> { } if (auto* x = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) { - PartitionEndWatcher.Clear(); + PartitionEndWatcher.Clear(x->GetCommittedOffset()); x->Confirm(); Send(ev->Get()->Sender, new TEvYdbProxy::TEvStartTopicReadingSession(*x), 0, ev->Get()->Cookie); return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie); @@ -263,6 +274,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvYdbProxy::TEvReadTopicRequest, Handle); hFunc(TEvPrivate::TEvTopicEventReady, Handle); + hFunc(TEvYdbProxy::TEvCommitOffsetRequest, Handle); default: return StateBase(ev); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index fd1ac8a47a3..af93589534c 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -661,7 +661,7 @@ void TDescribeTopicActorImpl::RequestPartitionsLocation(const TActorContext& ctx for (auto p : Settings.Partitions) { if (p >= TotalPartitions) { return RaiseError( - TStringBuilder() << "No partition " << Settings.Partitions[0] << " in topic", + TStringBuilder() << "No partition " << p << " in topic", Ydb::PersQueue::ErrorCode::BAD_REQUEST, Ydb::StatusIds::BAD_REQUEST, ctx ); } @@ -900,14 +900,12 @@ bool TDescribeTopicActor::ApplyResponse( TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ) { const auto& record = ev->Get()->Record; - Y_ABORT_UNLESS(record.LocationsSize() == TotalPartitions); Y_ABORT_UNLESS(Settings.RequireLocation); - for (auto i = 0u; i < TotalPartitions; ++i) { + for (auto i = 0u; i < std::min<ui64>(record.LocationsSize(), TotalPartitions); ++i) { const auto& location = record.GetLocations(i); auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location(); SetPartitionLocation(location, locationResult); - } return true; } @@ -1017,9 +1015,8 @@ bool TDescribeConsumerActor::ApplyResponse( TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ) { const auto& record = ev->Get()->Record; - Y_ABORT_UNLESS(record.LocationsSize() == TotalPartitions); Y_ABORT_UNLESS(Settings.RequireLocation); - for (auto i = 0u; i < TotalPartitions; ++i) { + for (auto i = 0u; i < std::min<ui64>(record.LocationsSize(), TotalPartitions); ++i) { const auto& location = record.GetLocations(i); auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location(); SetPartitionLocation(location, locationResult); |