about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-03-30 10:51:52 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-03-30 10:51:52 +0300
commit 1592fa9e7e78df883602d535f7aac49f0ab063a6 (patch)
tree 4a8bdcfca39ed406552a40c153c1e3afd8c68f36
parent 7c9ee77626609670679db7c62bde1cf66775d9d1 (diff)
download ydb-1592fa9e7e78df883602d535f7aac49f0ab063a6.tar.gz
split batches to prevent exceeding the size limit on writing
-rw-r--r-- ydb/core/grpc_services/rpc_long_tx.cpp 84
-rw-r--r-- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp 1
-rw-r--r-- ydb/core/protos/tx_columnshard.proto 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard.h 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard__init.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 32
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 10
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.h 10
-rw-r--r-- ydb/core/tx/columnshard/columnshard_ut_common.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/defs.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/defs.h 3
-rw-r--r-- ydb/core/tx/long_tx_service/commit_impl.cpp 20
-rw-r--r-- ydb/core/tx/long_tx_service/long_tx_service_impl.cpp 13
-rw-r--r-- ydb/core/tx/long_tx_service/long_tx_service_impl.h 5
-rw-r--r-- ydb/core/tx/schemeshard/ut_olap.cpp 2
20 files changed, 149 insertions, 72 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index e188711b20d..5e7fed496c7 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -68,17 +68,23 @@ public:
class TFullSplitData {
private:
ui32 ShardsCount = 0;
- THashMap<ui64, TShardInfo> ShardsInfo;
-
+ THashMap<ui64, std::vector<TShardInfo>> ShardsInfo;
+ YDB_ACCESSOR_DEF(TString, ErrorString);
public:
- TString ErrorString;
-
TFullSplitData(const ui32 shardsCount, TString errString = {})
: ShardsCount(shardsCount)
, ErrorString(errString)
{}
- const THashMap<ui64, TShardInfo>& GetShardsInfo() const {
+ ui32 GetShardRequestsCount() const {
+ ui32 result = 0;
+ for (auto&& i : ShardsInfo) {
+ result += i.second.size();
+ }
+ return result;
+ }
+
+ const THashMap<ui64, std::vector<TShardInfo>>& GetShardsInfo() const {
return ShardsInfo;
}
@@ -87,7 +93,7 @@ public:
}
void AddShardInfo(const ui64 tabletId, TShardInfo&& info) {
- ShardsInfo.emplace(tabletId, std::move(info));
+ ShardsInfo[tabletId].emplace_back(std::move(info));
}
};
@@ -100,13 +106,19 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
auto& descSharding = description.GetSharding();
TVector<ui64> tabletIds(descSharding.GetColumnShards().begin(), descSharding.GetColumnShards().end());
- ui32 numShards = tabletIds.size();
+ const ui32 numShards = tabletIds.size();
Y_VERIFY(numShards);
TFullSplitData result(numShards);
if (numShards == 1) {
- TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(batch), batch->num_rows());
- result.AddShardInfo(tabletIds[0], std::move(splitInfo));
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ if (!NArrow::SplitBySize(batch, batches, NColumnShard::TLimits::MAX_BLOB_SIZE)) {
+ return result.SetErrorString("cannot split batch in according to limits");
+ }
+ for (auto&& b : batches) {
+ TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(b), b->num_rows());
+ result.AddShardInfo(tabletIds[0], std::move(splitInfo));
+ }
return result;
}
@@ -116,10 +128,9 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
rowSharding = sharding->MakeSharding(batch);
}
if (rowSharding.empty()) {
- result.ErrorString = "empty "
+ return result.SetErrorString("empty "
+ NKikimrSchemeOp::TColumnTableSharding::THashSharding::EHashFunction_Name(descSharding.GetHashSharding().GetFunction())
- + " sharding (" + (sharding ? sharding->DebugString() : "no sharding object") + ")";
- return result;
+ + " sharding (" + (sharding ? sharding->DebugString() : "no sharding object") + ")");
}
std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(batch, rowSharding, numShards);
@@ -128,8 +139,14 @@ TFullSplitData SplitData(const std::shared_ptr<arrow::RecordBatch>& batch,
THashMap<ui64, TString> out;
for (size_t i = 0; i < sharded.size(); ++i) {
if (sharded[i]) {
- TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(sharded[i]), sharded[i]->num_rows());
- result.AddShardInfo(tabletIds[i], std::move(splitInfo));
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ if (!NArrow::SplitBySize(sharded[i], batches, NColumnShard::TLimits::GetBlobSizeForSplit())) {
+ return result.SetErrorString("cannot split batch in according to limits");
+ }
+ for (auto&& b : batches) {
+ TShardInfo splitInfo(NArrow::SerializeBatchNoCompression(b), b->num_rows());
+ result.AddShardInfo(tabletIds[i], std::move(splitInfo));
+ }
}
}
@@ -380,11 +397,13 @@ class TWriteIdForShard {
private:
YDB_READONLY(ui64, ShardId, 0);
YDB_READONLY(ui64, WriteId, 0);
+ YDB_READONLY(ui64, WritePartId, 0);
public:
TWriteIdForShard() = default;
- TWriteIdForShard(const ui64 shardId, const ui64 writeId)
+ TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId)
: ShardId(shardId)
, WriteId(writeId)
+ , WritePartId(writePartId)
{
}
@@ -430,8 +449,8 @@ public:
WriteIds.resize(WritesCount.Val());
}
- void OnSuccess(const ui64 shardId, const ui64 writeId) {
- WriteIds[WritesIndex.Inc() - 1] = TWriteIdForShard(shardId, writeId);
+ void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId) {
+ WriteIds[WritesIndex.Inc() - 1] = TWriteIdForShard(shardId, writeId, writePartId);
if (!WritesCount.Dec()) {
auto req = MakeHolder<TEvLongTxService::TEvAttachColumnShardWrites>(LongTxId);
for (auto&& i : WriteIds) {
@@ -454,6 +473,7 @@ private:
static const constexpr ui32 OverloadedDelayMs = 200;
const ui64 ShardId;
+ const ui64 WritePartIdx;
const ui64 TableId;
const TString DedupId;
const TString Data;
@@ -479,8 +499,9 @@ public:
}
TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const TString& data,
- const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController)
+ const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx)
: ShardId(shardId)
+ , WritePartIdx(writePartIdx)
, TableId(tableId)
, DedupId(dedupId)
, Data(data)
@@ -501,7 +522,8 @@ public:
}
void Bootstrap() {
- SendToTablet(MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data));
+ auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data, WritePartIdx);
+ SendToTablet(std::move(ev));
Become(&TShardWriter::StateMain);
}
@@ -526,7 +548,7 @@ public:
return;
}
- ExternalController->OnSuccess(ShardId, msg->Record.GetWriteId());
+ ExternalController->OnSuccess(ShardId, msg->Record.GetWriteId(), WritePartIdx);
}
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -556,7 +578,8 @@ public:
Schedule(OverloadTimeout(), new TEvents::TEvWakeup());
} else {
++NumRetries;
- SendToTablet(MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data));
+ auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, Data, WritePartIdx);
+ SendToTablet(std::move(ev));
}
return true;
}
@@ -646,24 +669,27 @@ protected:
IndexReady = true;
}
Y_VERIFY(!InternalController);
+ ui32 writeIdx = 0;
if (sharding.HasRandomSharding()) {
InternalController = std::make_shared<TWritersController>(1, this->SelfId(), LongTxId);
const ui64 shard = sharding.GetColumnShards(RandomNumber<ui32>(sharding.ColumnShardsSize()));
- this->Register(new TShardWriter(shard, tableId, DedupId, GetSerializedData(), ActorSpan, InternalController));
+ this->Register(new TShardWriter(shard, tableId, DedupId, GetSerializedData(), ActorSpan, InternalController, ++writeIdx));
} else if (sharding.HasHashSharding()) {
const TFullSplitData batches = HasDeserializedBatch() ?
SplitData(GetDeserializedBatch(), description) :
SplitData(GetSerializedData(), description);
- if (batches.GetShardsInfo().empty()) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Input data sharding error: " + batches.ErrorString);
+ if (batches.GetErrorString()) {
+ return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Input data sharding error: " + batches.GetErrorString());
}
- InternalController = std::make_shared<TWritersController>(batches.GetShardsInfo().size(), this->SelfId(), LongTxId);
+ InternalController = std::make_shared<TWritersController>(batches.GetShardRequestsCount(), this->SelfId(), LongTxId);
ui32 sumBytes = 0;
ui32 rowsCount = 0;
- for (auto& [shard, info] : batches.GetShardsInfo()) {
- sumBytes += info.GetData().size();
- rowsCount += info.GetRowsCount();
- this->Register(new TShardWriter(shard, tableId, DedupId, info.GetData(), ActorSpan, InternalController));
+ for (auto& [shard, infos] : batches.GetShardsInfo()) {
+ for (auto&& info : infos) {
+ sumBytes += info.GetData().size();
+ rowsCount += info.GetRowsCount();
+ this->Register(new TShardWriter(shard, tableId, DedupId, info.GetData(), ActorSpan, InternalController, ++writeIdx));
+ }
}
pSpan.Attribute("affected_shards_count", (long)batches.GetShardsInfo().size());
pSpan.Attribute("bytes", (long)sumBytes);
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index a70ae1b5ff7..3ba59719c99 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -2058,6 +2058,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(Aggregation_ResultCountExpr) {
+ NColumnShard::TLimits::SetBlobSizeForSplit(10000);
TAggregationTestCase testCase;
testCase.SetQuery(R"(
SELECT
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index e239807efeb..afec0d6ad1b 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -66,6 +66,7 @@ message TEvWrite {
optional bytes Data = 6;
optional TMetadata Meta = 7;
optional NKikimrLongTxService.TLongTxId LongTxId = 8;
+ optional uint32 WritePartId = 9 [default = 0];
}
message TEvWriteResult {
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 04fc6c670b3..5c35ada84d1 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -67,6 +67,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 7d7683eed64..19ea4034a03 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -68,6 +68,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 7d7683eed64..19ea4034a03 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -68,6 +68,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 04fc6c670b3..5c35ada84d1 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -67,6 +67,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index bebf58bd1eb..00e2ca03928 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -229,21 +229,23 @@ struct TEvColumnShard {
TEvWrite() = default;
TEvWrite(const TActorId& source, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& dedupId, const TString& data) {
+ const TString& dedupId, const TString& data, const ui32 writePartId) {
ActorIdToProto(source, Record.MutableSource());
Record.SetTxInitiator(metaShard);
Record.SetWriteId(writeId);
Record.SetTableId(tableId);
Record.SetDedupId(dedupId);
Record.SetData(data);
+ Record.SetWritePartId(writePartId);
}
TEvWrite(const TActorId& source, const NLongTxService::TLongTxId& longTxId, ui64 tableId,
- const TString& dedupId, const TString& data) {
+ const TString& dedupId, const TString& data, const ui32 writePartId) {
ActorIdToProto(source, Record.MutableSource());
Record.SetTableId(tableId);
Record.SetDedupId(dedupId);
Record.SetData(data);
+ Record.SetWritePartId(writePartId);
longTxId.ToProto(Record.MutableLongTxId());
}
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 51f5b23cd6e..23a25c8d83d 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -270,12 +270,13 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
return false;
while (!rowset.EndOfSet()) {
- const TWriteId writeId = TWriteId{rowset.GetValue<Schema::LongTxWrites::WriteId>()};
+ const TWriteId writeId = TWriteId{ rowset.GetValue<Schema::LongTxWrites::WriteId>() };
+ const ui32 writePartId = rowset.GetValue<Schema::LongTxWrites::WritePartId>();
NKikimrLongTxService::TLongTxId proto;
Y_VERIFY(proto.ParseFromString(rowset.GetValue<Schema::LongTxWrites::LongTxId>()));
const auto longTxId = NLongTxService::TLongTxId::FromProto(proto);
- Self->LoadLongTxWrite(writeId, longTxId);
+ Self->LoadLongTxWrite(writeId, writePartId, longTxId);
if (!rowset.Next())
return false;
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index d73a747f8a3..aada89ffe4f 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -55,7 +55,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (record.HasLongTxId()) {
Y_VERIFY(metaShard == 0);
auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId());
- writeId = (ui64)Self->GetLongTxWrite(db, longTxId);
+ writeId = (ui64)Self->GetLongTxWrite(db, longTxId, record.GetWritePartId());
}
ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds();
@@ -198,7 +198,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (record.HasLongTxId()) {
// TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId)
auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId());
- if (ui64 writeId = (ui64)HasLongTxWrite(longTxId)) {
+ if (ui64 writeId = (ui64)HasLongTxWrite(longTxId, record.GetWritePartId())) {
LOG_S_DEBUG("Write (duplicate) into pathId " << tableId
<< " longTx " << longTxId.ToString()
<< " at tablet " << TabletID());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index e8cb254e13d..0278857486e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -247,28 +247,37 @@ bool TColumnShard::HaveOutdatedTxs() const {
return it->MaxStep <= step;
}
-TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId) {
+TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) {
auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId);
if (it != LongTxWritesByUniqueId.end()) {
- return (TWriteId)it->second->WriteId;
+ auto itPart = it->second.find(partId);
+ if (itPart != it->second.end()) {
+ return (TWriteId)itPart->second->WriteId;
+ }
}
return (TWriteId)0;
}
-TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId) {
+TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId) {
auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId);
if (it != LongTxWritesByUniqueId.end()) {
- return (TWriteId)it->second->WriteId;
+ auto itPart = it->second.find(partId);
+ if (itPart != it->second.end()) {
+ return (TWriteId)itPart->second->WriteId;
+ }
+ } else {
+ it = LongTxWritesByUniqueId.emplace(longTxId.UniqueId, TPartsForLTXShard()).first;
}
TWriteId writeId = ++LastWriteId;
auto& lw = LongTxWrites[writeId];
lw.WriteId = (ui64)writeId;
+ lw.WritePartId = partId;
lw.LongTxId = longTxId;
- LongTxWritesByUniqueId[longTxId.UniqueId] = &lw;
+ it->second[partId] = &lw;
Schema::SaveSpecialValue(db, Schema::EValueIds::LastWriteId, (ui64)writeId);
- Schema::SaveLongTxWrite(db, writeId, longTxId);
+ Schema::SaveLongTxWrite(db, writeId, partId, longTxId);
return writeId;
}
@@ -278,11 +287,12 @@ void TColumnShard::AddLongTxWrite(TWriteId writeId, ui64 txId) {
lw.PreparedTxId = txId;
}
-void TColumnShard::LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId) {
+void TColumnShard::LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId) {
auto& lw = LongTxWrites[writeId];
+ lw.WritePartId = writePartId;
lw.WriteId = (ui64)writeId;
lw.LongTxId = longTxId;
- LongTxWritesByUniqueId[longTxId.UniqueId] = &lw;
+ LongTxWritesByUniqueId[longTxId.UniqueId][writePartId] = &lw;
}
bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId) {
@@ -290,7 +300,11 @@ bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64
ui64 prepared = lw->PreparedTxId;
if (!prepared || txId == prepared) {
Schema::EraseLongTxWrite(db, writeId);
- LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId);
+ auto& ltxParts = LongTxWritesByUniqueId[lw->LongTxId.UniqueId];
+ ltxParts.erase(lw->WritePartId);
+ if (ltxParts.empty()) {
+ LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId);
+ }
LongTxWrites.erase(writeId);
return true;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 568690cf9b4..3283e97440e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -359,6 +359,7 @@ private:
struct TLongTxWriteInfo {
ui64 WriteId;
+ ui32 WritePartId;
NLongTxService::TLongTxId LongTxId;
ui64 PreparedTxId = 0;
};
@@ -415,7 +416,8 @@ private:
THashMap<ui32, TSchemaPreset> SchemaPresets;
THashMap<ui64, TTableInfo> Tables;
THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites;
- THashMap<TULID, TLongTxWriteInfo*> LongTxWritesByUniqueId;
+ using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>;
+ THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId;
TMultiMap<TRowVersion, TEvColumnShard::TEvRead::TPtr> WaitingReads;
TMultiMap<TRowVersion, TEvColumnShard::TEvScan::TPtr> WaitingScans;
THashSet<ui64> PathsToDrop;
@@ -456,10 +458,10 @@ private:
return PrimaryIndex && PrimaryIndex->HasOverloadedGranules();
}
- TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId);
- TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId);
+ TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId);
+ TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId);
void AddLongTxWrite(TWriteId writeId, ui64 txId);
- void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId);
+ void LoadLongTxWrite(TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId);
bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0);
bool RemoveTx(NTable::TDatabase& database, ui64 txId);
void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort);
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index cf5069a7601..8e842e40ed1 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -142,11 +142,12 @@ struct Schema : NIceDb::Schema {
};
struct LongTxWrites : Table<6> {
- struct WriteId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct WriteId: Column<1, NScheme::NTypeIds::Uint64> {};
struct LongTxId : Column<2, NScheme::NTypeIds::String> {};
+ struct WritePartId: Column<3, NScheme::NTypeIds::Uint32> {};
using TKey = TableKey<WriteId>;
- using TColumns = TableColumns<WriteId, LongTxId>;
+ using TColumns = TableColumns<WriteId, LongTxId, WritePartId>;
};
struct BlobsToKeep : Table<7> {
@@ -398,13 +399,14 @@ struct Schema : NIceDb::Schema {
db.Table<TableInfo>().Key(pathId).Delete();
}
- static void SaveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, const NLongTxService::TLongTxId& longTxId) {
+ static void SaveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, const ui32 writePartId, const NLongTxService::TLongTxId& longTxId) {
NKikimrLongTxService::TLongTxId proto;
longTxId.ToProto(&proto);
TString serialized;
Y_VERIFY(proto.SerializeToString(&serialized));
db.Table<LongTxWrites>().Key((ui64)writeId).Update(
- NIceDb::TUpdate<LongTxWrites::LongTxId>(serialized));
+ NIceDb::TUpdate<LongTxWrites::LongTxId>(serialized),
+ NIceDb::TUpdate<LongTxWrites::WritePartId>(writePartId));
}
static void EraseLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId) {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index f9572611454..f7875b89120 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -78,7 +78,7 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
const TString& data, std::shared_ptr<arrow::Schema> schema) {
const TString dedupId = ToString(writeId);
- auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data);
+ auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data, 1);
if (schema) {
write->SetArrowSchema(NArrow::SerializeSchema(*schema));
}
@@ -97,7 +97,7 @@ std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, cons
ui64 tableId, const TString& dedupId, const TString& data,
std::shared_ptr<arrow::Schema> schema)
{
- auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data);
+ auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, 1);
if (schema) {
write->SetArrowSchema(NArrow::SerializeSchema(*schema));
}
diff --git a/ydb/core/tx/columnshard/defs.cpp b/ydb/core/tx/columnshard/defs.cpp
new file mode 100644
index 00000000000..90c09e0c60a
--- /dev/null
+++ b/ydb/core/tx/columnshard/defs.cpp
@@ -0,0 +1,17 @@
+#include "defs.h"
+
+namespace NKikimr::NColumnShard {
+
+namespace {
+ std::optional<ui64> BlobSizeForSplit;
+}
+
+ui64 TLimits::GetBlobSizeForSplit() {
+ return BlobSizeForSplit.value_or(MAX_BLOB_SIZE * 0.95);
+}
+
+void TLimits::SetBlobSizeForSplit(const ui64 value) {
+ BlobSizeForSplit = value;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h
index 633d7ca2e93..8d953d79db7 100644
--- a/ydb/core/tx/columnshard/defs.h
+++ b/ydb/core/tx/columnshard/defs.h
@@ -18,6 +18,9 @@ struct TLimits {
static constexpr const ui64 MAX_BYTES_TO_INSERT = 16 * 1024 * 1024;
static constexpr const ui32 MAX_TX_RECORDS = 100000;
+ static ui64 GetBlobSizeForSplit();
+ static void SetBlobSizeForSplit(const ui64 value);
+
TControlWrapper MinInsertBytes;
TControlWrapper MaxInsertBytes;
TControlWrapper InsertTableSize;
diff --git a/ydb/core/tx/long_tx_service/commit_impl.cpp b/ydb/core/tx/long_tx_service/commit_impl.cpp
index 91e082f962d..2a1164c6508 100644
--- a/ydb/core/tx/long_tx_service/commit_impl.cpp
+++ b/ydb/core/tx/long_tx_service/commit_impl.cpp
@@ -20,9 +20,14 @@ namespace NLongTxService {
static constexpr ui32 MaxPlanRetriesPerShard = 1000; // ~5 min
static constexpr ui32 RetryDelayMs = 300;
- ui64 WriteId = 0;
+ std::vector<ui64> WriteIds;
TString TxBody;
ui32 NumRetries = 0;
+ ui32 WritePartId = 0;
+
+ TString GetWriteIdsStr() const {
+ return JoinSeq(", ", WriteIds);
+ }
};
class TLongTxServiceActor::TCommitActor : public TActorBootstrapped<TCommitActor> {
@@ -30,7 +35,7 @@ namespace NLongTxService {
struct TParams {
TLongTxId TxId;
TString DatabaseName;
- THashMap<ui64, ui64> ColumnShardWrites;
+ THashMap<ui64, TTransaction::TShardWriteIds> ColumnShardWrites;
};
public:
@@ -83,13 +88,16 @@ namespace NLongTxService {
void PrepareTransaction() {
for (const auto& pr : Params.ColumnShardWrites) {
const ui64 tabletId = pr.first;
- const ui64 writeId = pr.second;
NKikimrTxColumnShard::TCommitTxBody tx;
- tx.AddWriteIds(writeId);
+ std::vector<ui64> writeIds;
+ for (auto&& wId : pr.second) {
+ tx.AddWriteIds(wId);
+ writeIds.emplace_back(wId);
+ }
TString txBody;
Y_VERIFY(tx.SerializeToString(&txBody));
- WaitingShards.emplace(tabletId, TRetryData{writeId, txBody, 0});
+ WaitingShards.emplace(tabletId, TRetryData{ writeIds, txBody, 0 });
SendPrepareTransaction(tabletId);
}
Become(&TThis::StatePrepare);
@@ -115,7 +123,7 @@ namespace NLongTxService {
return true;
}
- TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << data.WriteId);
+ TXLOG_DEBUG("Sending TEvProposeTransaction to ColumnShard# " << tabletId << " WriteId# " << data.GetWriteIdsStr());
SendToTablet(tabletId, MakeHolder<TEvColumnShard::TEvProposeTransaction>(
NKikimrTxColumnShard::TX_KIND_COMMIT,
diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp
index 11b74196c27..eae23c7d317 100644
--- a/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp
+++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.cpp
@@ -282,18 +282,13 @@ void TLongTxServiceActor::Handle(TEvLongTxService::TEvAttachColumnShardWrites::T
}
for (const auto& write : msg->Record.GetWrites()) {
- ui64 shardId = write.GetColumnShard();
- ui64 writeId = write.GetWriteId();
+ const ui64 shardId = write.GetColumnShard();
+ const ui64 writeId = write.GetWriteId();
auto it = tx.ColumnShardWrites.find(shardId);
if (it == tx.ColumnShardWrites.end()) {
- tx.ColumnShardWrites[shardId] = writeId;
- continue;
+ it = tx.ColumnShardWrites.emplace(shardId, TTransaction::TShardWriteIds()).first;
}
- if (it->second == writeId) {
- continue;
- }
- return SendReply(ERequestType::AttachColumnShardWrites, ev->Sender, ev->Cookie,
- Ydb::StatusIds::GENERIC_ERROR, "Shard write id change detected, transaction may be broken");
+ it->second.emplace_back(writeId);
}
Send(ev->Sender, new TEvLongTxService::TEvAttachColumnShardWritesResult(Ydb::StatusIds::SUCCESS), 0, ev->Cookie);
diff --git a/ydb/core/tx/long_tx_service/long_tx_service_impl.h b/ydb/core/tx/long_tx_service/long_tx_service_impl.h
index 0bda6adadae..8c6280a3fe1 100644
--- a/ydb/core/tx/long_tx_service/long_tx_service_impl.h
+++ b/ydb/core/tx/long_tx_service/long_tx_service_impl.h
@@ -33,8 +33,9 @@ namespace NLongTxService {
TLongTxId TxId;
TString DatabaseName;
ETxState State = ETxState::Uninitialized;
- // Maps column shards to known write ids
- THashMap<ui64, ui64> ColumnShardWrites;
+ // Maps column shards to known write ids by partId
+ using TShardWriteIds = std::vector<ui64>;
+ THashMap<ui64, TShardWriteIds> ColumnShardWrites;
// A list of currently known committers
TVector<TSenderId> Committers;
// The currently running commit actor
diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp
index ec2649c1b50..656919d64db 100644
--- a/ydb/core/tx/schemeshard/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap.cpp
@@ -77,7 +77,7 @@ static constexpr ui64 txInitiator = 42; // 0 means LongTx, we need another value
void WriteData(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, ui64 pathId, ui64 writeId, TString data) {
const TString dedupId = ToString(writeId);
- auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data);
+ auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data, 1);
ForwardToTablet(runtime, tabletId, sender, evWrite.release());
TAutoPtr<IEventHandle> handle;