aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author nsofya <nsofya@yandex-team.com> 2023-09-07 12:20:06 +0300
committer nsofya <nsofya@yandex-team.com> 2023-09-07 13:35:50 +0300
commit f51f48a9153ef1341700ad8ba322f01d60368b60 (patch)
tree f832df05da7730e3885920c53985dc7ed2202956
parent b0c000d48fba6fcbd7f69bcbe94290623a172fa7 (diff)
download ydb-f51f48a9153ef1341700ad8ba322f01d60368b60.tar.gz
Use TBlobRange in InsertedTable
-rw-r--r-- ydb/core/blob_depot/events.h 1
-rw-r--r-- ydb/core/tx/columnshard/blob.h 19
-rw-r--r-- ydb/core/tx/columnshard/columnshard.h 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write.cpp 41
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 3
-rw-r--r-- ydb/core/tx/columnshard/columnshard_private_events.h 11
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/indexation.cpp 25
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/indexation.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/insert_table/data.h 37
-rw-r--r-- ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/read_metadata.h 3
-rw-r--r-- ydb/core/tx/columnshard/engines/ut_logs_engine.cpp 19
-rw-r--r-- ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/indexing_actor.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/inflight_request_tracker.h 9
-rw-r--r-- ydb/core/tx/columnshard/operations/write.cpp 2
19 files changed, 105 insertions, 83 deletions
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 92d7a9f440..0bbd77f9ab 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -1,6 +1,7 @@
#pragma once
#include <library/cpp/actors/core/event.h>
+#include <library/cpp/actors/core/interconnect.h>
#include <ydb/core/protos/blob_depot.pb.h>
namespace NKikimr {
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h
index 0ad18d5754..f92e156878 100644
--- a/ydb/core/tx/columnshard/blob.h
+++ b/ydb/core/tx/columnshard/blob.h
@@ -284,11 +284,28 @@ struct TBlobRange {
ui32 Offset;
ui32 Size;
+ const TUnifiedBlobId& GetBlobId() const {
+ return BlobId;
+ }
+
+ ui32 GetBlobSize() const {
+ return Size;
+ }
+
+ bool IsFullBlob() const {
+ return Size == BlobId.BlobSize();
+ }
+
explicit TBlobRange(const TUnifiedBlobId& blobId = TUnifiedBlobId(), ui32 offset = 0, ui32 size = 0)
: BlobId(blobId)
, Offset(offset)
, Size(size)
- {}
+ {
+ if (Size > 0) {
+ Y_VERIFY(Offset < BlobId.BlobSize());
+ Y_VERIFY(Offset + Size <= BlobId.BlobSize());
+ }
+ }
bool operator == (const TBlobRange& other) const {
return
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index f55648f30f..256f9358b5 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -1,11 +1,10 @@
#pragma once
#include "defs.h"
-#include "blob_manager.h"
+#include "blob.h"
#include <ydb/core/tx/tx.h>
#include <ydb/core/tx/message_seqno.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
-#include <ydb/core/tx/columnshard/engines/writer/write_controller.h>
#include <ydb/core/tx/ev_write/write_data.h>
#include <ydb/core/tx/long_tx_service/public/types.h>
@@ -16,7 +15,6 @@
namespace NKikimr {
namespace NColumnShard {
-class TBlobGroupSelector;
inline Ydb::StatusIds::StatusCode ConvertToYdbStatus(NKikimrTxColumnShard::EResultStatus columnShardStatus) {
switch (columnShardStatus) {
@@ -211,7 +209,7 @@ struct TEvColumnShard {
BlobRanges.reserve(Record.BlobRangesSize());
for (const auto& range : Record.GetBlobRanges()) {
- auto blobId = NColumnShard::TUnifiedBlobId::ParseFromString(range.GetBlobId(), dsGroupSelector,
+ auto blobId = NOlap::TUnifiedBlobId::ParseFromString(range.GetBlobId(), dsGroupSelector,
errString);
if (!errString.empty()) {
return;
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 0e67077dd0..30d214fb4d 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -44,8 +44,8 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta();
- const auto& logoBlobId = blobData.GetBlobId();
- Y_VERIFY(logoBlobId.IsValid());
+ const auto& blobRange = blobData.GetBlobRange();
+ Y_VERIFY(blobRange.GetBlobId().IsValid());
ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds();
TInstant time = TInstant::Seconds(writeUnixTime);
@@ -58,7 +58,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion());
- NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, tableSchema->GetSnapshot());
+ NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot());
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
@@ -71,24 +71,15 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle)
for (auto& [abortedWriteId, abortedData] : allAborted) {
Self->InsertTable->EraseAborted(dbTable, abortedData);
- Self->BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb);
+ Y_VERIFY(blobRange.IsFullBlob());
+ Self->BlobManager->DeleteBlob(abortedData.GetBlobRange().GetBlobId(), blobManagerDb);
}
// Put new data into blob cache
- Y_VERIFY(logoBlobId.BlobSize() == data.size());
- NBlobCache::AddRangeToCache(NBlobCache::TBlobRange(logoBlobId, 0, data.size()), data);
+ Y_VERIFY(blobRange.IsFullBlob());
+ NBlobCache::AddRangeToCache(blobRange, blobData.GetBlobData());
Self->UpdateInsertTableCounters();
-
- const auto& blobBatch(PutBlobResult->Get()->GetPutResult().GetBlobBatch());
- ui64 blobsWritten = blobBatch.GetBlobCount();
- ui64 bytesWritten = blobBatch.GetTotalSize();
- Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten);
- Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten);
- Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, meta.GetRawBytes());
- Self->IncCounter(COUNTER_WRITE_SUCCESS);
-
- Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb);
return true;
}
return false;
@@ -112,6 +103,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
}
TVector<TWriteId> writeIds;
+ ui64 insertedBytes = 0;
for (auto blobData : PutBlobResult->Get()->GetBlobData()) {
auto writeId = TWriteId(writeMeta.GetWriteId());
if (operation) {
@@ -121,13 +113,28 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
}
- if (!InsertOneBlob(txc, blobData, writeId)) {
+ if (InsertOneBlob(txc, blobData, writeId)) {
+ insertedBytes += blobData.GetLogicalMeta().GetRawBytes();
+ } else {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
writeIds.push_back(writeId);
}
+ if (insertedBytes > 0) {
+ TBlobManagerDb blobManagerDb(txc.DB);
+ const auto& blobBatch(PutBlobResult->Get()->GetPutResult().GetBlobBatch());
+ ui64 blobsWritten = blobBatch.GetBlobCount();
+ ui64 bytesWritten = blobBatch.GetTotalSize();
+ Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten);
+ Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten);
+ Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, insertedBytes);
+ Self->IncCounter(COUNTER_WRITE_SUCCESS);
+
+ Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb);
+ }
+
if (operation) {
operation->OnWriteFinish(txc, writeIds);
auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index fe8170efcc..089b8dae24 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -693,7 +693,6 @@ void TColumnShard::SetupIndexation() {
<< " at tablet " << TabletID());
std::vector<NOlap::TInsertedData> data;
- THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> cachedBlobs;
data.reserve(dataToIndex.size());
for (auto& ptr : dataToIndex) {
data.push_back(*ptr);
@@ -709,7 +708,7 @@ void TColumnShard::SetupIndexation() {
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
- Settings.CacheDataAfterIndexing, std::move(cachedBlobs));
+ Settings.CacheDataAfterIndexing);
ActorContext().Send(IndexingActor, std::make_unique<TEvPrivate::TEvIndexing>(std::move(ev)));
}
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index c5601a3883..4dddca7b53 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -32,7 +32,6 @@ struct TEvPrivate {
struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
NOlap::TVersionedIndex IndexInfo;
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
- THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs;
bool GranuleCompaction{false};
TBlobBatch BlobBatch;
TUsage ResourceUsage;
@@ -42,11 +41,9 @@ struct TEvPrivate {
TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo,
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges,
- bool cacheData,
- THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>&& cachedBlobs = {})
+ bool cacheData)
: IndexInfo(std::move(indexInfo))
, IndexChanges(indexChanges)
- , CachedBlobs(std::move(cachedBlobs))
, CacheData(cacheData)
{
PutResult = std::make_shared<TBlobPutResult>(NKikimrProto::UNKNOWN);
@@ -252,7 +249,7 @@ struct TEvPrivate {
class TEvWriteBlobsResult : public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
public:
class TPutBlobData {
- YDB_READONLY_DEF(TUnifiedBlobId, BlobId);
+ YDB_READONLY_DEF(TBlobRange, BlobRange);
YDB_READONLY_DEF(TString, BlobData);
YDB_READONLY_DEF(NKikimrTxColumnShard::TLogicalMetadata, LogicalMeta);
YDB_ACCESSOR(ui64, RowsCount, 0);
@@ -260,8 +257,8 @@ struct TEvPrivate {
public:
TPutBlobData() = default;
- TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime)
- : BlobId(blobId)
+ TPutBlobData(const TBlobRange& blobRange, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime)
+ : BlobRange(blobRange)
, BlobData(data)
, RowsCount(rowsCount)
, RawBytes(rawBytes)
diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp
index 5f77776fec..251d734fdd 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/columnshard_schema.cpp
@@ -52,7 +52,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
if (metaStr) {
Y_VERIFY(meta.ParseFromString(metaStr));
}
- TInsertedData data(planStep, writeTxId, pathId, dedupId, blobId, meta, indexSnapshot);
+ TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot);
switch (recType) {
case EInsertTableIds::Inserted:
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index d1428d5935..e96920909e 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -440,7 +440,7 @@ struct Schema : NIceDb::Schema {
static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) {
Y_VERIFY(data.GetSchemaSnapshot().Valid());
db.Table<InsertTable>().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Update(
- NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()),
+ NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()),
NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId())
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 9426d81cab..a981ac7443 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -7,16 +7,12 @@
namespace NKikimr::NOlap {
-THashMap<NKikimr::NOlap::TUnifiedBlobId, std::vector<NKikimr::NOlap::TBlobRange>> TInsertColumnEngineChanges::GetGroupedBlobRanges() const {
+THashMap<TUnifiedBlobId, std::vector<TBlobRange>> TInsertColumnEngineChanges::GetGroupedBlobRanges() const {
THashMap<TUnifiedBlobId, std::vector<TBlobRange>> result;
for (size_t i = 0; i < DataToIndex.size(); ++i) {
- auto& blobId = DataToIndex[i].BlobId;
- if (CachedBlobs.contains(blobId)) {
- continue;
- }
- Y_VERIFY(!result.contains(blobId));
- std::vector<TBlobRange> blobsVector = { NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()) };
- result.emplace(blobId, std::move(blobsVector));
+ const auto& indsertedData = DataToIndex[i];
+ Y_VERIFY(indsertedData.GetBlobRange().IsFullBlob());
+ Y_VERIFY(result.emplace(indsertedData.GetBlobRange().GetBlobId(), TVector<TBlobRange>{ indsertedData.GetBlobRange() }).second);
}
return result;
}
@@ -30,9 +26,10 @@ bool TInsertColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApp
void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
TBase::DoWriteIndex(self, context);
- for (const auto& cmtd : DataToIndex) {
- self.InsertTable->EraseCommitted(context.DBWrapper, cmtd);
- self.BlobManager->DeleteBlob(cmtd.BlobId, *context.BlobManagerDb);
+ for (const auto& indsertedData : DataToIndex) {
+ self.InsertTable->EraseCommitted(context.DBWrapper, indsertedData);
+ Y_VERIFY(indsertedData.GetBlobRange().IsFullBlob());
+ self.BlobManager->DeleteBlob(indsertedData.GetBlobRange().GetBlobId(), *context.BlobManagerDb);
}
if (!DataToIndex.empty()) {
self.UpdateInsertTableCounters();
@@ -87,16 +84,14 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
for (auto& inserted : DataToIndex) {
- TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
+ const TBlobRange& blobRange = inserted.GetBlobRange();
auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaSnapshot());
auto& indexInfo = blobSchema->GetIndexInfo();
Y_VERIFY(indexInfo.IsSorted());
std::shared_ptr<arrow::RecordBatch> batch;
- if (auto it = CachedBlobs.find(inserted.BlobId); it != CachedBlobs.end()) {
- batch = it->second;
- } else if (auto* blobData = Blobs.FindPtr(blobRange)) {
+ if (auto* blobData = Blobs.FindPtr(blobRange)) {
Y_VERIFY(!blobData->empty(), "Blob data not present");
// Prepare batch
batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema());
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h
index 3ae8b04433..00d4e8ae3e 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.h
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.h
@@ -24,7 +24,6 @@ protected:
public:
const TMark DefaultMark;
THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule}
- THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs;
public:
TInsertColumnEngineChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& dataToIndex, const TSplitSettings& splitSettings)
: TBase(splitSettings)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h
index f812ef5944..327d323e06 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/data.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.h
@@ -9,8 +9,9 @@ namespace NKikimr::NOlap {
struct TInsertedData {
private:
TInsertedDataMeta Meta;
-public:
+ YDB_READONLY_DEF(TBlobRange, BlobRange);
+public:
const TInsertedDataMeta& GetMeta() const {
return Meta;
}
@@ -19,27 +20,33 @@ public:
ui64 WriteTxId = 0;
ui64 PathId = 0;
TString DedupId;
- TUnifiedBlobId BlobId;
TInsertedData() = delete; // avoid invalid TInsertedData anywhere
- TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
+ TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange,
const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion)
: Meta(proto)
+ , BlobRange(blobRange)
, PlanStep(planStep)
, WriteTxId(writeTxId)
, PathId(pathId)
, DedupId(dedupId)
- , BlobId(blobId)
, SchemaVersion(schemaVersion) {
Y_VERIFY(SchemaVersion.Valid());
}
- TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
+ TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange,
const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion)
- : TInsertedData(0, writeTxId, pathId, dedupId, blobId, proto, schemaVersion)
+ : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion)
{}
+
+ TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion)
+ : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion)
+ {
+ }
+
bool operator < (const TInsertedData& key) const {
if (PlanStep < key.PlanStep) {
return true;
@@ -100,7 +107,7 @@ public:
return SchemaVersion;
}
- ui32 BlobSize() const { return BlobId.BlobSize(); }
+ ui32 BlobSize() const { return BlobRange.GetBlobSize(); }
private:
TSnapshot SchemaVersion = TSnapshot::Zero();
@@ -108,7 +115,7 @@ private:
class TCommittedBlob {
private:
- TUnifiedBlobId BlobId;
+ TBlobRange BlobRange;
TSnapshot CommitSnapshot;
TSnapshot SchemaSnapshot;
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First);
@@ -124,8 +131,8 @@ public:
return *Last;
}
- TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last)
- : BlobId(blobId)
+ TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last)
+ : BlobRange(blobRange)
, CommitSnapshot(snapshot)
, SchemaSnapshot(schemaSnapshot)
, First(first)
@@ -134,10 +141,10 @@ public:
/// It uses a trick: we place keys with planStep:txId in a container and find them later by BlobId only.
/// So hash() and equality should depend on BlobId only.
- bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; }
- ui64 Hash() const noexcept { return BlobId.Hash(); }
+ bool operator == (const TCommittedBlob& key) const { return BlobRange == key.BlobRange; }
+ ui64 Hash() const noexcept { return BlobRange.Hash(); }
TString DebugString() const {
- return TStringBuilder() << BlobId << ";ps=" << CommitSnapshot.GetPlanStep() << ";ti=" << CommitSnapshot.GetTxId();
+ return TStringBuilder() << BlobRange << ";ps=" << CommitSnapshot.GetPlanStep() << ";ti=" << CommitSnapshot.GetTxId();
}
const TSnapshot& GetSnapshot() const {
@@ -148,8 +155,8 @@ public:
return SchemaSnapshot;
}
- const TUnifiedBlobId& GetBlobId() const {
- return BlobId;
+ const TBlobRange& GetBlobRange() const {
+ return BlobRange;
}
};
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index 628dc73cad..c5d9d3714d 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -125,7 +125,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna
std::vector<TCommittedBlob> result;
result.reserve(ret.size());
for (auto&& i : ret) {
- result.emplace_back(TCommittedBlob(i->BlobId, i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema)));
+ result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema)));
}
return result;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index 3b7ba86529..ea035e3cc0 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -121,7 +121,7 @@ void TCommittedDataSource::DoFetch() {
if (!ReadStarted) {
Y_VERIFY(!ResultReady);
ReadStarted = true;
- ReadData.AddBlobForFetch(GetSourceIdx(), TBlobRange(CommittedBlob.GetBlobId(), 0, CommittedBlob.GetBlobId().BlobSize()));
+ ReadData.AddBlobForFetch(GetSourceIdx(), CommittedBlob.GetBlobRange());
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 9faa66ee63..4bf3b64841 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -147,7 +147,6 @@ public:
}
std::shared_ptr<TSelectInfo> SelectInfo;
std::vector<TCommittedBlob> CommittedBlobs;
- THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches;
std::shared_ptr<TReadStats> ReadStats;
const TSnapshot& GetSnapshot() const {
@@ -175,7 +174,7 @@ public:
}
return IndexVersions.GetSchema(version);
}
-
+
ISnapshotSchema::TPtr GetLoadSchema(const std::optional<TSnapshot>& version = {}) const {
if (!version) {
if (!EmptyVersionSchemaCache) {
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 7d0aff6a34..6f563f38ff 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -292,6 +292,11 @@ TCompactionLimits TestLimits() {
bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
std::vector<TInsertedData>&& dataToIndex, THashMap<TBlobRange, TString>& blobs, ui32& step) {
+
+ for (ui32 i = 0; i < dataToIndex.size(); ++i) {
+ // Committed data always has a nonzero planstep (for WriteLoadRead tests)
+ dataToIndex[i].PlanStep = i + 1;
+ };
std::shared_ptr<TInsertColumnEngineChanges> changes = engine.StartInsert(std::move(dataToIndex));
if (!changes) {
return false;
@@ -419,8 +424,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
engine.Load(db, lostBlobs);
std::vector<TInsertedData> dataToIndex = {
- TInsertedData(1, 2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot),
- TInsertedData(2, 1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot)
+ TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot),
+ TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot)
};
// write
@@ -430,7 +435,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
blobs[blobRanges[0]] = testBlob;
blobs[blobRanges[1]] = testBlob;
Insert(engine, db, TSnapshot(1, 2), std::move(dataToIndex), blobs, step);
-
+
// selects
auto lastSchema = engine.GetVersionedIndex().GetLastSchema();
@@ -515,7 +520,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
+ TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -611,7 +616,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
+ TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -640,7 +645,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -680,7 +685,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index fa1f3bbac9..acb8596d8e 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -25,7 +25,7 @@ IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext()
bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
const auto& blobInfo = BlobsSplitted[CurrentIndex - 1];
- Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now());
+ Owner.BlobData.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize()), blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now());
return true;
}
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index c0fbf8dcc8..e1dcded8cb 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -32,7 +32,6 @@ public:
Y_VERIFY(TxEvent);
auto indexChanges = dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges);
Y_VERIFY(indexChanges);
- indexChanges->CachedBlobs = std::move(TxEvent->CachedBlobs);
for (auto& [blobId, ranges] : event.GroupedBlobRanges) {
Y_VERIFY(ranges.size() == 1);
diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h
index 3ef8ebf6f8..eca4f203d5 100644
--- a/ydb/core/tx/columnshard/inflight_request_tracker.h
+++ b/ydb/core/tx/columnshard/inflight_request_tracker.h
@@ -1,8 +1,7 @@
#pragma once
#include "blob.h"
-#include "engines/reader/read_metadata.h"
-#include "engines/column_engine.h"
+#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>
namespace NKikimr::NColumnShard {
@@ -59,8 +58,8 @@ public:
}
for (const auto& committedBlob : readMeta->CommittedBlobs) {
- if (blobTracker.SetBlobInUse(committedBlob.GetBlobId(), false)) {
- freedBlobs.emplace(committedBlob.GetBlobId());
+ if (blobTracker.SetBlobInUse(committedBlob.GetBlobRange().GetBlobId(), false)) {
+ freedBlobs.emplace(committedBlob.GetBlobRange().GetBlobId());
}
}
}
@@ -103,7 +102,7 @@ private:
}
for (const auto& committedBlob : readMeta->CommittedBlobs) {
- blobTracker.SetBlobInUse(committedBlob.GetBlobId(), true);
+ blobTracker.SetBlobInUse(committedBlob.GetBlobRange().GetBlobId(), true);
}
}
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index cab7325042..8a7850081a 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -82,7 +82,7 @@ namespace NKikimr::NColumnShard {
auto allAborted = owner.InsertTable->GetAborted();
for (auto& [abortedWriteId, abortedData] : allAborted) {
owner.InsertTable->EraseAborted(dbTable, abortedData);
- owner.BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb);
+ owner.BlobManager->DeleteBlob(abortedData.GetBlobRange().GetBlobId(), blobManagerDb);
}
}