about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author: chertus <azuikov@ydb.tech> 2022-09-16 17:33:24 +0300
committer: chertus <azuikov@ydb.tech> 2022-09-16 17:33:24 +0300
commit: 863f8001a8c5bdf25eb95edd587063f5deb93f8f (patch)
tree: dc75dde0be3cf05f5cd3d4b01aa114d30e9ac9d9
parent: 3bae125f8c3520947187a60ce193b839064aaf1b (diff)
download: ydb-863f8001a8c5bdf25eb95edd587063f5deb93f8f.tar.gz
add cache for RecordBatches from InsertTable
-rw-r--r-- ydb/core/formats/arrow_helpers.cpp 27
-rw-r--r-- ydb/core/formats/arrow_helpers.h 9
-rw-r--r-- ydb/core/tx/columnshard/columnshard.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard.h 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard__index_scan.h 10
-rw-r--r-- ydb/core/tx/columnshard/columnshard__progress_tx.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard__read.cpp 11
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.cpp 3
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write.cpp 11
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write_index.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard_common.h 70
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 12
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 4
-rw-r--r-- ydb/core/tx/columnshard/columnshard_txs.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.cpp 20
-rw-r--r-- ydb/core/tx/columnshard/engines/indexed_read_data.h 13
-rw-r--r-- ydb/core/tx/columnshard/read_actor.cpp 16
-rw-r--r-- ydb/core/tx/columnshard/ut_columnshard_read_write.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/write_actor.cpp 2
19 files changed, 201 insertions, 23 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index c83b3429ae..388c460413 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -390,7 +390,25 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
return arrow::RecordBatch::Make(dstSchema, srcBatch->num_rows(), columns);
}
+std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const arrow::FieldVector& fieldsToExtract) {
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ fields.reserve(fieldsToExtract.size());
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ columns.reserve(fieldsToExtract.size());
+
+ auto srcSchema = srcBatch->schema();
+ for (auto& fldToExtract : fieldsToExtract) {
+ auto& name = fldToExtract->name();
+ auto field = srcSchema->GetFieldByName(name);
+ if (field && field->type()->Equals(fldToExtract->type())) {
+ fields.push_back(field);
+ columns.push_back(srcBatch->GetColumnByName(name));
+ }
+ }
+ return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), srcBatch->num_rows(), columns);
+}
std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
auto res = arrow::Table::FromRecordBatches(batches);
@@ -568,6 +586,15 @@ bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch,
}
}
+bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& schema) {
+ for (auto& field : schema->fields()) {
+ if (batch->schema()->GetFieldIndex(field->name()) < 0) {
+ return false;
+ }
+ }
+ return true;
+}
+
std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared_ptr<arrow::Schema>& schema,
size_t reserve) {
std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders;
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index 2f6406b68e..bfe8fb88a3 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -69,6 +69,14 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
const std::shared_ptr<arrow::Schema>& dstSchema,
bool addNotExisted = false);
+std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const arrow::FieldVector& fields);
+inline std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const std::shared_ptr<arrow::Schema>& dstSchema)
+{
+ return ExtractExistedColumns(srcBatch, dstSchema->fields());
+}
+
std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& combinedTable);
std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches);
@@ -114,6 +122,7 @@ bool IsSorted(const std::shared_ptr<arrow::RecordBatch>& batch,
bool IsSortedAndUnique(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey,
bool desc = false);
+bool HasAllColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& schema);
template <typename TArr>
std::shared_ptr<TArr> GetTypedColumn(const std::shared_ptr<arrow::RecordBatch>& batch, int pos) {
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index dcd41a0c3e..f0f3d56e49 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -243,6 +243,7 @@ ui64 TColumnShard::MemoryUsage() const {
if (PrimaryIndex) {
memory += PrimaryIndex->MemoryUsage();
}
+ memory += BatchCache.Bytes();
return memory;
}
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index cb6ca9a92b..cbfb482f74 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -209,6 +209,7 @@ struct TEvColumnShard {
NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN;
NColumnShard::TUnifiedBlobId BlobId;
+ std::shared_ptr<arrow::RecordBatch> WrittenBatch;
NColumnShard::TBlobBatch BlobBatch;
NColumnShard::TUsage ResourceUsage;
TVector<ui32> YellowMoveChannels;
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index b2ca6c986f..819809a15b 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -53,6 +53,16 @@ public:
GranuleBlobs[granule].insert(blobId);
}
+ // Add cached batches without read
+ for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
+ Y_VERIFY(!cmt.empty());
+
+ const NOlap::TCommittedBlob& cmtBlob = cmt.key();
+ ui32 batchNo = cmt.mapped();
+ IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId);
+ }
+
// Read all committed blobs
for (const auto& cmtBlob : ReadMetadata->CommittedBlobs) {
auto& blobId = cmtBlob.BlobId;
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index baae719ab5..28d396b613 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -84,6 +84,12 @@ public:
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
+
+ // CacheInserted -> CacheCommitted
+ for (auto& writeId : meta.WriteIds) {
+ Self->BatchCache.Commit(writeId);
+ }
+
auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds);
Self->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
Self->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index e0f8183905..f6d08e7f32 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -41,6 +41,7 @@ std::shared_ptr<NOlap::TReadMetadata>
TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
const std::unique_ptr<NOlap::IColumnEngine>& index,
+ const TBatchCache& batchCache,
TString& error) const {
Y_UNUSED(ctx);
@@ -85,6 +86,11 @@ TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescriptio
// insert table
out.CommittedBlobs = insertTable->Read(read.PathId, read.PlanStep, read.TxId);
+ for (auto& cmt : out.CommittedBlobs) {
+ if (auto batch = batchCache.Get(cmt.BlobId)) {
+ out.CommittedBatches.emplace(cmt.BlobId, batch);
+ }
+ }
// index
@@ -220,7 +226,7 @@ bool TTxReadBase::ParseProgram(const TActorContext& ctx, NKikimrSchemeOp::EOlapP
if (!ssaProgramSteps->Program.empty() && Self->PrimaryIndex) {
ssaProgramSteps->Program = NKikimr::NSsaOptimizer::OptimizeProgram(ssaProgramSteps->Program, Self->PrimaryIndex->GetIndexInfo());
}
-
+
read.Program = ssaProgramSteps->Program;
read.ProgramSourceColumns = ssaProgramSteps->ProgramSourceColumns;
return true;
@@ -269,7 +275,8 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
std::shared_ptr<NOlap::TReadMetadata> metadata;
if (parseResult) {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription);
+ metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
+ ErrorDescription);
}
ui32 status = NKikimrTxColumnShard::EResultStatus::ERROR;
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 93fd867343..7ab2a7d7ae 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -583,7 +583,8 @@ std::shared_ptr<NOlap::TReadMetadataBase> TTxScan::CreateReadMetadata(const TAct
if (indexStats) {
metadata = PrepareStatsReadMetadata(Self->TabletID(), read, Self->PrimaryIndex, ErrorDescription);
} else {
- metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, ErrorDescription);
+ metadata = PrepareReadMetadata(ctx, read, Self->InsertTable, Self->PrimaryIndex, Self->BatchCache,
+ ErrorDescription);
}
if (!metadata) {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 24b624721a..7fff1e61f7 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -62,7 +62,9 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
// First write wins
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- ok = Self->InsertTable->Insert(dbTable, NOlap::TInsertedData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time));
+
+ NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time);
+ ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
auto writesToAbort = Self->InsertTable->OldWritesToAbort(time);
std::vector<TWriteId> failedAborts;
@@ -70,6 +72,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (!Self->RemoveLongTxWrite(db, writeId)) {
failedAborts.push_back(writeId);
}
+ Self->BatchCache.EraseInserted(TWriteId(writeId));
}
for (auto& writeId : failedAborts) {
writesToAbort.erase(writeId);
@@ -87,10 +90,14 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb);
}
- // Put new data into cache
+ // Put new data into blob cache
Y_VERIFY(logoBlobId.BlobSize() == data.size());
NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data);
+ // Put new data into batch cache
+ Y_VERIFY(Ev->Get()->WrittenBatch);
+ Self->BatchCache.Insert(TWriteId(writeId), logoBlobId, Ev->Get()->WrittenBatch);
+
Self->UpdateInsertTableCounters();
ui64 blobsWritten = Ev->Get()->BlobBatch.GetBlobCount();
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index e023368504..f18e101b94 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -64,6 +64,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
for (const auto& cmtd : changes->DataToIndex) {
Self->InsertTable->EraseCommitted(dbWrap, cmtd);
Self->BlobManager->DeleteBlob(cmtd.BlobId, blobManagerDb);
+ Self->BatchCache.EraseCommitted(cmtd.BlobId);
}
if (!changes->DataToIndex.empty()) {
Self->UpdateInsertTableCounters();
diff --git a/ydb/core/tx/columnshard/columnshard_common.h b/ydb/core/tx/columnshard/columnshard_common.h
index 80a9e4599c..a1a0cd25d8 100644
--- a/ydb/core/tx/columnshard/columnshard_common.h
+++ b/ydb/core/tx/columnshard/columnshard_common.h
@@ -1,8 +1,10 @@
#pragma once
+#include "defs.h"
#include <ydb/core/formats/arrow_helpers.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/protos/ssa.pb.h>
#include <ydb/core/tx/columnshard/engines/predicate.h>
+#include <library/cpp/cache/cache.h>
namespace NKikimr::NOlap {
struct TIndexInfo;
@@ -51,4 +53,72 @@ struct TReadDescription {
std::shared_ptr<NArrow::TSsaProgramSteps> AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
};
+class TBatchCache {
+public:
+ using TUnifiedBlobId = NOlap::TUnifiedBlobId;
+
+ static constexpr ui32 MAX_COMMITTED_COUNT = 2 * TLimits::MIN_SMALL_BLOBS_TO_INSERT;
+ static constexpr ui32 MAX_INSERTED_COUNT = 2 * TLimits::MIN_SMALL_BLOBS_TO_INSERT;
+ static constexpr ui64 MAX_TOTAL_SIZE = 2 * TLimits::MIN_BYTES_TO_INSERT;
+
+ TBatchCache()
+ : Inserted(MAX_INSERTED_COUNT)
+ , Committed(MAX_COMMITTED_COUNT)
+ {}
+
+ void Insert(TWriteId writeId, const TUnifiedBlobId& blobId, std::shared_ptr<arrow::RecordBatch>& batch) {
+ if (Bytes() + blobId.BlobSize() > MAX_TOTAL_SIZE) {
+ return;
+ }
+ InsertedBytes += blobId.BlobSize();
+ Inserted.Insert(writeId, {blobId, batch});
+ }
+
+ void Commit(TWriteId writeId) {
+ auto it = Inserted.FindWithoutPromote(writeId);
+ if (it != Inserted.End()) {
+ auto& blobId = it->first;
+ InsertedBytes -= blobId.BlobSize();
+ CommittedBytes += blobId.BlobSize();
+
+ Committed.Insert(blobId, it->second);
+ Inserted.Erase(it);
+ }
+ }
+
+ void EraseInserted(TWriteId writeId) {
+ auto it = Inserted.FindWithoutPromote(writeId);
+ if (it != Inserted.End()) {
+ InsertedBytes -= (*it).first.BlobSize();
+ Inserted.Erase(it);
+ }
+ }
+
+ void EraseCommitted(const TUnifiedBlobId& blobId) {
+ auto it = Committed.FindWithoutPromote(blobId);
+ if (it != Committed.End()) {
+ CommittedBytes -= blobId.BlobSize();
+ Committed.Erase(it);
+ }
+ }
+
+ std::shared_ptr<arrow::RecordBatch> Get(const TUnifiedBlobId& blobId) const {
+ auto it = Committed.Find(blobId);
+ if (it != Committed.End()) {
+ return *it;
+ }
+ return {};
+ }
+
+ ui64 Bytes() const {
+ return InsertedBytes + CommittedBytes;
+ }
+
+private:
+ TLRUCache<TWriteId, std::pair<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>> Inserted;
+ mutable TLRUCache<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> Committed;
+ ui64 InsertedBytes{0};
+ ui64 CommittedBytes{0};
+};
+
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index d469f0bc59..a891c7f169 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -603,10 +603,16 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
}
if (bytesToIndex < (ui64)Limits.MinInsertBytes && blobs < TLimits::MIN_SMALL_BLOBS_TO_INSERT) {
- LOG_S_DEBUG("Indexing not started: less data (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
- << ignored << ") then MIN_BYTES_TO_INSERT / MIN_SMALL_BLOBS_TO_INSERT at tablet " << TabletID());
- return {};
+ LOG_S_DEBUG("Few data for indexation (" << bytesToIndex << " bytes in " << blobs << " blobs, ignored "
+ << ignored << ") at tablet " << TabletID());
+
+ // Force small indexations sometimes to keep BatchCache smaller
+ if (!bytesToIndex || SkippedIndexations < TSettings::MAX_INDEXATIONS_TO_SKIP) {
+ ++SkippedIndexations;
+ return {};
+ }
}
+ SkippedIndexations = 0;
LOG_S_DEBUG("Prepare indexing " << bytesToIndex << " bytes in " << dataToIndex.size() << " batches of committed "
<< size << " bytes in " << blobs << " blobs ignored " << ignored
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index b7e49660e9..f791eee354 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -40,6 +40,8 @@ IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tier
#endif
struct TSettings {
+ static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16;
+
TControlWrapper BlobWriteGrouppingEnabled;
TControlWrapper CacheDataAfterIndexing;
TControlWrapper CacheDataAfterCompaction;
@@ -338,6 +340,7 @@ private:
ui64 StorePathId = 0;
ui64 StatsReportRound = 0;
ui64 BackgroundActivation = 0;
+ ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init
TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
bool MediatorTimeCastRegistered = false;
@@ -360,6 +363,7 @@ private:
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
std::unique_ptr<NOlap::TInsertTable> InsertTable;
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
+ TBatchCache BatchCache;
THashMap<TString, TTierConfig> TierConfigs;
THashSet<NOlap::TUnifiedBlobId> DelayedForgetBlobs;
TTtl Ttl;
diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h
index 4616ae298a..02594bb6d0 100644
--- a/ydb/core/tx/columnshard/columnshard_txs.h
+++ b/ydb/core/tx/columnshard/columnshard_txs.h
@@ -265,6 +265,7 @@ protected:
const TReadDescription& readDescription,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
const std::unique_ptr<NOlap::IColumnEngine>& index,
+ const TBatchCache& batchCache,
TString& error) const;
protected:
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index d2693d0e7c..bbc8823f09 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -270,20 +270,22 @@ void TIndexedReadData::UpdateGranuleWaits(ui32 batchNo) {
}
std::shared_ptr<arrow::RecordBatch>
-TIndexedReadData::MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 txId) const {
- auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->BlobSchema);
- Y_VERIFY(batch);
+TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ ui64 planStep, ui64 txId) const {
+ Y_VERIFY(srcBatch);
- batch = TIndexInfo::AddSpecialColumns(batch, planStep, txId);
- Y_VERIFY(batch);
+ // Extract columns (without check), filter, attach snapshot, extract columns with check
+ // (do not filter snapshot columns)
- batch = NArrow::ExtractColumns(batch, ReadMetadata->LoadSchema);
+ auto batch = NArrow::ExtractExistedColumns(srcBatch, ReadMetadata->LoadSchema);
Y_VERIFY(batch);
{ // Apply predicate
// TODO: Extract this info function
std::vector<bool> less;
if (ReadMetadata->LessPredicate) {
+ Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->LessPredicate->Batch->schema()));
+
auto cmpType = ReadMetadata->LessPredicate->Inclusive ?
NArrow::ECompareType::LESS_OR_EQUAL : NArrow::ECompareType::LESS;
less = NArrow::MakePredicateFilter(batch, ReadMetadata->LessPredicate->Batch, cmpType);
@@ -291,6 +293,8 @@ TIndexedReadData::MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 t
std::vector<bool> greater;
if (ReadMetadata->GreaterPredicate) {
+ Y_VERIFY(NArrow::HasAllColumns(batch, ReadMetadata->GreaterPredicate->Batch->schema()));
+
auto cmpType = ReadMetadata->GreaterPredicate->Inclusive ?
NArrow::ECompareType::GREATER_OR_EQUAL : NArrow::ECompareType::GREATER;
greater = NArrow::MakePredicateFilter(batch, ReadMetadata->GreaterPredicate->Batch, cmpType);
@@ -305,6 +309,10 @@ TIndexedReadData::MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 t
}
}
+ batch = TIndexInfo::AddSpecialColumns(batch, planStep, txId);
+ Y_VERIFY(batch);
+
+ batch = NArrow::ExtractColumns(batch, ReadMetadata->LoadSchema);
Y_VERIFY(batch);
return batch;
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 734e152144..0c7818bbba 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -81,6 +81,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
ui64 TxId = 0;
std::shared_ptr<TSelectInfo> SelectInfo;
std::vector<TCommittedBlob> CommittedBlobs;
+ THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches;
std::shared_ptr<TReadStats> ReadStats;
TReadMetadata(const TIndexInfo& info)
@@ -194,12 +195,17 @@ public:
/// @returns batches and corresponding last keys in correct order (i.e. sorted by PK)
TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
- void AddNotIndexed(ui32 batchNo, TString serializedBach, ui64 planStep, ui64 txId) {
+ void AddNotIndexed(ui32 batchNo, TString blob, ui64 planStep, ui64 txId) {
+ auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->BlobSchema);
+ AddNotIndexed(batchNo, batch, planStep, txId);
+ }
+
+ void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) {
Y_VERIFY(batchNo < NotIndexed.size());
if (!NotIndexed[batchNo]) {
++ReadyNotIndexed;
}
- NotIndexed[batchNo] = MakeNotIndexedBatch(serializedBach, planStep, txId);
+ NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId);
}
void AddIndexed(const TBlobRange& blobRange, const TString& column);
@@ -242,7 +248,8 @@ private:
return PortionGranule.find(portion)->second;
}
- std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(const TString& blob, ui64 planStep, ui64 txId) const;
+ std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
+ const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
std::shared_ptr<arrow::RecordBatch> AssembleIndexedBatch(ui32 batchNo);
void UpdateGranuleWaits(ui32 batchNo);
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SplitByGranules(
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index ab49a66f59..607d770a54 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -143,6 +143,7 @@ public:
ui32 notIndexed = 0;
for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++notIndexed) {
const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
+
CommittedBlobs.emplace(cmtBlob.BlobId);
WaitCommitted.emplace(cmtBlob, notIndexed);
}
@@ -152,8 +153,19 @@ public:
WaitIndexed.insert(blobRange);
}
- LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, " << WaitCommitted.size()
- << " committed blobs) at tablet " << TabletId);
+ // Add cached batches without read
+ for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
+ auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
+ Y_VERIFY(!cmt.empty());
+
+ const NOlap::TCommittedBlob& cmtBlob = cmt.key();
+ ui32 batchNo = cmt.mapped();
+ IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId);
+ }
+
+ LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, "
+ << ReadMetadata->CommittedBlobs.size() << " committed, "
+ << WaitCommitted.size() << " not cached committed) at tablet " << TabletId);
bool earlyExit = false;
if (Deadline != TInstant::Max()) {
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 7d5f9b2e51..2a28f40711 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -698,11 +698,11 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeId>>& ydb
if (ydbSchema == TTestSchema::YdbSchema()) {
if (codec == "" || codec == "lz4") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes(), 5054176);
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50);
} else if (codec == "none") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes(), 7557872);
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75);
} else if (codec == "zstd") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes(), 2636440);
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26);
} else {
UNIT_ASSERT(false);
}
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index ec78578324..6558c08c3d 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -119,7 +119,7 @@ public:
// Heavy operations inside. We cannot run them in tablet event handler.
TString strError;
- std::shared_ptr<arrow::RecordBatch> batch;
+ std::shared_ptr<arrow::RecordBatch>& batch = WriteEv->WrittenBatch;
{
TCpuGuard guard(ResourceUsage);
batch = IndexInfo.PrepareForInsert(srcData, meta, strError);