about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-10-01 19:48:13 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-10-01 20:03:27 +0300
commit a8bdeac6cbeb7b346252304f66ffa226e9047d53 (patch)
tree 543586ec773002b033e88475c9224e1021cecf28
parent 33c98f7f7bf065218f7bc5ff613552a8404447ce (diff)
download ydb-a8bdeac6cbeb7b346252304f66ffa226e9047d53.tar.gz
KIKIMR-19505: store blob into TInsertedData for reuse in future in indexation changes
-rw-r--r-- ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp 18
-rw-r--r-- ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/engines/insert_table/data.cpp 52
-rw-r--r-- ydb/core/tx/columnshard/engines/insert_table/data.h 53
-rw-r--r-- ydb/core/tx/columnshard/engines/ut_insert_table.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/engines/ut_logs_engine.cpp 12
7 files changed, 110 insertions, 41 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index 0f25c5b80e4..56b9de2a0f6 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -1,7 +1,7 @@
#include "tx_write.h"
namespace NKikimr::NColumnShard {
-bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId) {
+bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) {
const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta();
const auto& blobRange = blobData.GetBlobRange();
@@ -15,7 +15,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion());
- NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot());
+ NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot(), blob);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
// Put new data into blob cache
@@ -29,15 +29,15 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
- LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix());
-
+ NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
+ ACFL_DEBUG("event", "start_execute");
const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
- Y_VERIFY_S(PutBlobResult->Get()->GetBlobData().size() == 1, TStringBuilder() << "Blobs count: " << PutBlobResult->Get()->GetBlobData().size());
+ AFL_VERIFY(PutBlobResult->Get()->GetBlobData().size() == 1)("count", PutBlobResult->Get()->GetBlobData().size());
} else {
operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
Y_VERIFY(operation);
@@ -54,7 +54,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
}
- if (!InsertOneBlob(txc, blobData, writeId)) {
+ if (!InsertOneBlob(txc, blobData, writeId, PutBlobResult->Get()->GetBlobVerified(blobData.GetBlobRange()))) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
@@ -62,6 +62,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
}
TBlobManagerDb blobManagerDb(txc.DB);
+ AFL_VERIFY(PutBlobResult->Get()->GetActions().size() == 1);
+ AFL_VERIFY(PutBlobResult->Get()->GetActions().front()->GetBlobsCount() == PutBlobResult->Get()->GetBlobData().size());
for (auto&& i : PutBlobResult->Get()->GetActions()) {
i->OnExecuteTxAfterWrite(*Self, blobManagerDb, true);
}
@@ -80,8 +82,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
}
void TTxWrite::Complete(const TActorContext& ctx) {
- Y_VERIFY(Result);
- LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix());
+ NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
+ AFL_VERIFY(Result);
Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds());
Self->CSCounters.OnSuccessWriteResponse();
ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release());
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
index 2bb0adaad8a..c0b6688c306 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
@@ -15,7 +15,7 @@ public:
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE; }
- bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId);
+ bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob);
private:
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp
index 251d734fdd6..980b9938ad0 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/columnshard_schema.cpp
@@ -27,8 +27,9 @@ bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* ds
bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) {
auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select();
- if (!rowset.IsReady())
+ if (!rowset.IsReady()) {
return false;
+ }
while (!rowset.EndOfSet()) {
EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>();
@@ -52,7 +53,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
if (metaStr) {
Y_VERIFY(meta.ParseFromString(metaStr));
}
- TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot);
+ TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot, {});
switch (recType) {
case EInsertTableIds::Inserted:
@@ -66,8 +67,9 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
break;
}
- if (!rowset.Next())
+ if (!rowset.Next()) {
return false;
+ }
}
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.cpp b/ydb/core/tx/columnshard/engines/insert_table/data.cpp
index 2830fe24c17..18f41acfdc9 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/data.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.cpp
@@ -1,5 +1,57 @@
#include "data.h"
+#include <library/cpp/actors/core/log.h>
namespace NKikimr::NOlap {
+namespace {
+
+class TInsertTableCacheController { // Accounts for the total size of blob payloads kept in memory and enforces a fixed cap on it.
+private:
+ TAtomicCounter BlobsCacheSize = 0; // Running total of sizes reserved via Take() and not yet given back via Return().
+ const i64 BlobsCacheLimit = (i64)1 << 30; // Upper bound for BlobsCacheSize: 2^30 (1 GiB if sizes are byte counts, as the string sizes passed by callers suggest).
+public:
+ void Return(const ui64 size) { // Gives back `size` that was previously reserved by a successful Take().
+ const i64 val = BlobsCacheSize.Sub(size);
+ AFL_VERIFY(val >= 0)("size", size)("val", val); // A negative total would mean more was returned than was ever taken.
+ }
+
+ bool Take(const ui64 size) { // Tries to reserve `size`; returns true if the reservation was kept, false if it was rolled back.
+ if (BlobsCacheSize.Add(size) <= BlobsCacheLimit) { // Reserve first, then compare the value Add() returns against the limit.
+ return true;
+ }
+ const i64 val = BlobsCacheSize.Sub(size); // Over the limit: undo the reservation made just above.
+ AFL_VERIFY(val >= 0)("size", size)("val", val);
+ return false;
+ }
+};
+
+}
+
+TInsertedData::TBlobStorageGuard::~TBlobStorageGuard() { // Hands the size of the held payload back to the process-wide cache controller.
+ Singleton<TInsertTableCacheController>()->Return(Data.size());
+}
+
+TInsertedData::~TInsertedData() { // Empty body: members are released by their own destructors.
+}
+
+TInsertedData::TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, // Builds the record and, when a payload is supplied, tries to keep a copy of it in memory.
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion,
+ const std::optional<TString>& blobData /*= {}*/)
+ : Meta(proto)
+ , BlobRange(blobRange)
+ , PlanStep(planStep)
+ , WriteTxId(writeTxId)
+ , PathId(pathId)
+ , DedupId(dedupId)
+ , SchemaVersion(schemaVersion)
+{
+ if (blobData) { // A payload is optional; without one BlobDataGuard stays null.
+ AFL_VERIFY(blobData->size() == BlobRange.Size); // The payload must be exactly as long as the blob range it belongs to.
+ if (Singleton<TInsertTableCacheController>()->Take(blobData->size())) { // Keep the payload only if the cache controller accepts the reservation.
+ BlobDataGuard = std::make_shared<TBlobStorageGuard>(*blobData); // The guard returns the reserved size from its destructor.
+ }
+ }
+ Y_VERIFY(SchemaVersion.Valid()); // A record without a valid schema snapshot is rejected outright.
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h
index 327d323e063..d891fbca17e 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/data.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.h
@@ -10,43 +10,58 @@ struct TInsertedData {
private:
TInsertedDataMeta Meta;
YDB_READONLY_DEF(TBlobRange, BlobRange);
+ class TBlobStorageGuard {
+ private:
+ YDB_READONLY_DEF(TString, Data);
+ public:
+ TBlobStorageGuard(const TString& data)
+ : Data(data)
+ {
-public:
- const TInsertedDataMeta& GetMeta() const {
- return Meta;
- }
+ }
+ ~TBlobStorageGuard();
+ };
+ std::shared_ptr<TBlobStorageGuard> BlobDataGuard;
+public:
ui64 PlanStep = 0;
ui64 WriteTxId = 0;
ui64 PathId = 0;
TString DedupId;
+private:
+ TSnapshot SchemaVersion = TSnapshot::Zero();
+public:
+ std::optional<TString> GetBlobData() const { // Returns the payload held by BlobDataGuard (by value), or an empty optional when no guard is set.
+ if (BlobDataGuard) {
+ return BlobDataGuard->GetData();
+ } else {
+ return {}; // No payload is held for this record.
+ }
+ }
+
+ const TInsertedDataMeta& GetMeta() const { // Read-only access to the record's metadata; the reference is to the member, not a copy.
+ return Meta;
+ }
TInsertedData() = delete; // avoid invalid TInsertedData anywhere
TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange,
- const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion)
- : Meta(proto)
- , BlobRange(blobRange)
- , PlanStep(planStep)
- , WriteTxId(writeTxId)
- , PathId(pathId)
- , DedupId(dedupId)
- , SchemaVersion(schemaVersion) {
- Y_VERIFY(SchemaVersion.Valid());
- }
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData);
TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange,
- const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion)
- : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion)
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData)
+ : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion, blobData)
{}
TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
- const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion)
- : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion)
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData)
+ : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion, blobData)
{
}
+ ~TInsertedData();
+
bool operator < (const TInsertedData& key) const {
if (PlanStep < key.PlanStep) {
return true;
@@ -109,8 +124,6 @@ public:
ui32 BlobSize() const { return BlobRange.GetBlobSize(); }
-private:
- TSnapshot SchemaVersion = TSnapshot::Zero();
};
class TCommittedBlob {
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index e0dc9cf4e2c..b14b5b6a18a 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -54,16 +54,16 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
TSnapshot indexSnapshot(1, 1);
// insert, not commited
- bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot));
+ bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot, {}));
UNIT_ASSERT(ok);
// insert the same blobId1 again
- ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot));
+ ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot, {}));
UNIT_ASSERT(!ok);
// insert different blodId with the same writeId and dedupId
TUnifiedBlobId blobId2(2222, 1, 2, 100, 1);
- ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId2, {}, indexSnapshot));
+ ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId2, {}, indexSnapshot, {}));
UNIT_ASSERT(!ok);
// read nothing
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 07d2cdfe9f4..ada2883e5f0 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -442,8 +442,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
engine.Load(db, lostBlobs);
std::vector<TInsertedData> dataToIndex = {
- TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot),
- TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot)
+ TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot, {}),
+ TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot, {})
};
// write
@@ -538,7 +538,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -634,7 +634,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -663,7 +663,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -703,7 +703,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);