aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2024-08-08 14:17:35 +0300
committerGitHub <noreply@github.com>2024-08-08 14:17:35 +0300
commita08fe025ce9ba1ad357efc8b6bda1f6368e2a1dd (patch)
tree0d7a7adf09af8dc99b77d77b9332d237846de717
parentc2e177fc54584ed38828d4e77d7f5fb54d26836e (diff)
downloadydb-a08fe025ce9ba1ad357efc8b6bda1f6368e2a1dd.tar.gz
Transactions are loaded in chunks (#7549)
-rw-r--r--ydb/core/persqueue/pq_impl.cpp176
-rw-r--r--ydb/core/persqueue/pq_impl.h22
-rw-r--r--ydb/core/persqueue/ut/make_config.cpp10
-rw-r--r--ydb/core/persqueue/ut/make_config.h1
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp57
5 files changed, 213 insertions, 53 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 556785352a..243ae5cd2e 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -782,7 +782,7 @@ void TPersQueue::EndWriteConfig(const NKikimrClient::TResponse& resp, const TAct
NewConfigShouldBeApplied = true; //when config will be inited with old value new config will be applied
}
-void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
+void TPersQueue::HandleConfigReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx)
{
bool ok =
(resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) &&
@@ -790,16 +790,68 @@ void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp,
(resp.HasSetExecutorFastLogPolicyResult()) &&
(resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
if (!ok) {
- LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
- "Tablet " << TabletID() << " Config read error: " << resp.DebugString() << " " << ctx.SelfID);
- ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
+ PQ_LOG_ERROR_AND_DIE("Config read error: " << resp.ShortDebugString());
return;
}
- ReadTxInfo(resp.GetReadResult(2), ctx);
- ReadConfig(resp.GetReadResult(0), resp.GetReadRangeResult(0), ctx);
- ReadTxWrites(resp.GetReadResult(2), ctx);
- ReadState(resp.GetReadResult(1), ctx);
+ ConfigReadResponse = std::move(resp);
+
+ BeginInitTransactions();
+ SendTransactionsReadRequest(GetTxKey(Min<ui64>()), true, ctx);
+}
+
+// Issues one key-value range read for stored transactions, beginning at
+// `fromKey` (the key itself is part of the range only when `includeFrom` is
+// set). The request carries READ_TXS_COOKIE and is sent to this actor's own ID.
+void TPersQueue::SendTransactionsReadRequest(const TString& fromKey, bool includeFrom,
+                                             const TActorContext& ctx)
+{
+    THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
+    auto& record = request->Record;
+
+    record.SetCookie(READ_TXS_COOKIE);
+    AddCmdReadTransactionRange(*request, fromKey, includeFrom);
+
+    // The fast log policy is allowed only for the MIN_LATENCY tactic.
+    const bool fastLogAllowed =
+        AppData(ctx)->PQConfig.GetTactic() == NKikimrClient::TKeyValueRequest::MIN_LATENCY;
+    record.MutableCmdSetExecutorFastLogPolicy()->SetIsAllowed(fastLogAllowed);
+
+    ctx.Send(ctx.SelfID, request.Release());
+}
+
+// Returns the key of the final pair in `result`, or an empty string when the
+// result carries no pairs.
+TString GetLastKey(const NKikimrClient::TKeyValueResponse::TReadRangeResult& result)
+{
+    const auto pairCount = result.PairSize();
+    return pairCount ? result.GetPair(pairCount - 1).GetKey() : TString();
+}
+
+// Handles the reply to one transactions range read (READ_TXS_COOKIE).
+//
+// Each accepted chunk is appended to TransactionsReadResults. On OVERRUN the
+// next read is requested starting right after the last key of this chunk. On
+// OK or NODATA loading is finished and the saved ConfigReadResponse together
+// with all collected chunks is handed to ReadTxInfo, ReadConfig, ReadTxWrites
+// and ReadState. A malformed or failed response is reported through
+// PQ_LOG_ERROR_AND_DIE and processing stops.
+void TPersQueue::HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx)
+{
+    // Check the envelope before touching ReadRangeResult(0): indexing a
+    // repeated field outside [0, size) is undefined behaviour.
+    const bool envelopeOk =
+        (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) &&
+        (resp.ReadRangeResultSize() == 1) &&
+        (resp.HasSetExecutorFastLogPolicyResult()) &&
+        (resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
+    if (!envelopeOk) {
+        PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
+        return;
+    }
+
+    auto* result = resp.MutableReadRangeResult(0);
+    const auto status = result->GetStatus();
+    const bool hasMore = (status == NKikimrProto::OVERRUN);
+    if (!hasMore &&
+        status != NKikimrProto::OK &&
+        status != NKikimrProto::NODATA) {
+        PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString());
+        return;
+    }
+
+    // The continuation key has to be taken before the chunk is moved away.
+    // NOTE(review): an OVERRUN chunk with no pairs yields an empty key here;
+    // confirm the key-value storage never returns that combination.
+    const TString lastKey = hasMore ? GetLastKey(*result) : TString();
+
+    // `resp` is an rvalue owned by this call, so the chunk can really be moved
+    // out of it (std::move applied to a const reference would only copy).
+    TransactionsReadResults.emplace_back(std::move(*result));
+
+    if (hasMore) {
+        SendTransactionsReadRequest(lastKey, false, ctx);
+        return;
+    }
+
+    ReadTxInfo(ConfigReadResponse.GetReadResult(2), ctx);
+    ReadConfig(ConfigReadResponse.GetReadResult(0), TransactionsReadResults, ctx);
+    ReadTxWrites(ConfigReadResponse.GetReadResult(2), ctx);
+    ReadState(ConfigReadResponse.GetReadResult(1), ctx);
}
void TPersQueue::ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& read,
@@ -1005,7 +1057,7 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
}
void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read,
- const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
+ const TVector<NKikimrClient::TKeyValueResponse::TReadRangeResult>& readRanges,
const TActorContext& ctx)
{
Y_ABORT_UNLESS(read.HasStatus());
@@ -1017,15 +1069,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
return;
}
- Y_ABORT_UNLESS(readRange.HasStatus());
- if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) {
- LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
- "Tablet " << TabletID() << " Transactions read error " << ctx.SelfID <<
- " Error status code " << readRange.GetStatus());
- ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
- return;
- }
-
Y_ABORT_UNLESS(!ConfigInited);
if (read.GetStatus() == NKikimrProto::OK) {
@@ -1064,9 +1107,43 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
return;
}
- THashMap<ui32, TVector<TTransaction>> partitionTxs;
- InitTransactions(readRange, partitionTxs);
+ for (const auto& readRange : readRanges) {
+ Y_ABORT_UNLESS(readRange.HasStatus());
+ if (readRange.GetStatus() != NKikimrProto::OK &&
+ readRange.GetStatus() != NKikimrProto::OVERRUN &&
+ readRange.GetStatus() != NKikimrProto::NODATA) {
+ PQ_LOG_ERROR_AND_DIE("Transactions read error: " << readRange.GetStatus());
+ return;
+ }
+
+ for (size_t i = 0; i < readRange.PairSize(); ++i) {
+ const auto& pair = readRange.GetPair(i);
+
+ PQ_LOG_D("ReadRange pair." <<
+ " Key " << (pair.HasKey() ? pair.GetKey() : "unknown") <<
+ ", Status " << pair.GetStatus());
+
+ NKikimrPQ::TTransaction tx;
+ Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue()));
+ PQ_LOG_D("Load tx " << tx.ShortDebugString());
+
+ Txs.emplace(tx.GetTxId(), tx);
+
+ if (tx.HasStep()) {
+ if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
+ PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
+ }
+ }
+ }
+ }
+
+ EndInitTransactions();
+ EndReadConfig(ctx);
+}
+
+void TPersQueue::EndReadConfig(const TActorContext& ctx)
+{
for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config
const TPartitionId partitionId(partition.GetPartitionId());
CreateOriginalPartition(Config,
@@ -1254,7 +1331,10 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
break;
case READ_CONFIG_COOKIE:
// read is only for config - is signal to create interal actors
- HandleConfigReadResponse(resp, ctx);
+ HandleConfigReadResponse(std::move(resp), ctx);
+ break;
+ case READ_TXS_COOKIE:
+ HandleTransactionsReadResponse(std::move(resp), ctx);
break;
case WRITE_STATE_COOKIE:
EndWriteTabletState(resp, ctx);
@@ -3103,19 +3183,28 @@ void TPersQueue::Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev, const TActorCont
request->Record.AddCmdRead()->SetKey(KeyState());
request->Record.AddCmdRead()->SetKey(KeyTxInfo());
- auto cmd = request->Record.AddCmdReadRange();
- cmd->MutableRange()->SetFrom(GetTxKey(Min<ui64>()));
- cmd->MutableRange()->SetIncludeFrom(true);
- cmd->MutableRange()->SetTo(GetTxKey(Max<ui64>()));
- cmd->MutableRange()->SetIncludeTo(true);
- cmd->SetIncludeData(true);
-
request->Record.MutableCmdSetExecutorFastLogPolicy()
->SetIsAllowed(AppData(ctx)->PQConfig.GetTactic() == NKikimrClient::TKeyValueRequest::MIN_LATENCY);
ctx.Send(ctx.SelfID, request.Release());
+
ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup());
}
+// Appends to `request` a range read (with data) over transaction keys from
+// `fromKey` up to and including GetTxKey(Max<ui64>()); `includeFrom` selects
+// whether `fromKey` itself belongs to the range.
+void TPersQueue::AddCmdReadTransactionRange(TEvKeyValue::TEvRequest& request,
+                                            const TString& fromKey, bool includeFrom)
+{
+    auto* readRange = request.Record.AddCmdReadRange();
+    readRange->SetIncludeData(true);
+
+    auto* bounds = readRange->MutableRange();
+    bounds->SetFrom(fromKey);
+    bounds->SetIncludeFrom(includeFrom);
+    bounds->SetTo(GetTxKey(Max<ui64>()));
+    bounds->SetIncludeTo(true);
+
+    PQ_LOG_D("Transactions request." <<
+             " From " << bounds->GetFrom() <<
+             ", To " << bounds->GetTo());
+}
+
void TPersQueue::HandleWakeup(const TActorContext& ctx) {
THashSet<TString> groups;
for (auto& p : Partitions) {
@@ -4445,41 +4534,26 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig&
}
}
-void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
- THashMap<ui32, TVector<TTransaction>>& partitionTxs)
+void TPersQueue::BeginInitTransactions()
{
Txs.clear();
TxQueue.clear();
- std::deque<std::pair<ui64, ui64>> plannedTxs;
-
- for (size_t i = 0; i < readRange.PairSize(); ++i) {
- auto& pair = readRange.GetPair(i);
-
- NKikimrPQ::TTransaction tx;
- Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue()));
-
- PQ_LOG_D("Load tx " << tx.ShortDebugString());
-
- Txs.emplace(tx.GetTxId(), tx);
+ PlannedTxs.clear();
+}
- if (tx.HasStep()) {
- if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
- plannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
- }
- }
- }
+void TPersQueue::EndInitTransactions()
+{
+ PQ_LOG_D("Txs.size=" << Txs.size() << ", PlannedTxs.size=" << PlannedTxs.size());
- std::sort(plannedTxs.begin(), plannedTxs.end());
- for (auto& item : plannedTxs) {
+ std::sort(PlannedTxs.begin(), PlannedTxs.end());
+ for (auto& item : PlannedTxs) {
TxQueue.push(item);
}
if (!TxQueue.empty()) {
PQ_LOG_D("top tx queue (" << TxQueue.front().first << ", " << TxQueue.front().second << ")");
}
-
- Y_UNUSED(partitionTxs);
}
void TPersQueue::TryStartTransaction(const TActorContext& ctx)
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 73a0ab5d74..ff303d40cf 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -35,6 +35,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
READ_CONFIG_COOKIE = 3,
WRITE_STATE_COOKIE = 4,
WRITE_TX_COOKIE = 5,
+ READ_TXS_COOKIE = 6,
};
void CreatedHook(const TActorContext& ctx) override;
@@ -97,7 +98,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
//response from KV on READ or WRITE config request
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
- void HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx);
+ void HandleConfigReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx);
+ void HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx);
void ApplyNewConfigAndReply(const TActorContext& ctx);
void ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
const TActorContext& ctx);
@@ -108,7 +110,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void ReadTxWrites(const NKikimrClient::TKeyValueResponse::TReadResult& read,
const TActorContext& ctx);
void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read,
- const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
+ const TVector<NKikimrClient::TKeyValueResponse::TReadRangeResult>& readRanges,
const TActorContext& ctx);
void ReadState(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx);
@@ -506,6 +508,22 @@ private:
void MoveTopTxToCalculating(TDistributedTransaction& tx, const TActorContext& ctx);
void DeletePartition(const TPartitionId& partitionId, const TActorContext& ctx);
+
+ std::deque<std::pair<ui64, ui64>> PlannedTxs;
+
+ void BeginInitTransactions();
+ void EndInitTransactions();
+
+ void EndReadConfig(const TActorContext& ctx);
+
+ void AddCmdReadTransactionRange(TEvKeyValue::TEvRequest& request,
+ const TString& fromKey, bool includeFrom);
+
+ NKikimrClient::TResponse ConfigReadResponse;
+ TVector<NKikimrClient::TKeyValueResponse::TReadRangeResult> TransactionsReadResults;
+
+ void SendTransactionsReadRequest(const TString& fromKey, bool includeFrom,
+ const TActorContext& ctx);
};
diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp
index bc5db29e72..eb9ae5e648 100644
--- a/ydb/core/persqueue/ut/make_config.cpp
+++ b/ydb/core/persqueue/ut/make_config.cpp
@@ -1,6 +1,7 @@
#include "make_config.h"
#include <util/datetime/base.h>
+#include <util/string/printf.h>
#include <ydb/core/persqueue/utils.h>
@@ -50,6 +51,15 @@ NKikimrPQ::TPQTabletConfig MakeConfig(const TMakeConfigParams& params)
config.MutablePartitionConfig()->SetLifetimeSeconds(TDuration::Hours(24).Seconds());
config.MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(10 << 20);
+ if (params.HugeConfig) {
+ for (size_t i = 0; i < 2'500; ++i) {
+ TString name = Sprintf("fake-consumer-%s-%" PRISZT,
+ TString(3'000, 'a').data(), i);
+ config.AddReadRules(name);
+ config.AddReadRuleGenerations(1);
+ }
+ }
+
Migrate(config);
return config;
diff --git a/ydb/core/persqueue/ut/make_config.h b/ydb/core/persqueue/ut/make_config.h
index 6f11cc481e..9b072116be 100644
--- a/ydb/core/persqueue/ut/make_config.h
+++ b/ydb/core/persqueue/ut/make_config.h
@@ -32,6 +32,7 @@ struct TMakeConfigParams {
TVector<TPartitionParams> AllPartitions;
ui32 PartitionsCount = 1;
NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS;
+ bool HugeConfig = false;
};
NKikimrPQ::TPQTabletConfig MakeConfig(const TMakeConfigParams& params);
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 59dba2354f..a0fa283354 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -177,6 +177,7 @@ protected:
void SetUp(NUnitTest::TTestContext&) override;
void TearDown(NUnitTest::TTestContext&) override;
+ void ResetPipe();
void EnsurePipeExist();
void SendToPipe(const TActorId& sender,
IEventBase* event,
@@ -246,6 +247,8 @@ protected:
void TPQTabletFixture::SetUp(NUnitTest::TTestContext&)
{
Ctx.ConstructInPlace();
+ Ctx->EnableDetailedPQLog = true;
+
Finalizer.ConstructInPlace(*Ctx);
Ctx->Prepare();
@@ -254,8 +257,14 @@ void TPQTabletFixture::SetUp(NUnitTest::TTestContext&)
void TPQTabletFixture::TearDown(NUnitTest::TTestContext&)
{
+ ResetPipe();
+}
+
+void TPQTabletFixture::ResetPipe()
+{
if (Pipe != TActorId()) {
Ctx->Runtime->ClosePipe(Pipe, Ctx->Edge, 0);
+ Pipe = TActorId();
}
}
@@ -1569,6 +1578,54 @@ Y_UNIT_TEST_F(All_New_Partitions_In_Another_Tablet, TPQTabletFixture)
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=mockTabletId, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
}
+// Proposes two transactions that each carry a huge tablet config, restarts the
+// tablet, and then checks that a plan step covering both transactions is
+// acknowledged and accepted.
+Y_UNIT_TEST_F(Huge_ProposeTransacton, TPQTabletFixture)
+{
+    const ui64 mockTabletId = 22222;
+    const ui64 txId_1 = 67890;
+    const ui64 txId_2 = 67891;
+
+    PQTabletPrepare({.partitions=1}, {}, *Ctx);
+
+    auto tabletConfig = NHelpers::MakeConfig({.Version=2,
+                                              .Consumers={
+                                                  {.Consumer="client-1", .Generation=0},
+                                                  {.Consumer="client-3", .Generation=7},
+                                              },
+                                              .Partitions={
+                                                  {.Id=0},
+                                                  {.Id=1},
+                                              },
+                                              .AllPartitions={
+                                                  {.Id=0, .TabletId=Ctx->TabletId, .Children={}, .Parents={2}},
+                                                  {.Id=1, .TabletId=Ctx->TabletId, .Children={}, .Parents={2}},
+                                                  {.Id=2, .TabletId=mockTabletId, .Children={0, 1}, .Parents={}}
+                                              },
+                                              .HugeConfig = true});
+
+    // Propose the transactions one after another; each must reach PREPARED
+    // before the next one is sent.
+    for (const ui64 txId : {txId_1, txId_2}) {
+        SendProposeTransactionRequest({.TxId=txId,
+                                      .Configs=NHelpers::TConfigParams{
+                                          .Tablet=tabletConfig,
+                                          .Bootstrap=NHelpers::MakeBootstrapConfig(),
+                                      }});
+        WaitProposeTransactionResponse({.TxId=txId,
+                                       .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+    }
+
+    PQTabletRestart(*Ctx);
+    ResetPipe();
+
+    SendPlanStep({.Step=100, .TxIds={txId_1, txId_2}});
+    WaitPlanStepAck({.Step=100, .TxIds={txId_1, txId_2}});
+    WaitPlanStepAccepted({.Step=100});
+}
+
}
}