diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-01 19:48:13 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-01 20:03:27 +0300 |
commit | a8bdeac6cbeb7b346252304f66ffa226e9047d53 (patch) | |
tree | 543586ec773002b033e88475c9224e1021cecf28 | |
parent | 33c98f7f7bf065218f7bc5ff613552a8404447ce (diff) | |
download | ydb-a8bdeac6cbeb7b346252304f66ffa226e9047d53.tar.gz |
KIKIMR-19505: store blob into TInsertedData for reuse in future in indexation changes
7 files changed, 110 insertions, 41 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 0f25c5b80e4..56b9de2a0f6 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -1,7 +1,7 @@ #include "tx_write.h" namespace NKikimr::NColumnShard { -bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId) { +bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) { const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta(); const auto& blobRange = blobData.GetBlobRange(); @@ -15,7 +15,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion()); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot()); + NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot(), blob); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { // Put new data into blob cache @@ -29,15 +29,15 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { - LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix()); - + NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); + ACFL_DEBUG("event", "start_execute"); const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); txc.DB.NoMoreReadsForTx(); TWriteOperation::TPtr operation; if (writeMeta.HasLongTxId()) { - Y_VERIFY_S(PutBlobResult->Get()->GetBlobData().size() == 1, TStringBuilder() << "Blobs count: " << PutBlobResult->Get()->GetBlobData().size()); + AFL_VERIFY(PutBlobResult->Get()->GetBlobData().size() == 1)("count", PutBlobResult->Get()->GetBlobData().size()); } else { operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); Y_VERIFY(operation); @@ -54,7 +54,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); } - if (!InsertOneBlob(txc, blobData, writeId)) { + if (!InsertOneBlob(txc, blobData, writeId, PutBlobResult->Get()->GetBlobVerified(blobData.GetBlobRange()))) { LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); Self->IncCounter(COUNTER_WRITE_DUPLICATE); } @@ -62,6 +62,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { } TBlobManagerDb blobManagerDb(txc.DB); + AFL_VERIFY(PutBlobResult->Get()->GetActions().size() == 1); + AFL_VERIFY(PutBlobResult->Get()->GetActions().front()->GetBlobsCount() == PutBlobResult->Get()->GetBlobData().size()); for (auto&& i : PutBlobResult->Get()->GetActions()) { i->OnExecuteTxAfterWrite(*Self, blobManagerDb, true); } @@ -80,8 +82,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { } void TTxWrite::Complete(const TActorContext& ctx) { - Y_VERIFY(Result); - LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix()); + NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); + AFL_VERIFY(Result); Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds()); Self->CSCounters.OnSuccessWriteResponse(); ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release()); diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h index 2bb0adaad8a..c0b6688c306 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h @@ -15,7 +15,7 @@ public: void Complete(const TActorContext& ctx) override; TTxType GetTxType() const override { return TXTYPE_WRITE; } - bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId); + bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob); private: TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult; diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 251d734fdd6..980b9938ad0 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -27,8 +27,9 @@ bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* ds bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) { auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select(); - if (!rowset.IsReady()) + if (!rowset.IsReady()) { return false; + } while (!rowset.EndOfSet()) { EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>(); @@ -52,7 +53,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG if (metaStr) { Y_VERIFY(meta.ParseFromString(metaStr)); } - TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot); + TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot, {}); switch (recType) { case EInsertTableIds::Inserted: @@ -66,8 +67,9 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG break; } - if (!rowset.Next()) + if (!rowset.Next()) { return false; + } } return true; } diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.cpp b/ydb/core/tx/columnshard/engines/insert_table/data.cpp index 2830fe24c17..18f41acfdc9 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/data.cpp @@ -1,5 +1,57 @@ #include "data.h" +#include <library/cpp/actors/core/log.h> namespace NKikimr::NOlap { +namespace { + +class TInsertTableCacheController { +private: + TAtomicCounter BlobsCacheSize = 0; + const i64 BlobsCacheLimit = (i64)1 << 30; +public: + void Return(const ui64 size) { + const i64 val = BlobsCacheSize.Sub(size); + AFL_VERIFY(val >= 0)("size", size)("val", val); + } + + bool Take(const ui64 size) { + if (BlobsCacheSize.Add(size) <= BlobsCacheLimit) { + return true; + } + const i64 val = BlobsCacheSize.Sub(size); + AFL_VERIFY(val >= 0)("size", size)("val", val); + return false; + } +}; + +} + +TInsertedData::TBlobStorageGuard::~TBlobStorageGuard() { + Singleton<TInsertTableCacheController>()->Return(Data.size()); +} + +TInsertedData::~TInsertedData() { +} + +TInsertedData::TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, + const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, + const std::optional<TString>& blobData /*= {}*/) + : Meta(proto) + , BlobRange(blobRange) + , PlanStep(planStep) + , WriteTxId(writeTxId) + , PathId(pathId) + , DedupId(dedupId) + , SchemaVersion(schemaVersion) +{ + if (blobData) { + AFL_VERIFY(blobData->size() == BlobRange.Size); + if (Singleton<TInsertTableCacheController>()->Take(blobData->size())) { + BlobDataGuard = std::make_shared<TBlobStorageGuard>(*blobData); + } + } + Y_VERIFY(SchemaVersion.Valid()); +} + } diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h index 327d323e063..d891fbca17e 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.h +++ b/ydb/core/tx/columnshard/engines/insert_table/data.h @@ -10,43 +10,58 @@ struct TInsertedData { private: TInsertedDataMeta Meta; YDB_READONLY_DEF(TBlobRange, BlobRange); + class TBlobStorageGuard { + private: + YDB_READONLY_DEF(TString, Data); + public: + TBlobStorageGuard(const TString& data) + : Data(data) + { -public: - const TInsertedDataMeta& GetMeta() const { - return Meta; - } + } + ~TBlobStorageGuard(); + }; + std::shared_ptr<TBlobStorageGuard> BlobDataGuard; +public: ui64 PlanStep = 0; ui64 WriteTxId = 0; ui64 PathId = 0; TString DedupId; +private: + TSnapshot SchemaVersion = TSnapshot::Zero(); +public: + std::optional<TString> GetBlobData() const { + if (BlobDataGuard) { + return BlobDataGuard->GetData(); + } else { + return {}; + } + } + + const TInsertedDataMeta& GetMeta() const { + return Meta; + } TInsertedData() = delete; // avoid invalid TInsertedData anywhere TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) - : Meta(proto) - , BlobRange(blobRange) - , PlanStep(planStep) - , WriteTxId(writeTxId) - , PathId(pathId) - , DedupId(dedupId) - , SchemaVersion(schemaVersion) { - Y_VERIFY(SchemaVersion.Valid()); - } + const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData); TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) - : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion) + const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData) + : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion, blobData) {} TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) - : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion) + const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData) + : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion, blobData) { } + ~TInsertedData(); + bool operator < (const TInsertedData& key) const { if (PlanStep < key.PlanStep) { return true; @@ -109,8 +124,6 @@ public: ui32 BlobSize() const { return BlobRange.GetBlobSize(); } -private: - TSnapshot SchemaVersion = TSnapshot::Zero(); }; class TCommittedBlob { diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index e0dc9cf4e2c..b14b5b6a18a 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -54,16 +54,16 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { TSnapshot indexSnapshot(1, 1); // insert, not commited - bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot)); + bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot, {})); UNIT_ASSERT(ok); // insert the same blobId1 again - ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot)); + ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot, {})); UNIT_ASSERT(!ok); // insert different blodId with the same writeId and dedupId TUnifiedBlobId blobId2(2222, 1, 2, 100, 1); - ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId2, {}, indexSnapshot)); + ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId2, {}, indexSnapshot, {})); UNIT_ASSERT(!ok); // read nothing diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 07d2cdfe9f4..ada2883e5f0 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -442,8 +442,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { engine.Load(db, lostBlobs); std::vector<TInsertedData> dataToIndex = { - TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot), - TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot) + TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot, {}), + TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot, {}) }; // write @@ -538,7 +538,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -634,7 +634,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -663,7 +663,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -703,7 +703,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); |