diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2024-08-08 14:17:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-08 14:17:35 +0300 |
commit | a08fe025ce9ba1ad357efc8b6bda1f6368e2a1dd (patch) | |
tree | 0d7a7adf09af8dc99b77d77b9332d237846de717 | |
parent | c2e177fc54584ed38828d4e77d7f5fb54d26836e (diff) | |
download | ydb-a08fe025ce9ba1ad357efc8b6bda1f6368e2a1dd.tar.gz |
Transactions are loaded in chunks (#7549)
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 176 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 22 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/make_config.cpp | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/make_config.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 57 |
5 files changed, 213 insertions, 53 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 556785352a..243ae5cd2e 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -782,7 +782,7 @@ void TPersQueue::EndWriteConfig(const NKikimrClient::TResponse& resp, const TAct NewConfigShouldBeApplied = true; //when config will be inited with old value new config will be applied } -void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx) +void TPersQueue::HandleConfigReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx) { bool ok = (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) && @@ -790,16 +790,68 @@ void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp, (resp.HasSetExecutorFastLogPolicyResult()) && (resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK); if (!ok) { - LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, - "Tablet " << TabletID() << " Config read error: " << resp.DebugString() << " " << ctx.SelfID); - ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); + PQ_LOG_ERROR_AND_DIE("Config read error: " << resp.ShortDebugString()); return; } - ReadTxInfo(resp.GetReadResult(2), ctx); - ReadConfig(resp.GetReadResult(0), resp.GetReadRangeResult(0), ctx); - ReadTxWrites(resp.GetReadResult(2), ctx); - ReadState(resp.GetReadResult(1), ctx); + ConfigReadResponse = std::move(resp); + + BeginInitTransactions(); + SendTransactionsReadRequest(GetTxKey(Min<ui64>()), true, ctx); +} + +void TPersQueue::SendTransactionsReadRequest(const TString& fromKey, bool includeFrom, + const TActorContext& ctx) +{ + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + request->Record.SetCookie(READ_TXS_COOKIE); + + AddCmdReadTransactionRange(*request, fromKey, includeFrom); + + request->Record.MutableCmdSetExecutorFastLogPolicy() + ->SetIsAllowed(AppData(ctx)->PQConfig.GetTactic() == NKikimrClient::TKeyValueRequest::MIN_LATENCY); + ctx.Send(ctx.SelfID, request.Release()); +} + +TString GetLastKey(const NKikimrClient::TKeyValueResponse::TReadRangeResult& result) +{ + if (!result.PairSize()) { + return {}; + } + + return result.GetPair(result.PairSize() - 1).GetKey(); +} + +void TPersQueue::HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx) +{ + bool ok = + (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) && + (resp.ReadRangeResultSize() == 1) && + (resp.HasSetExecutorFastLogPolicyResult()) && + (resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK); + const auto& result = resp.GetReadRangeResult(0); + auto status = result.GetStatus(); + if (status != NKikimrProto::OVERRUN && + status != NKikimrProto::OK && + status != NKikimrProto::NODATA) { + ok = false; + } + if (!ok) { + PQ_LOG_ERROR_AND_DIE("Transactions read error: " << resp.ShortDebugString()); + return; + } + + TransactionsReadResults.emplace_back(std::move(result)); + + if (status == NKikimrProto::OVERRUN) { + SendTransactionsReadRequest(GetLastKey(result), false, ctx); + return; + } + + ReadTxInfo(ConfigReadResponse.GetReadResult(2), ctx); + ReadConfig(ConfigReadResponse.GetReadResult(0), TransactionsReadResults, ctx); + ReadTxWrites(ConfigReadResponse.GetReadResult(2), ctx); + ReadState(ConfigReadResponse.GetReadResult(1), ctx); } void TPersQueue::ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& read, @@ -1005,7 +1057,7 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info, } void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, - const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, + const TVector<NKikimrClient::TKeyValueResponse::TReadRangeResult>& readRanges, const TActorContext& ctx) { Y_ABORT_UNLESS(read.HasStatus()); @@ -1017,15 +1069,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& return; } - Y_ABORT_UNLESS(readRange.HasStatus()); - if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) { - LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, - "Tablet " << TabletID() << " Transactions read error " << ctx.SelfID << - " Error status code " << readRange.GetStatus()); - ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); - return; - } - Y_ABORT_UNLESS(!ConfigInited); if (read.GetStatus() == NKikimrProto::OK) { @@ -1064,9 +1107,43 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& return; } - THashMap<ui32, TVector<TTransaction>> partitionTxs; - InitTransactions(readRange, partitionTxs); + for (const auto& readRange : readRanges) { + Y_ABORT_UNLESS(readRange.HasStatus()); + if (readRange.GetStatus() != NKikimrProto::OK && + readRange.GetStatus() != NKikimrProto::OVERRUN && + readRange.GetStatus() != NKikimrProto::NODATA) { + PQ_LOG_ERROR_AND_DIE("Transactions read error: " << readRange.GetStatus()); + return; + } + + for (size_t i = 0; i < readRange.PairSize(); ++i) { + const auto& pair = readRange.GetPair(i); + + PQ_LOG_D("ReadRange pair." << + " Key " << (pair.HasKey() ? pair.GetKey() : "unknown") << + ", Status " << pair.GetStatus()); + + NKikimrPQ::TTransaction tx; + Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue())); + PQ_LOG_D("Load tx " << tx.ShortDebugString()); + + Txs.emplace(tx.GetTxId(), tx); + + if (tx.HasStep()) { + if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { + PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); + } + } + } + } + + EndInitTransactions(); + EndReadConfig(ctx); +} + +void TPersQueue::EndReadConfig(const TActorContext& ctx) +{ for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config const TPartitionId partitionId(partition.GetPartitionId()); CreateOriginalPartition(Config, @@ -1254,7 +1331,10 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& break; case READ_CONFIG_COOKIE: // read is only for config - is signal to create interal actors - HandleConfigReadResponse(resp, ctx); + HandleConfigReadResponse(std::move(resp), ctx); + break; + case READ_TXS_COOKIE: + HandleTransactionsReadResponse(std::move(resp), ctx); break; case WRITE_STATE_COOKIE: EndWriteTabletState(resp, ctx); @@ -3103,19 +3183,28 @@ void TPersQueue::Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev, const TActorCont request->Record.AddCmdRead()->SetKey(KeyState()); request->Record.AddCmdRead()->SetKey(KeyTxInfo()); - auto cmd = request->Record.AddCmdReadRange(); - cmd->MutableRange()->SetFrom(GetTxKey(Min<ui64>())); - cmd->MutableRange()->SetIncludeFrom(true); - cmd->MutableRange()->SetTo(GetTxKey(Max<ui64>())); - cmd->MutableRange()->SetIncludeTo(true); - cmd->SetIncludeData(true); - request->Record.MutableCmdSetExecutorFastLogPolicy() ->SetIsAllowed(AppData(ctx)->PQConfig.GetTactic() == NKikimrClient::TKeyValueRequest::MIN_LATENCY); ctx.Send(ctx.SelfID, request.Release()); + ctx.Schedule(TDuration::Seconds(5), new TEvents::TEvWakeup()); } +void TPersQueue::AddCmdReadTransactionRange(TEvKeyValue::TEvRequest& request, + const TString& fromKey, bool includeFrom) +{ + auto cmd = request.Record.AddCmdReadRange(); + cmd->MutableRange()->SetFrom(fromKey); + cmd->MutableRange()->SetIncludeFrom(includeFrom); + cmd->MutableRange()->SetTo(GetTxKey(Max<ui64>())); + cmd->MutableRange()->SetIncludeTo(true); + cmd->SetIncludeData(true); + + PQ_LOG_D("Transactions request." << + " From " << cmd->MutableRange()->GetFrom() << + ", To " << cmd->MutableRange()->GetTo()); +} + void TPersQueue::HandleWakeup(const TActorContext& ctx) { THashSet<TString> groups; for (auto& p : Partitions) { @@ -4445,41 +4534,26 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& } } -void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, - THashMap<ui32, TVector<TTransaction>>& partitionTxs) +void TPersQueue::BeginInitTransactions() { Txs.clear(); TxQueue.clear(); - std::deque<std::pair<ui64, ui64>> plannedTxs; - - for (size_t i = 0; i < readRange.PairSize(); ++i) { - auto& pair = readRange.GetPair(i); - - NKikimrPQ::TTransaction tx; - Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue())); - - PQ_LOG_D("Load tx " << tx.ShortDebugString()); - - Txs.emplace(tx.GetTxId(), tx); + PlannedTxs.clear(); +} - if (tx.HasStep()) { - if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { - plannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); - } - } - } +void TPersQueue::EndInitTransactions() +{ + PQ_LOG_D("Txs.size=" << Txs.size() << ", PlannedTxs.size=" << PlannedTxs.size()); - std::sort(plannedTxs.begin(), plannedTxs.end()); - for (auto& item : plannedTxs) { + std::sort(PlannedTxs.begin(), PlannedTxs.end()); + for (auto& item : PlannedTxs) { TxQueue.push(item); } if (!TxQueue.empty()) { PQ_LOG_D("top tx queue (" << TxQueue.front().first << ", " << TxQueue.front().second << ")"); } - - Y_UNUSED(partitionTxs); } void TPersQueue::TryStartTransaction(const TActorContext& ctx) diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 73a0ab5d74..ff303d40cf 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -35,6 +35,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { READ_CONFIG_COOKIE = 3, WRITE_STATE_COOKIE = 4, WRITE_TX_COOKIE = 5, + READ_TXS_COOKIE = 6, }; void CreatedHook(const TActorContext& ctx) override; @@ -97,7 +98,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { //response from KV on READ or WRITE config request void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); - void HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx); + void HandleConfigReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx); + void HandleTransactionsReadResponse(NKikimrClient::TResponse&& resp, const TActorContext& ctx); void ApplyNewConfigAndReply(const TActorContext& ctx); void ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, const TActorContext& ctx); @@ -108,7 +110,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void ReadTxWrites(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx); void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, - const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, + const TVector<NKikimrClient::TKeyValueResponse::TReadRangeResult>& readRanges, const TActorContext& ctx); void ReadState(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx); @@ -506,6 +508,22 @@ private: void MoveTopTxToCalculating(TDistributedTransaction& tx, const TActorContext& ctx); void DeletePartition(const TPartitionId& partitionId, const TActorContext& ctx); + + std::deque<std::pair<ui64, ui64>> PlannedTxs; + + void BeginInitTransactions(); + void EndInitTransactions(); + + void EndReadConfig(const TActorContext& ctx); + + void AddCmdReadTransactionRange(TEvKeyValue::TEvRequest& request, + const TString& fromKey, bool includeFrom); + + NKikimrClient::TResponse ConfigReadResponse; + TVector<NKikimrClient::TKeyValueResponse::TReadRangeResult> TransactionsReadResults; + + void SendTransactionsReadRequest(const TString& fromKey, bool includeFrom, + const TActorContext& ctx); }; diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp index bc5db29e72..eb9ae5e648 100644 --- a/ydb/core/persqueue/ut/make_config.cpp +++ b/ydb/core/persqueue/ut/make_config.cpp @@ -1,6 +1,7 @@ #include "make_config.h" #include <util/datetime/base.h> +#include <util/string/printf.h> #include <ydb/core/persqueue/utils.h> @@ -50,6 +51,15 @@ NKikimrPQ::TPQTabletConfig MakeConfig(const TMakeConfigParams& params) config.MutablePartitionConfig()->SetLifetimeSeconds(TDuration::Hours(24).Seconds()); config.MutablePartitionConfig()->SetWriteSpeedInBytesPerSecond(10 << 20); + if (params.HugeConfig) { + for (size_t i = 0; i < 2'500; ++i) { + TString name = Sprintf("fake-consumer-%s-%" PRISZT, + TString(3'000, 'a').data(), i); + config.AddReadRules(name); + config.AddReadRuleGenerations(1); + } + } + Migrate(config); return config; diff --git a/ydb/core/persqueue/ut/make_config.h b/ydb/core/persqueue/ut/make_config.h index 6f11cc481e..9b072116be 100644 --- a/ydb/core/persqueue/ut/make_config.h +++ b/ydb/core/persqueue/ut/make_config.h @@ -32,6 +32,7 @@ struct TMakeConfigParams { TVector<TPartitionParams> AllPartitions; ui32 PartitionsCount = 1; NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS; + bool HugeConfig = false; }; NKikimrPQ::TPQTabletConfig MakeConfig(const TMakeConfigParams& params); diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 59dba2354f..a0fa283354 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -177,6 +177,7 @@ protected: void SetUp(NUnitTest::TTestContext&) override; void TearDown(NUnitTest::TTestContext&) override; + void ResetPipe(); void EnsurePipeExist(); void SendToPipe(const TActorId& sender, IEventBase* event, @@ -246,6 +247,8 @@ protected: void TPQTabletFixture::SetUp(NUnitTest::TTestContext&) { Ctx.ConstructInPlace(); + Ctx->EnableDetailedPQLog = true; + Finalizer.ConstructInPlace(*Ctx); Ctx->Prepare(); @@ -254,8 +257,14 @@ void TPQTabletFixture::SetUp(NUnitTest::TTestContext&) void TPQTabletFixture::TearDown(NUnitTest::TTestContext&) { + ResetPipe(); +} + +void TPQTabletFixture::ResetPipe() +{ if (Pipe != TActorId()) { Ctx->Runtime->ClosePipe(Pipe, Ctx->Edge, 0); + Pipe = TActorId(); } } @@ -1569,6 +1578,54 @@ Y_UNIT_TEST_F(All_New_Partitions_In_Another_Tablet, TPQTabletFixture) WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=mockTabletId, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); } +Y_UNIT_TEST_F(Huge_ProposeTransacton, TPQTabletFixture) +{ + const ui64 mockTabletId = 22222; + + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + auto tabletConfig = NHelpers::MakeConfig({.Version=2, + .Consumers={ + {.Consumer="client-1", .Generation=0}, + {.Consumer="client-3", .Generation=7}, + }, + .Partitions={ + {.Id=0}, + {.Id=1}, + }, + .AllPartitions={ + {.Id=0, .TabletId=Ctx->TabletId, .Children={}, .Parents={2}}, + {.Id=1, .TabletId=Ctx->TabletId, .Children={}, .Parents={2}}, + {.Id=2, .TabletId=mockTabletId, .Children={0, 1}, .Parents={}} + }, + .HugeConfig = true}); + + const ui64 txId_1 = 67890; + SendProposeTransactionRequest({.TxId=txId_1, + .Configs=NHelpers::TConfigParams{ + .Tablet=tabletConfig, + .Bootstrap=NHelpers::MakeBootstrapConfig(), + }}); + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + const ui64 txId_2 = 67891; + SendProposeTransactionRequest({.TxId=txId_2, + .Configs=NHelpers::TConfigParams{ + .Tablet=tabletConfig, + .Bootstrap=NHelpers::MakeBootstrapConfig(), + }}); + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + PQTabletRestart(*Ctx); + ResetPipe(); + + SendPlanStep({.Step=100, .TxIds={txId_1, txId_2}}); + WaitPlanStepAck({.Step=100, .TxIds={txId_1, txId_2}}); + WaitPlanStepAccepted({.Step=100}); +} + } } |