diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2024-09-13 16:48:38 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-13 16:48:38 +0300 |
commit | 159be177cab8d7e985efab122c30bb4c03fcec78 (patch) | |
tree | b7fb8ed55780750b823e9c25a5feaa0399d0fe26 | |
parent | 6ee3d1ca96cd19db40c6d4d2d74b3668a975bfc8 (diff) | |
download | ydb-159be177cab8d7e985efab122c30bb4c03fcec78.tar.gz |
`Y_VERIFY` in the `RenameFormedBlobs` function (#8916)
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 28 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_id.h | 8 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 15 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_base/ut_base.cpp | 3 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp | 95 |
7 files changed, 140 insertions, 21 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 9939fc17397..bf216b3034c 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const { } else { state = "Unknown"; } - return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] "; + return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] "; } bool TPartition::IsActive() const { @@ -2149,6 +2149,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) void TPartition::CommitWriteOperations(TTransaction& t) { + PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId()); + Y_ABORT_UNLESS(PersistRequest); Y_ABORT_UNLESS(!PartitionedBlob.IsInited()); @@ -2166,6 +2168,10 @@ void TPartition::CommitWriteOperations(TTransaction& t) HaveWriteMsg = true; } + PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() << + ", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size()); + PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead); + if (!t.WriteInfo->BodyKeys.empty()) { PartitionedBlob = TPartitionedBlob(Partition, NewHead.Offset, @@ -2180,6 +2186,7 @@ void TPartition::CommitWriteOperations(TTransaction& t) MaxBlobSize); for (auto& k : t.WriteInfo->BodyKeys) { + PQ_LOG_D("add key " << k.Key.ToString()); auto write = PartitionedBlob.Add(k.Key, k.Size); if (write && !write->Value.empty()) { AddCmdWrite(write, PersistRequest.Get(), ctx); @@ -2188,18 +2195,17 @@ void TPartition::CommitWriteOperations(TTransaction& t) } } - } - if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) { - ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get()); - RenameFormedBlobs(formedBlobs, - *Parameters, - curWrites, - PersistRequest.Get(), - ctx); - } + PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size()); + if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) { + ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get()); + RenameFormedBlobs(formedBlobs, + *Parameters, + curWrites, + PersistRequest.Get(), + ctx); + } - if (!t.WriteInfo->BodyKeys.empty()) { const auto& last = t.WriteInfo->BodyKeys.back(); NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount()); diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h index 5ef5c4fa75e..0c1dbb8d3af 100644 --- a/ydb/core/persqueue/partition_id.h +++ b/ydb/core/persqueue/partition_id.h @@ -7,6 +7,7 @@ #include <util/system/types.h> #include <util/digest/multi.h> #include <util/str_stl.h> +#include <util/string/builder.h> #include <functional> @@ -51,6 +52,13 @@ public: } } + TString ToString() const + { + TStringBuilder s; + s << *this; + return s; + } + bool IsSupportivePartition() const { return WriteId.Defined(); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 4d85b65a813..1752eed9002 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1066,16 +1066,16 @@ void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFor } if (!DataKeysBody.empty() && CompactedKeys.empty()) { Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(), + "PQ: %" PRIu64 ", Partition: %s, " "LAST KEY %s, HeadOffset %lu, NEWKEY %s", + TabletID, Partition.ToString().c_str(), DataKeysBody.back().Key.ToString().c_str(), Head.Offset, x.NewKey.ToString().c_str()); } - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "writing blob: topic '" << TopicName() << "' partition " << Partition - << " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds() - ); + PQ_LOG_D("writing blob: topic '" << TopicName() << "' partition " << Partition << + " old key " << x.OldKey.ToString() << " new key " << x.NewKey.ToString() << + " size " << x.Size << " WTime " << ctx.Now().MilliSeconds()); CompactedKeys.emplace_back(x.NewKey, x.Size); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 2c1af1bfb7e..fa13f239fa6 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -4661,6 +4661,8 @@ void TPersQueue::TryStartTransaction(const TActorContext& ctx) Y_ABORT_UNLESS(next); CheckTxState(ctx, *next); + + TryWriteTxs(ctx); } void TPersQueue::OnInitComplete(const TActorContext& ctx) diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 12eafa74c18..ba33b8d19d7 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1248,6 +1248,8 @@ Y_UNIT_TEST_SUITE(Cdc) { // get records { + WaitForDataRecords(client, shardIt); + auto res = client.GetRecords(shardIt).ExtractValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size()); @@ -1269,6 +1271,19 @@ Y_UNIT_TEST_SUITE(Cdc) { } } + static void WaitForDataRecords(TDataStreamsClient& client, const TString& shardIt) { + int n = 0; + for (; n < 100; ++n) { + auto res = client.GetRecords(shardIt).ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + if (res.GetResult().records().size()) { + break; + } + Sleep(TDuration::MilliSeconds(100)); + } + UNIT_ASSERT_VALUES_UNEQUAL(n, 100); + } + static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) { TTestYdsEnv env(tableDesc, streamDesc); diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp index ed922cfdf25..254604ed563 100644 --- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp @@ -6331,6 +6331,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { "PartitionPerTablet: 10 " "PQTabletConfig: {PartitionConfig { LifetimeSeconds : 10}}" ); + env.TestWaitNotification(runtime, txId); TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/PQGroup_1", true), {NLs::CheckPartCount("PQGroup_1", 100, 10, 10, 100), @@ -6853,7 +6854,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { AsyncForceDropUnsafe(runtime, ++txId, pVer.PathId.LocalPathId); TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted); - TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted); + TestModificationResults(runtime, txId-1, {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications}); TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted); TActorId sender = runtime.AllocateEdgeActor(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index dccec2afa8e..260b8875de1 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -152,6 +152,8 @@ protected: void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params); + void WriteMessagesInTx(size_t big, size_t small); + const TDriver& GetDriver() const; void CheckTabletKeys(const TString& topicName); @@ -1611,21 +1613,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params) for (size_t i = 0; i < params.OldHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x')); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++oldHeadMsgCount; } for (size_t i = 0; i < params.BigBlobsCount; ++i) { - WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++bigBlobMsgCount; } for (size_t i = 0; i < params.NewHeadCount; ++i) { WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); ++newHeadMsgCount; } - WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); - if (params.RestartMode == ERestartBeforeCommit) { RestartPQTablet("topic_A", 0); } @@ -1654,7 +1657,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params) start += oldHeadMsgCount; for (size_t i = 0; i < bigBlobMsgCount; ++i) { - UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000); + UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000); } start += bigBlobMsgCount; @@ -1921,6 +1924,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture) UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); } +void TFixture::WriteMessagesInTx(size_t big, size_t small) +{ + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t i = 0; i < big; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + } + + for (size_t i = 0; i < small; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + } + + CommitTx(tx, EStatus::SUCCESS); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture) +{ + WriteMessagesInTx(1, 0); + WriteMessagesInTx(1, 0); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture) +{ + WriteMessagesInTx(1, 0); + WriteMessagesInTx(0, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture) +{ + WriteMessagesInTx(1, 0); + WriteMessagesInTx(1, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture) +{ + WriteMessagesInTx(0, 1); + WriteMessagesInTx(1, 0); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture) +{ + WriteMessagesInTx(0, 1); + WriteMessagesInTx(0, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture) +{ + WriteMessagesInTx(0, 1); + WriteMessagesInTx(1, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture) +{ + WriteMessagesInTx(1, 1); + WriteMessagesInTx(1, 0); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture) +{ + WriteMessagesInTx(1, 1); + WriteMessagesInTx(0, 1); +} + +Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture) +{ + WriteMessagesInTx(1, 1); + WriteMessagesInTx(1, 1); +} + + +Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture) +{ + WriteMessagesInTx(2, 202); + WriteMessagesInTx(2, 200); + WriteMessagesInTx(0, 1); + WriteMessagesInTx(4, 0); + WriteMessagesInTx(0, 1); +} + } } |