diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-30 10:51:52 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-30 10:51:52 +0300 |
commit | 1592fa9e7e78df883602d535f7aac49f0ab063a6 (patch) | |
tree | 4a8bdcfca39ed406552a40c153c1e3afd8c68f36 | |
parent | 7c9ee77626609670679db7c62bde1cf66775d9d1 (diff) | |
download | ydb-1592fa9e7e78df883602d535f7aac49f0ab063a6.tar.gz |
split batches for prevent overlimit on writing
20 files changed, 149 insertions, 72 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index e188711b20d..5e7fed496c7 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -68,17 +68,23 @@ public: class TFullSplitData { private: ui32 ShardsCount = 0; - THashMap<ui64, TShardInfo> ShardsInfo; - + THashMap<ui64, std::vector<TShardInfo>> ShardsInfo; + YDB_ACCESSOR_DEF(TString, ErrorString); public: - TString ErrorString; - TFullSplitData(const ui32 shardsCount, TString errString = {}) : ShardsCount(shardsCount) , ErrorString(errString) {} - const THashMap<ui64, TShardInfo>& GetShardsInfo() const { + ui32 GetShardRequestsCount() const { + ui32 result = 0; + for (auto&& i : ShardsInfo) { + result += i.second.size(); + } + return result; + } + + const THashMap<ui64, std::vector<TShardInfo>>& GetShardsInfo() const { return ShardsInfo; } @@ -87,7 +93,7 @@ public: } void AddShardInfo(const ui64 tabletId, TShardInfo&& info) { - ShardsInfo.emplace(tabletId, std::move(info)); + ShardsInfo[tabletId].emplace_back(std::move(info)); } }; @@ -100,13 +106,19 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, auto& descSharding = description.GetSharding(); TVector<ui64> tabletIds(descSharding.GetColumnShards().begin(), descSharding.GetColumnShards().end()); - ui32 numShards = tabletIds.size(); + const ui32 numShards = tabletIds.size(); Y_VERIFY(numShards); TFullSplitData result(numShards); if (numShards == 1) { - TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(batch), batch->num_rows()); - result.AddShardInfo(tabletIds[0], std::move(splitInfo)); + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + if (!NArrow::SplitBySize(batch, batches, NColumnShard::TLimits::MAX_BLOB_SIZE)) { + return result.SetErrorString("cannot split batch in according to limits"); + } + for (auto&& b : batches) { + TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(b), b->num_rows()); + result.AddShardInfo(tabletIds[0], std::move(splitInfo)); + } return result; } @@ -116,10 +128,9 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, rowSharding = sharding->MakeSharding(batch); } if (rowSharding.empty()) { - result.ErrorString = "empty " + return result.SetErrorString("empty " + NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(descSharding.GetHashSharding().GetFunction()) - + " sharding (" + (sharding ? sharding->DebugString() : "no sharding object") + ")"; - return result; + + " sharding (" + (sharding ? sharding->DebugString() : "no sharding object") + ")"); } std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(batch, rowSharding, numShards); @@ -128,8 +139,14 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch, THashMap<ui64, TString> out; for (size_t i = 0; i < sharded.size(); ++i) { if (sharded[i]) { - TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(sharded[i]), sharded[i]->num_rows()); - result.AddShardInfo(tabletIds[i], std::move(splitInfo)); + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + if (!NArrow::SplitBySize(sharded[i], batches, NColumnShard::TLimits::GetBlobSizeForSplit())) { + return result.SetErrorString("cannot split batch in according to limits"); + } + for (auto&& b : batches) { + TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(b), b->num_rows()); + result.AddShardInfo(tabletIds[i], std::move(splitInfo)); + } } } @@ -380,11 +397,13 @@ class TWriteIdForShard { private: YDB_READONLY(ui64, ShardId, 0); YDB_READONLY(ui64, WriteId, 0); + YDB_READONLY(ui64, WritePartId, 0); public: TWriteIdForShard() = default; - TWriteIdForShard(const ui64 shardId, const ui64 writeId) + TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId) : ShardId(shardId) , WriteId(writeId) + , WritePartId(writePartId) { } @@ -430,8 +449,8 @@ public: WriteIds.resize(WritesCount.Val()); } - void OnSuccess(const ui64 shardId, const ui64 writeId) { - WriteIds[WritesIndex.Inc() - 1] = TWriteIdForShard(shardId, writeId); + void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId) { + WriteIds[WritesIndex.Inc() - 1] = TWriteIdForShard(shardId, writeId, writePartId); if (!WritesCount.Dec()) { auto req = MakeHolder<TEvLongTxService::TEvAttachColumnShardWrites>(LongTxId); for (auto&& i : WriteIds) { @@ -454,6 +473,7 @@ private: static const constexpr ui32 OverloadedDelayMs = 200; const ui64 ShardId; + const ui64 WritePartIdx; const ui64 TableId; const TString DedupId; const TString Data; @@ -479,8 +499,9 @@ public: } TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const TString& data, - const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController) + const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx) : ShardId(shardId) + , WritePartIdx(writePartIdx) , TableId(tableId) , DedupId(dedupId) , Data(data) @@ -501,7 +522,8 @@ public: } void Bootstrap() { - SendToTablet(MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data)); + auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data, WritePartIdx); + SendToTablet(std::move(ev)); Become(&TShardWriter::StateMain); } @@ -526,7 +548,7 @@ public: return; } - ExternalController->OnSuccess(ShardId, msg->Record.GetWriteId()); + ExternalController->OnSuccess(ShardId, msg->Record.GetWriteId(), WritePartIdx); } void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -556,7 +578,8 @@ public: Schedule(OverloadTimeout(), new TEvents::TEvWakeup()); } else { ++NumRetries; - SendToTablet(MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data)); + auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data, WritePartIdx); + SendToTablet(std::move(ev)); } return true; } @@ -646,24 +669,27 @@ protected: IndexReady = true; } Y_VERIFY(!InternalController); + ui32 writeIdx = 0; if (sharding.HasRandomSharding()) { InternalController = std::make_shared<TWritersController>(1, this->SelfId(), LongTxId); const ui64 shard = sharding.GetColumnShards(RandomNumber<ui32>(sharding.ColumnShardsSize())); - this->Register(new TShardWriter(shard, tableId, DedupId, GetSerializedData(), ActorSpan, InternalController)); + this->Register(new TShardWriter(shard, tableId, DedupId, GetSerializedData(), ActorSpan, InternalController, ++writeIdx)); } else if (sharding.HasHashSharding()) { const TFullSplitData batches = HasDeserializedBatch() ? SplitData(GetDeserializedBatch(), description) : SplitData(GetSerializedData(), description); - if (batches.GetShardsInfo().empty()) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Input data sharding error: " + batches.ErrorString); + if (batches.GetErrorString()) { + return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Input data sharding error: " + batches.GetErrorString()); } - InternalController = std::make_shared<TWritersController>(batches.GetShardsInfo().size(), this->SelfId(), LongTxId); + InternalController = std::make_shared<TWritersController>(batches.GetShardRequestsCount(), this->SelfId(), LongTxId); ui32 sumBytes = 0; ui32 rowsCount = 0; - for (auto& [shard, info] : batches.GetShardsInfo()) { - sumBytes += info.GetData().size(); - rowsCount += info.GetRowsCount(); - this->Register(new TShardWriter(shard, tableId, DedupId, info.GetData(), ActorSpan, InternalController)); + for (auto& [shard, infos] : batches.GetShardsInfo()) { + for (auto&& info : infos) { + sumBytes += info.GetData().size(); + rowsCount += info.GetRowsCount(); + this->Register(new TShardWriter(shard, tableId, DedupId, info.GetData(), ActorSpan, InternalController, ++writeIdx)); + } } pSpan.Attribute("affected_shards_count", (long)batches.GetShardsInfo().size()); pSpan.Attribute("bytes", (long)sumBytes); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index a70ae1b5ff7..3ba59719c99 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -2058,6 +2058,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(Aggregation_ResultCountExpr) { + NColumnShard::TLimits::SetBlobSizeForSplit(10000); TAggregationTestCase testCase; testCase.SetQuery(R"( SELECT diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index e239807efeb..afec0d6ad1b 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -66,6 +66,7 @@ message TEvWrite { optional bytes Data = 6; optional TMetadata Meta = 7; optional NKikimrLongTxService.TLongTxId LongTxId = 8; + optional uint32 WritePartId = 9 [default = 0]; } message TEvWriteResult { diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 04fc6c670b3..5c35ada84d1 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -67,6 +67,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 7d7683eed64..19ea4034a03 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -68,6 +68,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 7d7683eed64..19ea4034a03 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -68,6 +68,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 04fc6c670b3..5c35ada84d1 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -67,6 +67,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index bebf58bd1eb..00e2ca03928 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -229,21 +229,23 @@ struct TEvColumnShard { TEvWrite() = default; TEvWrite(const TActorId& source, ui64 metaShard, ui64 writeId, ui64 tableId, - const TString& dedupId, const TString& data) { + const TString& dedupId, const TString& data, const ui32 writePartId) { ActorIdToProto(source, Record.MutableSource()); Record.SetTxInitiator(metaShard); Record.SetWriteId(writeId); Record.SetTableId(tableId); Record.SetDedupId(dedupId); Record.SetData(data); + Record.SetWritePartId(writePartId); } TEvWrite(const TActorId& source, const NLongTxService::TLongTxId& longTxId, ui64 tableId, - const TString& dedupId, const TString& data) { + const TString& dedupId, const TString& data, const ui32 writePartId) { ActorIdToProto(source, Record.MutableSource()); Record.SetTableId(tableId); Record.SetDedupId(dedupId); Record.SetData(data); + Record.SetWritePartId(writePartId); longTxId.ToProto(Record.MutableLongTxId()); } diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 51f5b23cd6e..23a25c8d83d 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -270,12 +270,13 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) return false; while (!rowset.EndOfSet()) { - const TWriteId writeId = TWriteId{rowset.GetValue<Schema::LongTxWrites::WriteId>()}; + const TWriteId writeId = TWriteId{ rowset.GetValue<Schema::LongTxWrites::WriteId>() }; + const ui32 writePartId = rowset.GetValue<Schema::LongTxWrites::WritePartId>(); NKikimrLongTxService::TLongTxId proto; Y_VERIFY(proto.ParseFromString(rowset.GetValue<Schema::LongTxWrites::LongTxId>())); const auto longTxId = NLongTxService::TLongTxId::FromProto(proto); - Self->LoadLongTxWrite(writeId, longTxId); + Self->LoadLongTxWrite(writeId, writePartId, longTxId); if (!rowset.Next()) return false; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index d73a747f8a3..aada89ffe4f 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -55,7 +55,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { if (record.HasLongTxId()) { Y_VERIFY(metaShard == 0); auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId()); - writeId = (ui64)Self->GetLongTxWrite(db, longTxId); + writeId = (ui64)Self->GetLongTxWrite(db, longTxId, record.GetWritePartId()); } ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds(); @@ -198,7 +198,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (record.HasLongTxId()) { // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId) auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId()); - if (ui64 writeId = (ui64)HasLongTxWrite(longTxId)) { + if (ui64 writeId = (ui64)HasLongTxWrite(longTxId, record.GetWritePartId())) { LOG_S_DEBUG("Write (duplicate) into pathId " << tableId << " longTx " << longTxId.ToString() << " at tablet " << TabletID()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index e8cb254e13d..0278857486e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -247,28 +247,37 @@ bool TColumnShard::HaveOutdatedTxs() const { return it->MaxStep <= step; } -TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId) { +TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) { auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId); if (it != LongTxWritesByUniqueId.end()) { - return (TWriteId)it->second->WriteId; + auto itPart = it->second.find(partId); + if (itPart != it->second.end()) { + return (TWriteId)itPart->second->WriteId; + } } return (TWriteId)0; } -TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId) { +TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId) { auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId); if (it != LongTxWritesByUniqueId.end()) { - return (TWriteId)it->second->WriteId; + auto itPart = it->second.find(partId); + if (itPart != it->second.end()) { + return (TWriteId)itPart->second->WriteId; + } + } else { + it = LongTxWritesByUniqueId.emplace(longTxId.UniqueId, TPartsForLTXShard()).first; } TWriteId writeId = ++LastWriteId; auto& lw = LongTxWrites[writeId]; lw.WriteId = (ui64)writeId; + lw.WritePartId = partId; lw.LongTxId = longTxId; - LongTxWritesByUniqueId[longTxId.UniqueId] = &lw; + it->second[partId] = &lw; Schema::SaveSpecialValue(db, Schema::EValueIds::LastWriteId, (ui64)writeId); - Schema::SaveLongTxWrite(db, writeId, longTxId); + Schema::SaveLongTxWrite(db, writeId, partId, longTxId); return writeId; } @@ -278,11 +287,12 @@ void TColumnShard::AddLongTxWrite(TWriteId writeId, ui64 txId) { lw.PreparedTxId = txId; } -void TColumnShard::LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId) { +void TColumnShard::LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId) { auto& lw = LongTxWrites[writeId]; + lw.WritePartId = writePartId; lw.WriteId = (ui64)writeId; lw.LongTxId = longTxId; - LongTxWritesByUniqueId[longTxId.UniqueId] = &lw; + LongTxWritesByUniqueId[longTxId.UniqueId][writePartId] = &lw; } bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId) { @@ -290,7 +300,11 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 ui64 prepared = lw->PreparedTxId; if (!prepared || txId == prepared) { Schema::EraseLongTxWrite(db, writeId); - LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId); + auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId]; + ltxParts.erase(lw->WritePartId); + if (ltxParts.empty()) { + LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId); + } LongTxWrites.erase(writeId); return true; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 568690cf9b4..3283e97440e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -359,6 +359,7 @@ private: struct TLongTxWriteInfo { ui64 WriteId; + ui32 WritePartId; NLongTxService::TLongTxId LongTxId; ui64 PreparedTxId = 0; }; @@ -415,7 +416,8 @@ private: THashMap<ui32, TSchemaPreset> SchemaPresets; THashMap<ui64, TTableInfo> Tables; THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites; - THashMap<TULID, TLongTxWriteInfo*> LongTxWritesByUniqueId; + using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>; + THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId; TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads; TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans; THashSet<ui64> PathsToDrop; @@ -456,10 +458,10 @@ private: return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(); } - TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId); - TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId); + TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId); + TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId); void AddLongTxWrite(TWriteId writeId, ui64 txId); - void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId); + void LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId); bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0); bool RemoveTx(NTable::TDatabase& database, ui64 txId); void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index cf5069a7601..8e842e40ed1 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -142,11 +142,12 @@ struct Schema : NIceDb::Schema { }; struct LongTxWrites : Table<6> { - struct WriteId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct WriteId: Column<1, NScheme::NTypeIds::Uint64> {}; struct LongTxId : Column<2, NScheme::NTypeIds::String> {}; + struct WritePartId: Column<3, NScheme::NTypeIds::Uint32> {}; using TKey = TableKey<WriteId>; - using TColumns = TableColumns<WriteId, LongTxId>; + using TColumns = TableColumns<WriteId, LongTxId, WritePartId>; }; struct BlobsToKeep : Table<7> { @@ -398,13 +399,14 @@ struct Schema : NIceDb::Schema { db.Table<TableInfo>().Key(pathId).Delete(); } - static void SaveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, const NLongTxService::TLongTxId& longTxId) { + static void SaveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId) { NKikimrLongTxService::TLongTxId proto; longTxId.ToProto(&proto); TString serialized; Y_VERIFY(proto.SerializeToString(&serialized)); db.Table<LongTxWrites>().Key((ui64)writeId).Update( - NIceDb::TUpdate<LongTxWrites::LongTxId>(serialized)); + NIceDb::TUpdate<LongTxWrites::LongTxId>(serialized), + NIceDb::TUpdate<LongTxWrites::WritePartId>(writePartId)); } static void EraseLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId) { diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index f9572611454..f7875b89120 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -78,7 +78,7 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, const TString& data, std::shared_ptr<arrow::Schema> schema) { const TString dedupId = ToString(writeId); - auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data); + auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data, 1); if (schema) { write->SetArrowSchema(NArrow::SerializeSchema(*schema)); } @@ -97,7 +97,7 @@ std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, cons ui64 tableId, const TString& dedupId, const TString& data, std::shared_ptr<arrow::Schema> schema) { - auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data); + auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, 1); if (schema) { write->SetArrowSchema(NArrow::SerializeSchema(*schema)); } diff --git a/ydb/core/tx/columnshard/defs.cpp b/ydb/core/tx/columnshard/defs.cpp new file mode 100644 index 00000000000..90c09e0c60a --- /dev/null +++ b/ydb/core/tx/columnshard/defs.cpp @@ -0,0 +1,17 @@ +#include "defs.h" + +namespace NKikimr::NColumnShard { + +namespace { + std::optional<ui64> BlobSizeForSplit; +} +
+ui64 TLimits::GetBlobSizeForSplit() {
+ return BlobSizeForSplit.value_or(MAX_BLOB_SIZE * 0.95);
+}
+
+void TLimits::SetBlobSizeForSplit(const ui64 value) {
+ BlobSizeForSplit = value;
+}
+
+} diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h index 633d7ca2e93..8d953d79db7 100644 --- a/ydb/core/tx/columnshard/defs.h +++ b/ydb/core/tx/columnshard/defs.h @@ -18,6 +18,9 @@ struct TLimits { static constexpr const ui64 MAX_BYTES_TO_INSERT = 16 * 1024 * 1024; static constexpr const ui32 MAX_TX_RECORDS = 100000; + static ui64 GetBlobSizeForSplit(); + static void SetBlobSizeForSplit(const ui64 value); + TControlWrapper MinInsertBytes; TControlWrapper MaxInsertBytes; TControlWrapper InsertTableSize; diff --git a/ydb/core/tx/long_tx_service/commit_impl.cpp b/ydb/core/tx/long_tx_service/commit_impl.cpp index 91e082f962d..2a1164c6508 100644 --- a/ydb/core/tx/long_tx_service/commit_impl.cpp +++ b/ydb/core/tx/long_tx_service/commit_impl.cpp @@ -20,9 +20,14 @@ namespace NLongTxService { static constexpr ui32 MaxPlanRetriesPerShard = 1000; // ~5 min static constexpr ui32 RetryDelayMs = 300; - ui64 WriteId = 0; + std::vector<ui64> WriteIds; TString TxBody; ui32 NumRetries = 0; + ui32 WritePartId = 0; + + TString GetWriteIdsStr() const { + return JoinSeq(", ", WriteIds); + } }; class TLongTxServiceActor::TCommitActor : public TActorBootstrapped<TCommitActor> { @@ -30,7 +35,7 @@ namespace NLongTxService { struct TParams { TLongTxId TxId; TString DatabaseName; - THashMap<ui64, ui64> ColumnShardWrites; + THashMap<ui64, TTransaction::TShardWriteIds> ColumnShardWrites; }; public: @@ -83,13 +88,16 @@ namespace NLongTxService { void PrepareTransaction() { for (const auto& pr : Params.ColumnShardWrites) { const ui64 tabletId = pr.first; - const ui64 writeId = pr.second; NKikimrTxColumnShard::TCommitTxBody tx; - tx.AddWriteIds(writeId); + std::vector<ui64> writeIds; + for (auto&& wId : pr.second) { + tx.AddWriteIds(wId); + writeIds.emplace_back(wId); + } TString txBody; Y_VERIFY(tx.SerializeToString(&txBody)); - WaitingShards.emplace(tabletId, TRetryData{writeId, txBody, 0}); + WaitingShards.emplace(tabletId, TRetryData{ writeIds, txBody, 0 }); SendPrepareTransaction(tabletId); } Become(&TThis::StatePrepare); @@ -115,7 +123,7 @@ namespace NLongTxService { return true; } - TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << data.WriteId); + TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << data.GetWriteIdsStr()); SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvProposeTransaction>( NKikimrTxColumnShard::TX_KIND_COMMIT, diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp index 11b74196c27..eae23c7d317 100644 --- a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp +++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp @@ -282,18 +282,13 @@ void TLongTxServiceActor::Handle(TEvLongTxService::TEvAttachColumnShardWrites::T } for (const auto& write : msg->Record.GetWrites()) { - ui64 shardId = write.GetColumnShard(); - ui64 writeId = write.GetWriteId(); + const ui64 shardId = write.GetColumnShard(); + const ui64 writeId = write.GetWriteId(); auto it = tx.ColumnShardWrites.find(shardId); if (it == tx.ColumnShardWrites.end()) { - tx.ColumnShardWrites[shardId] = writeId; - continue; + it = tx.ColumnShardWrites.emplace(shardId, TTransaction::TShardWriteIds()).first; } - if (it->second == writeId) { - continue; - } - return SendReply(ERequestType::AttachColumnShardWrites, ev->Sender, ev->Cookie, - Ydb::StatusIds::GENERIC_ERROR, "Shard write id change detected, transaction may be broken"); + it->second.emplace_back(writeId); } Send(ev->Sender, new TEvLongTxService::TEvAttachColumnShardWritesResult(Ydb::StatusIds::SUCCESS), 0, ev->Cookie); diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.h b/ydb/core/tx/long_tx_service/long_tx_service_impl.h index 0bda6adadae..8c6280a3fe1 100644 --- a/ydb/core/tx/long_tx_service/long_tx_service_impl.h +++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.h @@ -33,8 +33,9 @@ namespace NLongTxService { TLongTxId TxId; TString DatabaseName; ETxState State = ETxState::Uninitialized; - // Maps column shards to known write ids - THashMap<ui64, ui64> ColumnShardWrites; + // Maps column shards to known write ids by partId + using TShardWriteIds = std::vector<ui64>; + THashMap<ui64, TShardWriteIds> ColumnShardWrites; // A list of currently known committers TVector<TSenderId> Committers; // The currently running commit actor diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp index ec2649c1b50..656919d64db 100644 --- a/ydb/core/tx/schemeshard/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap.cpp @@ -77,7 +77,7 @@ static constexpr ui64 txInitiator = 42; // 0 means LongTx, we need another value void WriteData(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, ui64 pathId, ui64 writeId, TString data) { const TString dedupId = ToString(writeId); - auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data); + auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data, 1); ForwardToTablet(runtime, tabletId, sender, evWrite.release()); TAutoPtr<IEventHandle> handle; |