aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2024-09-13 16:48:38 +0300
committerGitHub <noreply@github.com>2024-09-13 16:48:38 +0300
commit159be177cab8d7e985efab122c30bb4c03fcec78 (patch)
treeb7fb8ed55780750b823e9c25a5feaa0399d0fe26
parent6ee3d1ca96cd19db40c6d4d2d74b3668a975bfc8 (diff)
downloadydb-159be177cab8d7e985efab122c30bb4c03fcec78.tar.gz
`Y_VERIFY` in the `RenameFormedBlobs` function (#8916)
-rw-r--r--ydb/core/persqueue/partition.cpp28
-rw-r--r--ydb/core/persqueue/partition_id.h8
-rw-r--r--ydb/core/persqueue/partition_write.cpp10
-rw-r--r--ydb/core/persqueue/pq_impl.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp15
-rw-r--r--ydb/core/tx/schemeshard/ut_base/ut_base.cpp3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp95
7 files changed, 140 insertions, 21 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 9939fc17397..bf216b3034c 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const {
} else {
state = "Unknown";
}
- return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] ";
+ return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] ";
}
bool TPartition::IsActive() const {
@@ -2149,6 +2149,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
void TPartition::CommitWriteOperations(TTransaction& t)
{
+ PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId());
+
Y_ABORT_UNLESS(PersistRequest);
Y_ABORT_UNLESS(!PartitionedBlob.IsInited());
@@ -2166,6 +2168,10 @@ void TPartition::CommitWriteOperations(TTransaction& t)
HaveWriteMsg = true;
}
+ PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() <<
+ ", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
+ PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
+
if (!t.WriteInfo->BodyKeys.empty()) {
PartitionedBlob = TPartitionedBlob(Partition,
NewHead.Offset,
@@ -2180,6 +2186,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
MaxBlobSize);
for (auto& k : t.WriteInfo->BodyKeys) {
+ PQ_LOG_D("add key " << k.Key.ToString());
auto write = PartitionedBlob.Add(k.Key, k.Size);
if (write && !write->Value.empty()) {
AddCmdWrite(write, PersistRequest.Get(), ctx);
@@ -2188,18 +2195,17 @@ void TPartition::CommitWriteOperations(TTransaction& t)
}
}
- }
- if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
- ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
- RenameFormedBlobs(formedBlobs,
- *Parameters,
- curWrites,
- PersistRequest.Get(),
- ctx);
- }
+ PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
+ if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
+ ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
+ RenameFormedBlobs(formedBlobs,
+ *Parameters,
+ curWrites,
+ PersistRequest.Get(),
+ ctx);
+ }
- if (!t.WriteInfo->BodyKeys.empty()) {
const auto& last = t.WriteInfo->BodyKeys.back();
NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h
index 5ef5c4fa75e..0c1dbb8d3af 100644
--- a/ydb/core/persqueue/partition_id.h
+++ b/ydb/core/persqueue/partition_id.h
@@ -7,6 +7,7 @@
#include <util/system/types.h>
#include <util/digest/multi.h>
#include <util/str_stl.h>
+#include <util/string/builder.h>
#include <functional>
@@ -51,6 +52,13 @@ public:
}
}
+ TString ToString() const
+ {
+ TStringBuilder s;
+ s << *this;
+ return s;
+ }
+
bool IsSupportivePartition() const
{
return WriteId.Defined();
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 4d85b65a813..1752eed9002 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1066,16 +1066,16 @@ void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFor
}
if (!DataKeysBody.empty() && CompactedKeys.empty()) {
Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(),
+ "PQ: %" PRIu64 ", Partition: %s, "
"LAST KEY %s, HeadOffset %lu, NEWKEY %s",
+ TabletID, Partition.ToString().c_str(),
DataKeysBody.back().Key.ToString().c_str(),
Head.Offset,
x.NewKey.ToString().c_str());
}
- LOG_DEBUG_S(
- ctx, NKikimrServices::PERSQUEUE,
- "writing blob: topic '" << TopicName() << "' partition " << Partition
- << " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds()
- );
+ PQ_LOG_D("writing blob: topic '" << TopicName() << "' partition " << Partition <<
+ " old key " << x.OldKey.ToString() << " new key " << x.NewKey.ToString() <<
+ " size " << x.Size << " WTime " << ctx.Now().MilliSeconds());
CompactedKeys.emplace_back(x.NewKey, x.Size);
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 2c1af1bfb7e..fa13f239fa6 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -4661,6 +4661,8 @@ void TPersQueue::TryStartTransaction(const TActorContext& ctx)
Y_ABORT_UNLESS(next);
CheckTxState(ctx, *next);
+
+ TryWriteTxs(ctx);
}
void TPersQueue::OnInitComplete(const TActorContext& ctx)
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 12eafa74c18..ba33b8d19d7 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1248,6 +1248,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
// get records
{
+ WaitForDataRecords(client, shardIt);
+
auto res = client.GetRecords(shardIt).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size());
@@ -1269,6 +1271,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}
+ static void WaitForDataRecords(TDataStreamsClient& client, const TString& shardIt) {
+ int n = 0;
+ for (; n < 100; ++n) {
+ auto res = client.GetRecords(shardIt).ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ if (res.GetResult().records().size()) {
+ break;
+ }
+ Sleep(TDuration::MilliSeconds(100));
+ }
+ UNIT_ASSERT_VALUES_UNEQUAL(n, 100);
+ }
+
static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
TTestYdsEnv env(tableDesc, streamDesc);
diff --git a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
index ed922cfdf25..254604ed563 100644
--- a/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base/ut_base.cpp
@@ -6331,6 +6331,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
"PartitionPerTablet: 10 "
"PQTabletConfig: {PartitionConfig { LifetimeSeconds : 10}}"
);
+ env.TestWaitNotification(runtime, txId);
TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/PQGroup_1", true),
{NLs::CheckPartCount("PQGroup_1", 100, 10, 10, 100),
@@ -6853,7 +6854,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
AsyncForceDropUnsafe(runtime, ++txId, pVer.PathId.LocalPathId);
TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted);
- TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted);
+ TestModificationResults(runtime, txId-1, {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted);
TActorId sender = runtime.AllocateEdgeActor();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index dccec2afa8e..260b8875de1 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -152,6 +152,8 @@ protected:
void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params);
+ void WriteMessagesInTx(size_t big, size_t small);
+
const TDriver& GetDriver() const;
void CheckTabletKeys(const TString& topicName);
@@ -1611,21 +1613,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
for (size_t i = 0; i < params.OldHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'));
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++oldHeadMsgCount;
}
for (size_t i = 0; i < params.BigBlobsCount; ++i) {
- WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx);
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++bigBlobMsgCount;
}
for (size_t i = 0; i < params.NewHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++newHeadMsgCount;
}
- WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
-
if (params.RestartMode == ERestartBeforeCommit) {
RestartPQTablet("topic_A", 0);
}
@@ -1654,7 +1657,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
start += oldHeadMsgCount;
for (size_t i = 0; i < bigBlobMsgCount; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000);
+ UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000);
}
start += bigBlobMsgCount;
@@ -1921,6 +1924,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
}
+void TFixture::WriteMessagesInTx(size_t big, size_t small)
+{
+ CreateTopic("topic_A", TEST_CONSUMER);
+
+ NTable::TSession tableSession = CreateTableSession();
+ NTable::TTransaction tx = BeginTx(tableSession);
+
+ for (size_t i = 0; i < big; ++i) {
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
+ }
+
+ for (size_t i = 0; i < small; ++i) {
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
+ }
+
+ CommitTx(tx, EStatus::SUCCESS);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture)
+{
+ WriteMessagesInTx(1, 0);
+ WriteMessagesInTx(1, 0);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture)
+{
+ WriteMessagesInTx(1, 0);
+ WriteMessagesInTx(0, 1);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture)
+{
+ WriteMessagesInTx(1, 0);
+ WriteMessagesInTx(1, 1);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture)
+{
+ WriteMessagesInTx(0, 1);
+ WriteMessagesInTx(1, 0);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture)
+{
+ WriteMessagesInTx(0, 1);
+ WriteMessagesInTx(0, 1);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture)
+{
+ WriteMessagesInTx(0, 1);
+ WriteMessagesInTx(1, 1);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture)
+{
+ WriteMessagesInTx(1, 1);
+ WriteMessagesInTx(1, 0);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture)
+{
+ WriteMessagesInTx(1, 1);
+ WriteMessagesInTx(0, 1);
+}
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture)
+{
+ WriteMessagesInTx(1, 1);
+ WriteMessagesInTx(1, 1);
+}
+
+
+Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
+{
+ WriteMessagesInTx(2, 202);
+ WriteMessagesInTx(2, 200);
+ WriteMessagesInTx(0, 1);
+ WriteMessagesInTx(4, 0);
+ WriteMessagesInTx(0, 1);
+}
+
}
}