diff options
author | nsofya <nsofya@yandex-team.com> | 2023-09-07 12:20:06 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-09-07 13:35:50 +0300 |
commit | f51f48a9153ef1341700ad8ba322f01d60368b60 (patch) | |
tree | f832df05da7730e3885920c53985dc7ed2202956 | |
parent | b0c000d48fba6fcbd7f69bcbe94290623a172fa7 (diff) | |
download | ydb-f51f48a9153ef1341700ad8ba322f01d60368b60.tar.gz |
Use TBlobRange in InsertedTable
19 files changed, 105 insertions, 83 deletions
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 92d7a9f440..0bbd77f9ab 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -1,6 +1,7 @@ #pragma once #include <library/cpp/actors/core/event.h> +#include <library/cpp/actors/core/interconnect.h> #include <ydb/core/protos/blob_depot.pb.h> namespace NKikimr { diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h index 0ad18d5754..f92e156878 100644 --- a/ydb/core/tx/columnshard/blob.h +++ b/ydb/core/tx/columnshard/blob.h @@ -284,11 +284,28 @@ struct TBlobRange { ui32 Offset; ui32 Size; + const TUnifiedBlobId& GetBlobId() const { + return BlobId; + } + + ui32 GetBlobSize() const { + return Size; + } + + bool IsFullBlob() const { + return Size == BlobId.BlobSize(); + } + explicit TBlobRange(const TUnifiedBlobId& blobId = TUnifiedBlobId(), ui32 offset = 0, ui32 size = 0) : BlobId(blobId) , Offset(offset) , Size(size) - {} + { + if (Size > 0) { + Y_VERIFY(Offset < BlobId.BlobSize()); + Y_VERIFY(Offset + Size <= BlobId.BlobSize()); + } + } bool operator == (const TBlobRange& other) const { return diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index f55648f30f..256f9358b5 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -1,11 +1,10 @@ #pragma once #include "defs.h" -#include "blob_manager.h" +#include "blob.h" #include <ydb/core/tx/tx.h> #include <ydb/core/tx/message_seqno.h> #include <ydb/core/protos/tx_columnshard.pb.h> -#include <ydb/core/tx/columnshard/engines/writer/write_controller.h> #include <ydb/core/tx/ev_write/write_data.h> #include <ydb/core/tx/long_tx_service/public/types.h> @@ -16,7 +15,6 @@ namespace NKikimr { namespace NColumnShard { -class TBlobGroupSelector; inline Ydb::StatusIds::StatusCode ConvertToYdbStatus(NKikimrTxColumnShard::EResultStatus columnShardStatus) { switch (columnShardStatus) { @@ -211,7 +209,7 @@ struct TEvColumnShard { BlobRanges.reserve(Record.BlobRangesSize()); for (const auto& range : Record.GetBlobRanges()) { - auto blobId = NColumnShard::TUnifiedBlobId::ParseFromString(range.GetBlobId(), dsGroupSelector, + auto blobId = NOlap::TUnifiedBlobId::ParseFromString(range.GetBlobId(), dsGroupSelector, errString); if (!errString.empty()) { return; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 0e67077dd0..30d214fb4d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -44,8 +44,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta(); - const auto& logoBlobId = blobData.GetBlobId(); - Y_VERIFY(logoBlobId.IsValid()); + const auto& blobRange = blobData.GetBlobRange(); + Y_VERIFY(blobRange.GetBlobId().IsValid()); ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds(); TInstant time = TInstant::Seconds(writeUnixTime); @@ -58,7 +58,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion()); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, tableSchema->GetSnapshot()); + NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot()); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); @@ -71,24 +71,15 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle) for (auto& [abortedWriteId, abortedData] : allAborted) { Self->InsertTable->EraseAborted(dbTable, abortedData); - Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb); + Y_VERIFY(blobRange.IsFullBlob()); + Self->BlobManager->DeleteBlob(abortedData.GetBlobRange().GetBlobId(), blobManagerDb); } // Put new data into blob cache - Y_VERIFY(logoBlobId.BlobSize() == data.size()); - NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data); + Y_VERIFY(blobRange.IsFullBlob()); + NBlobCache::AddRangeToCache(blobRange, blobData.GetBlobData()); Self->UpdateInsertTableCounters(); - - const auto& blobBatch(PutBlobResult->Get()->GetPutResult().GetBlobBatch()); - ui64 blobsWritten = blobBatch.GetBlobCount(); - ui64 bytesWritten = blobBatch.GetTotalSize(); - Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); - Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); - Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, meta.GetRawBytes()); - Self->IncCounter(COUNTER_WRITE_SUCCESS); - - Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb); return true; } return false; @@ -112,6 +103,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { } TVector<TWriteId> writeIds; + ui64 insertedBytes = 0; for (auto blobData : PutBlobResult->Get()->GetBlobData()) { auto writeId = TWriteId(writeMeta.GetWriteId()); if (operation) { @@ -121,13 +113,28 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); } - if (!InsertOneBlob(txc, blobData, writeId)) { + if (InsertOneBlob(txc, blobData, writeId)) { + insertedBytes += blobData.GetLogicalMeta().GetRawBytes(); + } else { LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); Self->IncCounter(COUNTER_WRITE_DUPLICATE); } writeIds.push_back(writeId); } + if (insertedBytes > 0) { + TBlobManagerDb blobManagerDb(txc.DB); + const auto& blobBatch(PutBlobResult->Get()->GetPutResult().GetBlobBatch()); + ui64 blobsWritten = blobBatch.GetBlobCount(); + ui64 bytesWritten = blobBatch.GetTotalSize(); + Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); + Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); + Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, insertedBytes); + Self->IncCounter(COUNTER_WRITE_SUCCESS); + + Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb); + } + if (operation) { operation->OnWriteFinish(txc, writeIds); auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index fe8170efcc..089b8dae24 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -693,7 +693,6 @@ void TColumnShard::SetupIndexation() { << " at tablet " << TabletID()); std::vector<NOlap::TInsertedData> data; - THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> cachedBlobs; data.reserve(dataToIndex.size()); for (auto& ptr : dataToIndex) { data.push_back(*ptr); @@ -709,7 +708,7 @@ void TColumnShard::SetupIndexation() { auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); indexChanges->Start(*this); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, - Settings.CacheDataAfterIndexing, std::move(cachedBlobs)); + Settings.CacheDataAfterIndexing); ActorContext().Send(IndexingActor, std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev))); } diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index c5601a3883..4dddca7b53 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -32,7 +32,6 @@ struct TEvPrivate { struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> { NOlap::TVersionedIndex IndexInfo; std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges; - THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs; bool GranuleCompaction{false}; TBlobBatch BlobBatch; TUsage ResourceUsage; @@ -42,11 +41,9 @@ struct TEvPrivate { TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo, std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges, - bool cacheData, - THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>&& cachedBlobs = {}) + bool cacheData) : IndexInfo(std::move(indexInfo)) , IndexChanges(indexChanges) - , CachedBlobs(std::move(cachedBlobs)) , CacheData(cacheData) { PutResult = std::make_shared<TBlobPutResult>(NKikimrProto::UNKNOWN); @@ -252,7 +249,7 @@ struct TEvPrivate { class TEvWriteBlobsResult : public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> { public: class TPutBlobData { - YDB_READONLY_DEF(TUnifiedBlobId, BlobId); + YDB_READONLY_DEF(TBlobRange, BlobRange); YDB_READONLY_DEF(TString, BlobData); YDB_READONLY_DEF(NKikimrTxColumnShard::TLogicalMetadata, LogicalMeta); YDB_ACCESSOR(ui64, RowsCount, 0); @@ -260,8 +257,8 @@ struct TEvPrivate { public: TPutBlobData() = default; - TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) - : BlobId(blobId) + TPutBlobData(const TBlobRange& blobRange, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) + : BlobRange(blobRange) , BlobData(data) , RowsCount(rowsCount) , RawBytes(rawBytes) diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 5f77776fec..251d734fdd 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -52,7 +52,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG if (metaStr) { Y_VERIFY(meta.ParseFromString(metaStr)); } - TInsertedData data(planStep, writeTxId, pathId, dedupId, blobId, meta, indexSnapshot); + TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot); switch (recType) { case EInsertTableIds::Inserted: diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index d1428d5935..e96920909e 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -440,7 +440,7 @@ struct Schema : NIceDb::Schema { static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { Y_VERIFY(data.GetSchemaSnapshot().Valid()); db.Table<InsertTable>().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Update( - NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), + NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()), NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()), NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 9426d81cab..a981ac7443 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -7,16 +7,12 @@ namespace NKikimr::NOlap { -THashMap<NKikimr::NOlap::TUnifiedBlobId, std::vector<NKikimr::NOlap::TBlobRange>> TInsertColumnEngineChanges::GetGroupedBlobRanges() const { +THashMap<TUnifiedBlobId, std::vector<TBlobRange>> TInsertColumnEngineChanges::GetGroupedBlobRanges() const { THashMap<TUnifiedBlobId, std::vector<TBlobRange>> result; for (size_t i = 0; i < DataToIndex.size(); ++i) { - auto& blobId = DataToIndex[i].BlobId; - if (CachedBlobs.contains(blobId)) { - continue; - } - Y_VERIFY(!result.contains(blobId)); - std::vector<TBlobRange> blobsVector = { NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()) }; - result.emplace(blobId, std::move(blobsVector)); + const auto& indsertedData = DataToIndex[i]; + Y_VERIFY(indsertedData.GetBlobRange().IsFullBlob()); + Y_VERIFY(result.emplace(indsertedData.GetBlobRange().GetBlobId(), TVector<TBlobRange>{ indsertedData.GetBlobRange() }).second); } return result; } @@ -30,9 +26,10 @@ bool TInsertColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApp void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { TBase::DoWriteIndex(self, context); - for (const auto& cmtd : DataToIndex) { - self.InsertTable->EraseCommitted(context.DBWrapper, cmtd); - self.BlobManager->DeleteBlob(cmtd.BlobId, *context.BlobManagerDb); + for (const auto& indsertedData : DataToIndex) { + self.InsertTable->EraseCommitted(context.DBWrapper, indsertedData); + Y_VERIFY(indsertedData.GetBlobRange().IsFullBlob()); + self.BlobManager->DeleteBlob(indsertedData.GetBlobRange().GetBlobId(), *context.BlobManagerDb); } if (!DataToIndex.empty()) { self.UpdateInsertTableCounters(); @@ -87,16 +84,14 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; for (auto& inserted : DataToIndex) { - TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize()); + const TBlobRange& blobRange = inserted.GetBlobRange(); auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaSnapshot()); auto& indexInfo = blobSchema->GetIndexInfo(); Y_VERIFY(indexInfo.IsSorted()); std::shared_ptr<arrow::RecordBatch> batch; - if (auto it = CachedBlobs.find(inserted.BlobId); it != CachedBlobs.end()) { - batch = it->second; - } else if (auto* blobData = Blobs.FindPtr(blobRange)) { + if (auto* blobData = Blobs.FindPtr(blobRange)) { Y_VERIFY(!blobData->empty(), "Blob data not present"); // Prepare batch batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema()); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index 3ae8b04433..00d4e8ae3e 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -24,7 +24,6 @@ protected: public: const TMark DefaultMark; THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule} - THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs; public: TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings) : TBase(splitSettings) diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h index f812ef5944..327d323e06 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.h +++ b/ydb/core/tx/columnshard/engines/insert_table/data.h @@ -9,8 +9,9 @@ namespace NKikimr::NOlap { struct TInsertedData { private: TInsertedDataMeta Meta; -public: + YDB_READONLY_DEF(TBlobRange, BlobRange); +public: const TInsertedDataMeta& GetMeta() const { return Meta; } @@ -19,27 +20,33 @@ public: ui64 WriteTxId = 0; ui64 PathId = 0; TString DedupId; - TUnifiedBlobId BlobId; TInsertedData() = delete; // avoid invalid TInsertedData anywhere - TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, + TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) : Meta(proto) + , BlobRange(blobRange) , PlanStep(planStep) , WriteTxId(writeTxId) , PathId(pathId) , DedupId(dedupId) - , BlobId(blobId) , SchemaVersion(schemaVersion) { Y_VERIFY(SchemaVersion.Valid()); } - TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, + TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) - : TInsertedData(0, writeTxId, pathId, dedupId, blobId, proto, schemaVersion) + : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion) {} + + TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, + const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) + : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion) + { + } + bool operator < (const TInsertedData& key) const { if (PlanStep < key.PlanStep) { return true; @@ -100,7 +107,7 @@ public: return SchemaVersion; } - ui32 BlobSize() const { return BlobId.BlobSize(); } + ui32 BlobSize() const { return BlobRange.GetBlobSize(); } private: TSnapshot SchemaVersion = TSnapshot::Zero(); @@ -108,7 +115,7 @@ private: class TCommittedBlob { private: - TUnifiedBlobId BlobId; + TBlobRange BlobRange; TSnapshot CommitSnapshot; TSnapshot SchemaSnapshot; YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First); @@ -124,8 +131,8 @@ public: return *Last; } - TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last) - : BlobId(blobId) + TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last) + : BlobRange(blobRange) , CommitSnapshot(snapshot) , SchemaSnapshot(schemaSnapshot) , First(first) @@ -134,10 +141,10 @@ public: /// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only. /// So hash() and equality should depend on BlobId only. - bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; } - ui64 Hash() const noexcept { return BlobId.Hash(); } + bool operator == (const TCommittedBlob& key) const { return BlobRange == key.BlobRange; } + ui64 Hash() const noexcept { return BlobRange.Hash(); } TString DebugString() const { - return TStringBuilder() << BlobId << ";ps=" << CommitSnapshot.GetPlanStep() << ";ti=" << CommitSnapshot.GetTxId(); + return TStringBuilder() << BlobRange << ";ps=" << CommitSnapshot.GetPlanStep() << ";ti=" << CommitSnapshot.GetTxId(); } const TSnapshot& GetSnapshot() const { @@ -148,8 +155,8 @@ public: return SchemaSnapshot; } - const TUnifiedBlobId& GetBlobId() const { - return BlobId; + const TBlobRange& GetBlobRange() const { + return BlobRange; } }; diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 628dc73cad..c5d9d3714d 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -125,7 +125,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna std::vector<TCommittedBlob> result; result.reserve(ret.size()); for (auto&& i : ret) { - result.emplace_back(TCommittedBlob(i->BlobId, i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema))); + result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema))); } return result; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index 3b7ba86529..ea035e3cc0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -121,7 +121,7 @@ void TCommittedDataSource::DoFetch() { if (!ReadStarted) { Y_VERIFY(!ResultReady); ReadStarted = true; - ReadData.AddBlobForFetch(GetSourceIdx(), TBlobRange(CommittedBlob.GetBlobId(), 0, CommittedBlob.GetBlobId().BlobSize())); + ReadData.AddBlobForFetch(GetSourceIdx(), CommittedBlob.GetBlobRange()); } } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 9faa66ee63..4bf3b64841 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -147,7 +147,6 @@ public: } std::shared_ptr<TSelectInfo> SelectInfo; std::vector<TCommittedBlob> CommittedBlobs; - THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches; std::shared_ptr<TReadStats> ReadStats; const TSnapshot& GetSnapshot() const { @@ -175,7 +174,7 @@ public: } return IndexVersions.GetSchema(version); } - + ISnapshotSchema::TPtr GetLoadSchema(const std::optional<TSnapshot>& version = {}) const { if (!version) { if (!EmptyVersionSchemaCache) { diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 7d0aff6a34..6f563f38ff 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -292,6 +292,11 @@ TCompactionLimits TestLimits() { bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, std::vector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) { + + for (ui32 i = 0; i < dataToIndex.size(); ++i) { + // Commited data always has nonzero planstep (for WriteLoadRead tests) + dataToIndex[i].PlanStep = i + 1; + }; std::shared_ptr<TInsertColumnEngineChanges> changes = engine.StartInsert(std::move(dataToIndex)); if (!changes) { return false; @@ -419,8 +424,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { engine.Load(db, lostBlobs); std::vector<TInsertedData> dataToIndex = { - TInsertedData(1, 2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot), - TInsertedData(2, 1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot) + TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot), + TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot) }; // write @@ -430,7 +435,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { blobs[blobRanges[0]] = testBlob; blobs[blobRanges[1]] = testBlob; Insert(engine, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step); - + // selects auto lastSchema = engine.GetVersionedIndex().GetLastSchema(); @@ -515,7 +520,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); + TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -611,7 +616,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); + TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -640,7 +645,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -680,7 +685,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index fa1f3bbac9..acb8596d8e 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -25,7 +25,7 @@ IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext() bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { const auto& blobInfo = BlobsSplitted[CurrentIndex - 1]; - Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now()); + Owner.BlobData.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize()), blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now()); return true; } diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index c0fbf8dcc8..e1dcded8cb 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -32,7 +32,6 @@ public: Y_VERIFY(TxEvent); auto indexChanges = dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges); Y_VERIFY(indexChanges); - indexChanges->CachedBlobs = std::move(TxEvent->CachedBlobs); for (auto& [blobId, ranges] : event.GroupedBlobRanges) { Y_VERIFY(ranges.size() == 1); diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h index 3ef8ebf6f8..eca4f203d5 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.h +++ b/ydb/core/tx/columnshard/inflight_request_tracker.h @@ -1,8 +1,7 @@ #pragma once #include "blob.h" -#include "engines/reader/read_metadata.h" -#include "engines/column_engine.h" +#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h> namespace NKikimr::NColumnShard { @@ -59,8 +58,8 @@ public: } for (const auto& committedBlob : readMeta->CommittedBlobs) { - if (blobTracker.SetBlobInUse(committedBlob.GetBlobId(), false)) { - freedBlobs.emplace(committedBlob.GetBlobId()); + if (blobTracker.SetBlobInUse(committedBlob.GetBlobRange().GetBlobId(), false)) { + freedBlobs.emplace(committedBlob.GetBlobRange().GetBlobId()); } } } @@ -103,7 +102,7 @@ private: } for (const auto& committedBlob : readMeta->CommittedBlobs) { - blobTracker.SetBlobInUse(committedBlob.GetBlobId(), true); + blobTracker.SetBlobInUse(committedBlob.GetBlobRange().GetBlobId(), true); } } diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index cab7325042..8a7850081a 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -82,7 +82,7 @@ namespace NKikimr::NColumnShard { auto allAborted = owner.InsertTable->GetAborted(); for (auto& [abortedWriteId, abortedData] : allAborted) { owner.InsertTable->EraseAborted(dbTable, abortedData); - owner.BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb); + owner.BlobManager->DeleteBlob(abortedData.GetBlobRange().GetBlobId(), blobManagerDb); } } |