author     chertus <azuikov@ydb.tech>                        2022-09-16 17:33:24 +0300
committer  chertus <azuikov@ydb.tech>                        2022-09-16 17:33:24 +0300
commit     863f8001a8c5bdf25eb95edd587063f5deb93f8f (patch)
tree       dc75dde0be3cf05f5cd3d4b01aa114d30e9ac9d9
parent     3bae125f8c3520947187a60ce193b839064aaf1b (diff)
download   ydb-863f8001a8c5bdf25eb95edd587063f5deb93f8f.tar.gz
add cache for RecordBatches from InsertTable
19 files changed, 201 insertions, 23 deletions
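The commit adds a two-stage in-memory cache of Arrow RecordBatches alongside InsertTable: a batch written by TTxWrite is cached under its TWriteId ("inserted"), re-keyed by blob id when TTxProgressTx commits the transaction ("committed"), and dropped when the long-tx write is aborted or when TTxWriteIndex moves the blob into the index; readers can then serve committed batches from memory instead of fetching and deserializing the blob. The sketch below is a minimal standalone illustration of that lifecycle, assuming plain std:: containers and integer ids in place of TLRUCache, TWriteId, TUnifiedBlobId, and arrow::RecordBatch; the LRU eviction and entry-count limits of the real TBatchCache are omitted for brevity.

```cpp
// Simplified model of the TBatchCache lifecycle from this commit.
// Stand-ins (assumptions, not the shipped types): uint64_t keys for
// TWriteId/TUnifiedBlobId, std::string payload for arrow::RecordBatch,
// std::map instead of TLRUCache (no eviction, no count limits).
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <utility>

using WriteId = uint64_t;                    // stand-in for TWriteId
using BlobId = uint64_t;                     // stand-in for TUnifiedBlobId
using Batch = std::shared_ptr<std::string>;  // stand-in for arrow::RecordBatch

class BatchCache {
public:
    static constexpr uint64_t MaxTotalSize = 2ull << 20;  // illustrative byte cap

    // Write path (TTxWrite): remember the just-written batch under its write id.
    void Insert(WriteId writeId, BlobId blobId, uint64_t blobSize, Batch batch) {
        if (Bytes() + blobSize > MaxTotalSize) {
            return;  // cache full: a later read just falls back to blob storage
        }
        insertedBytes += blobSize;
        inserted[writeId] = {blobId, blobSize, std::move(batch)};
    }

    // Commit path (TTxProgressTx): the write became visible, so re-key the
    // batch by blob id, which is how readers address committed data.
    void Commit(WriteId writeId) {
        auto it = inserted.find(writeId);
        if (it == inserted.end()) {
            return;
        }
        auto [blobId, size, batch] = std::move(it->second);
        insertedBytes -= size;
        committedBytes += size;
        committed[blobId] = {size, std::move(batch)};
        inserted.erase(it);
    }

    // Abort path (TTxWrite aborting old writes): drop an uncommitted entry.
    void EraseInserted(WriteId writeId) {
        if (auto it = inserted.find(writeId); it != inserted.end()) {
            insertedBytes -= std::get<1>(it->second);
            inserted.erase(it);
        }
    }

    // Indexation path (TTxWriteIndex): the blob moved into the index.
    void EraseCommitted(BlobId blobId) {
        if (auto it = committed.find(blobId); it != committed.end()) {
            committedBytes -= it->second.first;
            committed.erase(it);
        }
    }

    // Read path: a hit lets the reader skip the blob fetch entirely.
    Batch Get(BlobId blobId) const {
        auto it = committed.find(blobId);
        return it == committed.end() ? nullptr : it->second.second;
    }

    uint64_t Bytes() const { return insertedBytes + committedBytes; }

private:
    std::map<WriteId, std::tuple<BlobId, uint64_t, Batch>> inserted;
    std::map<BlobId, std::pair<uint64_t, Batch>> committed;
    uint64_t insertedBytes = 0;
    uint64_t committedBytes = 0;
};

int main() {
    BatchCache cache;
    cache.Insert(/*writeId=*/1, /*blobId=*/100, /*blobSize=*/64,
                 std::make_shared<std::string>("rows"));
    cache.Commit(1);                                   // CacheInserted -> CacheCommitted
    std::cout << (cache.Get(100) != nullptr) << "\n";  // read-path hit: prints 1
    cache.EraseCommitted(100);                         // blob got indexed
    std::cout << (cache.Get(100) != nullptr) << "\n";  // prints 0
}
```

A miss is always safe here: Get() returning null simply sends the reader down the pre-existing blob-read path, which is why Insert() may silently drop a batch once the byte cap is reached.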
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index c83b3429ae..388c460413 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -390,7 +390,25 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
     return arrow::RecordBatch::Make(dstSchema, srcBatch->num_rows(), columns);
 }
 
+std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+                                                          const arrow::FieldVector& fieldsToExtract) {
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    fields.reserve(fieldsToExtract.size());
+    std::vector<std::shared_ptr<arrow::Array>> columns;
+    columns.reserve(fieldsToExtract.size());
+
+    auto srcSchema = srcBatch->schema();
+    for (auto& fldToExtract : fieldsToExtract) {
+        auto& name = fldToExtract->name();
+        auto field = srcSchema->GetFieldByName(name);
+        if (field && field->type()->Equals(fldToExtract->type())) {
+            fields.push_back(field);
+            columns.push_back(srcBatch->GetColumnByName(name));
+        }
+    }
+    return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), srcBatch->num_rows(), columns);
+}
 
 std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
     auto res = arrow::Table::FromRecordBatches(batches);
@@ -568,6 +586,15 @@ bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch,
     }
 }
 
+bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& schema) {
+    for (auto& field : schema->fields()) {
+        if (batch->schema()->GetFieldIndex(field->name()) < 0) {
+            return false;
+        }
+    }
+    return true;
+}
+
 std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema,
                                                                size_t reserve) {
     std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders;
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 2f6406b68e..bfe8fb88a3 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -69,6 +69,14 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
 std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
                                                    const std::shared_ptr<arrow::Schema>& dstSchema,
                                                    bool addNotExisted = false);
+std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+                                                          const arrow::FieldVector& fields);
+inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+                                                                 const std::shared_ptr<arrow::Schema>& dstSchema)
+{
+    return ExtractExistedColumns(srcBatch, dstSchema->fields());
+}
+
 std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
 std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable);
 std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
@@ -114,6 +122,7 @@ bool IsSorted(const std::shared_ptr<arrow::RecordBatch>& batch,
 bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch,
                        const std::shared_ptr<arrow::Schema>& sortingKey,
                        bool desc = false);
+bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& schema);
 
 template <typename TArr>
 std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>& batch, int pos) {
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index dcd41a0c3e..f0f3d56e49 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -243,6 +243,7 @@ ui64 TColumnShard::MemoryUsage() const {
     if (PrimaryIndex) {
         memory += PrimaryIndex->MemoryUsage();
     }
+    memory += BatchCache.Bytes();
     return memory;
 }
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index cb6ca9a92b..cbfb482f74 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -209,6 +209,7 @@ struct TEvColumnShard {
         NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN;
         NColumnShard::TUnifiedBlobId BlobId;
+        std::shared_ptr<arrow::RecordBatch> WrittenBatch;
         NColumnShard::TBlobBatch BlobBatch;
         NColumnShard::TUsage ResourceUsage;
         TVector<ui32> YellowMoveChannels;
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index b2ca6c986f..819809a15b 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -53,6 +53,16 @@ public:
             GranuleBlobs[granule].insert(blobId);
         }
 
+        // Add cached batches without read
+        for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
+            auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
+            Y_VERIFY(!cmt.empty());
+
+            const NOlap::TCommittedBlob& cmtBlob = cmt.key();
+            ui32 batchNo = cmt.mapped();
+            IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId);
+        }
+
         // Read all committed blobs
         for (const auto& cmtBlob : ReadMetadata->CommittedBlobs) {
             auto& blobId = cmtBlob.BlobId;
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index baae719ab5..28d396b613 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -84,6 +84,12 @@ public:
             TBlobGroupSelector dsGroupSelector(Self->Info());
             NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
+
+            // CacheInserted -> CacheCommitted
+            for (auto& writeId : meta.WriteIds) {
+                Self->BatchCache.Commit(writeId);
+            }
+
             auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds);
             Self->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
             Self->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index e0f8183905..f6d08e7f32 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -41,6 +41,7 @@ std::shared_ptr<NOlap::TReadMetadata>
 TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read,
                                  const std::unique_ptr<NOlap::TInsertTable>& insertTable,
                                  const std::unique_ptr<NOlap::IColumnEngine>& index,
+                                 const TBatchCache& batchCache,
                                  TString& error) const {
     Y_UNUSED(ctx);
@@ -85,6 +86,11 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescriptio
     // insert table
     out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId);
+    for (auto& cmt : out.CommittedBlobs) {
+        if (auto batch = batchCache.Get(cmt.BlobId)) {
+            out.CommittedBatches.emplace(cmt.BlobId, batch);
+        }
+    }
 
     // index
@@ -220,7 +226,7 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP
     if (!ssaProgramSteps->Program.empty() && Self->PrimaryIndex) {
         ssaProgramSteps->Program = NKikimr::NSsaOptimizer::OptimizeProgram(ssaProgramSteps->Program, Self->PrimaryIndex->GetIndexInfo());
     }
-
+
     read.Program = ssaProgramSteps->Program;
     read.ProgramSourceColumns = ssaProgramSteps->ProgramSourceColumns;
     return true;
@@ -269,7 +275,8 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
 
     std::shared_ptr<NOlap::TReadMetadata> metadata;
     if (parseResult) {
-        metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription);
+        metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
+                                       ErrorDescription);
     }
 
     ui32 status = NKikimrTxColumnShard::EResultStatus::ERROR;
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 93fd867343..7ab2a7d7ae 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -583,7 +583,8 @@ std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TAct
     if (indexStats) {
         metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription);
     } else {
-        metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription);
+        metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
+                                       ErrorDescription);
     }
 
     if (!metadata) {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 24b624721a..7fff1e61f7 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -62,7 +62,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
         // First write wins
         TBlobGroupSelector dsGroupSelector(Self->Info());
         NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
-        ok = Self->InsertTable->Insert(dbTable, NOlap::TInsertedData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time));
+
+        NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time);
+        ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
         if (ok) {
             auto writesToAbort = Self->InsertTable->OldWritesToAbort(time);
             std::vector<TWriteId> failedAborts;
@@ -70,6 +72,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
                 if (!Self->RemoveLongTxWrite(db, writeId)) {
                     failedAborts.push_back(writeId);
                 }
+                Self->BatchCache.EraseInserted(TWriteId(writeId));
             }
             for (auto& writeId : failedAborts) {
                 writesToAbort.erase(writeId);
@@ -87,10 +90,14 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
                 Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb);
             }
 
-            // Put new data into cache
+            // Put new data into blob cache
             Y_VERIFY(logoBlobId.BlobSize() == data.size());
             NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data);
 
+            // Put new data into batch cache
+            Y_VERIFY(Ev->Get()->WrittenBatch);
+            Self->BatchCache.Insert(TWriteId(writeId), logoBlobId, Ev->Get()->WrittenBatch);
+
             Self->UpdateInsertTableCounters();
 
             ui64 blobsWritten = Ev->Get()->BlobBatch.GetBlobCount();
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index e023368504..f18e101b94 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -64,6 +64,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
         for (const auto& cmtd : changes->DataToIndex) {
             Self->InsertTable->EraseCommitted(dbWrap, cmtd);
             Self->BlobManager->DeleteBlob(cmtd.BlobId, blobManagerDb);
+            Self->BatchCache.EraseCommitted(cmtd.BlobId);
         }
         if (!changes->DataToIndex.empty()) {
             Self->UpdateInsertTableCounters();
diff --git a/ydb/core/tx/columnshard/columnshard_common.h b/ydb/core/tx/columnshard/columnshard_common.h
index 80a9e4599c..a1a0cd25d8 100644
--- a/ydb/core/tx/columnshard/columnshard_common.h
+++ b/ydb/core/tx/columnshard/columnshard_common.h
@@ -1,8 +1,10 @@
 #pragma once
+#include "defs.h"
 #include <ydb/core/formats/arrow_helpers.h>
 #include <ydb/core/scheme/scheme_tabledefs.h>
 #include <ydb/core/protos/ssa.pb.h>
 #include <ydb/core/tx/columnshard/engines/predicate.h>
+#include <library/cpp/cache/cache.h>
 
 namespace NKikimr::NOlap {
 struct TIndexInfo;
@@ -51,4 +53,72 @@ struct TReadDescription {
     std::shared_ptr<NArrow::TSsaProgramSteps> AddProgram(const IColumnResolver& columnResolver,
                                                          const NKikimrSSA::TProgram& program);
 };
 
+class TBatchCache {
+public:
+    using TUnifiedBlobId = NOlap::TUnifiedBlobId;
+
+    static constexpr ui32 MAX_COMMITTED_COUNT = 2 * TLimits::MIN_SMALL_BLOBS_TO_INSERT;
+    static constexpr ui32 MAX_INSERTED_COUNT = 2 * TLimits::MIN_SMALL_BLOBS_TO_INSERT;
+    static constexpr ui64 MAX_TOTAL_SIZE = 2 * TLimits::MIN_BYTES_TO_INSERT;
+
+    TBatchCache()
+        : Inserted(MAX_INSERTED_COUNT)
+        , Committed(MAX_COMMITTED_COUNT)
+    {}
+
+    void Insert(TWriteId writeId, const TUnifiedBlobId& blobId, std::shared_ptr<arrow::RecordBatch>& batch) {
+        if (Bytes() + blobId.BlobSize() > MAX_TOTAL_SIZE) {
+            return;
+        }
+        InsertedBytes += blobId.BlobSize();
+
+        Inserted.Insert(writeId, {blobId, batch});
+    }
+
+    void Commit(TWriteId writeId) {
+        auto it = Inserted.FindWithoutPromote(writeId);
+        if (it != Inserted.End()) {
+            auto& blobId = it->first;
+            InsertedBytes -= blobId.BlobSize();
+            CommittedBytes += blobId.BlobSize();
+
+            Committed.Insert(blobId, it->second);
+            Inserted.Erase(it);
+        }
+    }
+
+    void EraseInserted(TWriteId writeId) {
+        auto it = Inserted.FindWithoutPromote(writeId);
+        if (it != Inserted.End()) {
+            InsertedBytes -= (*it).first.BlobSize();
+            Inserted.Erase(it);
+        }
+    }
+
+    void EraseCommitted(const TUnifiedBlobId& blobId) {
+        auto it = Committed.FindWithoutPromote(blobId);
+        if (it != Committed.End()) {
+            CommittedBytes -= blobId.BlobSize();
+            Committed.Erase(it);
+        }
+    }
+
+    std::shared_ptr<arrow::RecordBatch> Get(const TUnifiedBlobId& blobId) const {
+        auto it = Committed.Find(blobId);
+        if (it != Committed.End()) {
+            return *it;
+        }
+        return {};
+    }
+
+    ui64 Bytes() const {
+        return InsertedBytes + CommittedBytes;
+    }
+
+private:
+    TLRUCache<TWriteId, std::pair<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>> Inserted;
+    mutable TLRUCache<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> Committed;
+    ui64 InsertedBytes{0};
+    ui64 CommittedBytes{0};
+};
+
 }
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index d469f0bc59..a891c7f169 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -603,10 +603,16 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
     }
 
     if (bytesToIndex < (ui64)Limits.MinInsertBytes && blobs < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
-        LOG_S_DEBUG("Indexing not started: less data (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
-            << ignored << ") then MIN_BYTES_TO_INSERT / MIN_SMALL_BLOBS_TO_INSERT at tablet " << TabletID());
-        return {};
+        LOG_S_DEBUG("Not enough data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
+            << ignored << ") at tablet " << TabletID());
+
+        // Force small indexations sometimes to keep BatchCache smaller
+        if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) {
+            ++SkippedIndexations;
+            return {};
+        }
     }
+    SkippedIndexations = 0;
 
     LOG_S_DEBUG("Prepare indexing " << bytesToIndex << " bytes in " << dataToIndex.size() << " batches of committed "
                 << size << " bytes in " << blobs << " blobs ignored " << ignored
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index b7e49660e9..f791eee354 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -40,6 +40,8 @@ IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tier
 #endif
 
 struct TSettings {
+    static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16;
+
     TControlWrapper BlobWriteGrouppingEnabled;
     TControlWrapper CacheDataAfterIndexing;
     TControlWrapper CacheDataAfterCompaction;
@@ -338,6 +340,7 @@ private:
     ui64 StorePathId = 0;
     ui64 StatsReportRound = 0;
     ui64 BackgroundActivation = 0;
+    ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init
 
     TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
     bool MediatorTimeCastRegistered = false;
@@ -360,6 +363,7 @@ private:
     std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
     std::unique_ptr<NOlap::TInsertTable> InsertTable;
     std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
+    TBatchCache BatchCache;
     THashMap<TString, TTierConfig> TierConfigs;
     THashSet<NOlap::TUnifiedBlobId> DelayedForgetBlobs;
     TTtl Ttl;
diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h
index 4616ae298a..02594bb6d0 100644
--- a/ydb/core/tx/columnshard/columnshard_txs.h
+++ b/ydb/core/tx/columnshard/columnshard_txs.h
@@ -265,6 +265,7 @@ protected:
                                                               const TReadDescription& readDescription,
                                                               const std::unique_ptr<NOlap::TInsertTable>& insertTable,
                                                               const std::unique_ptr<NOlap::IColumnEngine>& index,
+                                                              const TBatchCache& batchCache,
                                                               TString& error) const;
 
 protected:
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index d2693d0e7c..bbc8823f09 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -270,20 +270,22 @@ void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
 }
 
 std::shared_ptr<arrow::RecordBatch>
-TIndexedReadData::MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 txId) const {
-    auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->BlobSchema);
-    Y_VERIFY(batch);
+TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+                                      ui64 planStep, ui64 txId) const {
+    Y_VERIFY(srcBatch);
 
-    batch = TIndexInfo::AddSpecialColumns(batch, planStep, txId);
-    Y_VERIFY(batch);
+    // Extract columns (without check), filter, attach snapshot, extract columns with check
+    // (do not filter snapshot columns)
 
-    batch = NArrow::ExtractColumns(batch, ReadMetadata->LoadSchema);
+    auto batch = NArrow::ExtractExistedColumns(srcBatch, ReadMetadata->LoadSchema);
     Y_VERIFY(batch);
 
     { // Apply predicate
         // TODO: Extract this info function
         std::vector<bool> less;
         if (ReadMetadata->LessPredicate) {
+            Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->LessPredicate->Batch->schema()));
+
             auto cmpType = ReadMetadata->LessPredicate->Inclusive ?
                 NArrow::ECompareType::LESS_OR_EQUAL : NArrow::ECompareType::LESS;
             less = NArrow::MakePredicateFilter(batch, ReadMetadata->LessPredicate->Batch, cmpType);
@@ -291,6 +293,8 @@ TIndexedReadData::MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 t
 
         std::vector<bool> greater;
         if (ReadMetadata->GreaterPredicate) {
+            Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->GreaterPredicate->Batch->schema()));
+
             auto cmpType = ReadMetadata->GreaterPredicate->Inclusive ?
                 NArrow::ECompareType::GREATER_OR_EQUAL : NArrow::ECompareType::GREATER;
             greater = NArrow::MakePredicateFilter(batch, ReadMetadata->GreaterPredicate->Batch, cmpType);
@@ -305,6 +309,10 @@ TIndexedReadData::MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 t
         }
     }
 
+    batch = TIndexInfo::AddSpecialColumns(batch, planStep, txId);
+    Y_VERIFY(batch);
+
+    batch = NArrow::ExtractColumns(batch, ReadMetadata->LoadSchema);
     Y_VERIFY(batch);
     return batch;
 }
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 734e152144..0c7818bbba 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -81,6 +81,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
     ui64 TxId = 0;
     std::shared_ptr<TSelectInfo> SelectInfo;
     std::vector<TCommittedBlob> CommittedBlobs;
+    THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches;
     std::shared_ptr<TReadStats> ReadStats;
 
     TReadMetadata(const TIndexInfo& info)
@@ -194,12 +195,17 @@ public:
     /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK)
     TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
 
-    void AddNotIndexed(ui32 batchNo, TString serializedBach, ui64 planStep, ui64 txId) {
+    void AddNotIndexed(ui32 batchNo, TString blob, ui64 planStep, ui64 txId) {
+        auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->BlobSchema);
+        AddNotIndexed(batchNo, batch, planStep, txId);
+    }
+
+    void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) {
         Y_VERIFY(batchNo < NotIndexed.size());
         if (!NotIndexed[batchNo]) {
             ++ReadyNotIndexed;
         }
-        NotIndexed[batchNo] = MakeNotIndexedBatch(serializedBach, planStep, txId);
+        NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId);
     }
 
     void AddIndexed(const TBlobRange& blobRange, const TString& column);
@@ -242,7 +248,8 @@ private:
         return PortionGranule.find(portion)->second;
     }
 
-    std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 txId) const;
+    std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
+        const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
 
     std::shared_ptr<arrow::RecordBatch> AssembleIndexedBatch(ui32 batchNo);
     void UpdateGranuleWaits(ui32 batchNo);
     THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SplitByGranules(
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index ab49a66f59..607d770a54 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -143,6 +143,7 @@ public:
         ui32 notIndexed = 0;
         for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++notIndexed) {
             const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
+            CommittedBlobs.emplace(cmtBlob.BlobId);
             WaitCommitted.emplace(cmtBlob, notIndexed);
         }
@@ -152,8 +153,19 @@
             WaitIndexed.insert(blobRange);
         }
 
-        LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, " << WaitCommitted.size()
-            << " committed blobs) at tablet " << TabletId);
+        // Add cached batches without read
+        for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
+            auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
+            Y_VERIFY(!cmt.empty());
+
+            const NOlap::TCommittedBlob& cmtBlob = cmt.key();
+            ui32 batchNo = cmt.mapped();
+            IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId);
+        }
+
+        LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, "
+            << ReadMetadata->CommittedBlobs.size() << " committed, "
+            << WaitCommitted.size() << " not cached committed) at tablet " << TabletId);
 
         bool earlyExit = false;
         if (Deadline != TInstant::Max()) {
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 7d5f9b2e51..2a28f40711 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -698,11 +698,11 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeId>>& ydb
 
     if (ydbSchema == TTestSchema::YdbSchema()) {
         if (codec == "" || codec == "lz4") {
-            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes(), 5054176);
+            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50);
         } else if (codec == "none") {
-            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes(), 7557872);
+            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75);
         } else if (codec == "zstd") {
-            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes(), 2636440);
+            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26);
         } else {
             UNIT_ASSERT(false);
         }
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index ec78578324..6558c08c3d 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -119,7 +119,7 @@ public:
 
         // Heavy operations inside. We cannot run them in tablet event handler.
         TString strError;
-        std::shared_ptr<arrow::RecordBatch> batch;
+        std::shared_ptr<arrow::RecordBatch>& batch = WriteEv->WrittenBatch;
         {
             TCpuGuard guard(ResourceUsage);
             batch = IndexInfo.PrepareForInsert(srcData, meta, strError);
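The read-path change in indexed_read_data.cpp reorders MakeNotIndexedBatch: a cached batch is first projected with the new ExtractExistedColumns, which silently skips requested fields the batch does not carry yet (in particular the snapshot columns), then filtered by the key predicates, and only then extended via AddSpecialColumns and strictly projected with ExtractColumns. The sketch below is a standalone illustration of that permissive-projection semantics against the stock Arrow C++ API; the "_yql_plan_step" column name is illustrative, not taken from this patch.

```cpp
// Sketch of ExtractExistedColumns semantics: project a RecordBatch onto a
// target field list, keeping only fields that exist with a matching type,
// instead of failing or materializing missing columns.
#include <arrow/api.h>
#include <iostream>
#include <memory>
#include <vector>

std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(
        const std::shared_ptr<arrow::RecordBatch>& src,
        const arrow::FieldVector& wanted) {
    arrow::FieldVector fields;
    std::vector<std::shared_ptr<arrow::Array>> columns;
    for (const auto& want : wanted) {
        auto have = src->schema()->GetFieldByName(want->name());
        if (have && have->type()->Equals(want->type())) {
            fields.push_back(have);
            columns.push_back(src->GetColumnByName(want->name()));
        }
    }
    return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields),
                                    src->num_rows(), columns);
}

int main() {
    // Build a one-column batch standing in for a cached insert batch.
    arrow::Int64Builder builder;
    std::shared_ptr<arrow::Array> key;
    (void)builder.AppendValues({1, 2, 3});
    (void)builder.Finish(&key);
    auto batch = arrow::RecordBatch::Make(
        arrow::schema({arrow::field("key", arrow::int64())}), 3, {key});

    // The target schema also asks for a snapshot column the cached batch
    // lacks (it is attached later by AddSpecialColumns); the projection
    // keeps only what exists.
    arrow::FieldVector wanted = {
        arrow::field("key", arrow::int64()),
        arrow::field("_yql_plan_step", arrow::uint64()),  // hypothetical name
    };
    auto projected = ExtractExistedColumns(batch, wanted);
    std::cout << projected->schema()->ToString() << "\n";  // prints only "key"
}
```

This permissiveness is what lets the same filtering code run on cached batches that predate the snapshot columns; the final strict ExtractColumns pass in MakeNotIndexedBatch then enforces the full LoadSchema once those columns have been attached.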