diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-31 19:15:10 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-31 19:47:25 +0300 |
commit | 2c70a8f92a317edc09a0b56352129057d1f5ea90 (patch) | |
tree | 597eb3c2a9707d4fbf3a00b689fe01f6eb75483a | |
parent | 0ea09be71af2c4edddf36d4bb55e743a96eebf28 (diff) | |
download | ydb-2c70a8f92a317edc09a0b56352129057d1f5ea90.tar.gz |
KIKIMR-19218: insert table chunks with min/max
27 files changed, 305 insertions, 135 deletions
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 3e6db3dd209..f9677baaf11 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -48,10 +48,9 @@ message TReadStats { message TLogicalMetadata { optional uint32 NumRows = 1; - repeated NKikimrSSA.TProgram.TConstant FirstPkValue = 2; // It's min PK if batch is sorted - repeated NKikimrSSA.TProgram.TConstant LastPkValue = 3; // It's max PK if batch is sorted optional uint64 RawBytes = 4; optional uint64 DirtyWriteTimeSeconds = 5; + optional string SpecialKeysRawData = 6; } message TMetadata { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 0f435cab595..247f05a5fee 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -58,7 +58,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion()); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, time, tableSchema->GetSnapshot()); + NOlap::TInsertedData insertData(0, (ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, tableSchema->GetSnapshot()); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); @@ -112,7 +112,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { } TVector<TWriteId> writeIds; - for (auto blobData: PutBlobResult->Get()->GetBlobData()) { + for (auto blobData : PutBlobResult->Get()->GetBlobData()) { auto writeId = TWriteId(writeMeta.GetWriteId()); if (operation) { writeId = Self->BuildNextWriteId(txc); @@ -144,10 +144,12 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { void TTxWrite::Complete(const TActorContext& ctx) { Y_VERIFY(Result); LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix()); + Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds()); + Self->CSCounters.OnSuccessWriteResponse(); ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release()); } -void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) { +void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) { IncCounter(COUNTER_WRITE_FAIL); switch (overloadReason) { case EOverloadStatus::Disk: @@ -203,6 +205,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto wg = WritesMonitor.FinishWrite(putResult.GetResourceUsage().SourceMemorySize); if (putResult.GetPutStatus() != NKikimrProto::OK) { + CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); IncCounter(COUNTER_WRITE_FAIL); auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; @@ -217,29 +220,33 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo if (writeMeta.HasLongTxId()) { auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode); ctx.Send(writeMeta.GetSource(), result.release()); + CSCounters.OnFailedWriteResponse(); } else { auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); Y_VERIFY(operation); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails"); ctx.Send(writeMeta.GetSource(), result.release()); + CSCounters.OnFailedWriteResponse(); } - return; - } + } else { + CSCounters.OnWritePutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); + LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() + << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); - LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() - << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); + Execute(new TTxWrite(this, ev), ctx); + } - Execute(new TTxWrite(this, ev), ctx); } void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { + CSCounters.OnStartWriteRequest(); LastAccessTime = TAppData::TimeProvider->Now(); const auto& record = Proto(ev->Get()); const ui64 tableId = record.GetTableId(); const ui64 writeId = record.GetWriteId(); const TString dedupId = record.GetDedupId(); - const auto source = ev->Get()->GetSource(); + const auto source = ev->Sender; NEvWrite::TWriteMeta writeMeta(writeId, tableId, source); writeMeta.SetDedupId(dedupId); @@ -253,7 +260,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex IncCounter(COUNTER_WRITE_FAIL); auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(ev->Get()->GetSource(), result.release()); + ctx.Send(source, result.release()); + CSCounters.OnFailedWriteResponse(); return; } @@ -264,7 +272,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << " at tablet " << TabletID()); IncCounter(COUNTER_WRITE_FAIL); auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); - ctx.Send(ev->Get()->GetSource(), result.release()); + ctx.Send(source, result.release()); + CSCounters.OnFailedWriteResponse(); return; } @@ -273,6 +282,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (overloadStatus != EOverloadStatus::None) { std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); + CSCounters.OnFailedWriteResponse(); } else { if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() @@ -284,6 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS); ctx.Send(writeMeta.GetSource(), result.release()); + CSCounters.OnFailedWriteResponse(); return; } @@ -295,6 +306,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << " at tablet " << TabletID()); auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData); + CSCounters.OnWritePutBlobsStart(); ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize)); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 60c962bb95c..4c5575cb355 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -12,6 +12,7 @@ #include "inflight_request_tracker.h" #include "counters/columnshard.h" +#include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/tablet/tablet_counters.h> #include <ydb/core/tablet/tablet_pipe_client_cache.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -212,7 +213,7 @@ public: }; private: - void OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx); + void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx); EOverloadStatus CheckOverloaded(const ui64 tableId) const; protected: diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 4e6ea811193..c5601a38835 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -6,6 +6,7 @@ #include <ydb/core/protos/counters_columnshard.pb.h> #include <ydb/core/tx/columnshard/engines/writer/write_controller.h> #include <ydb/core/tx/ev_write/write_data.h> +#include <ydb/core/formats/arrow/special_keys.h> namespace NKikimr::NColumnShard { @@ -259,7 +260,7 @@ struct TEvPrivate { public: TPutBlobData() = default; - TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) + TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) : BlobId(blobId) , BlobData(data) , RowsCount(rowsCount) @@ -268,6 +269,7 @@ struct TEvPrivate { LogicalMeta.SetNumRows(rowsCount); LogicalMeta.SetRawBytes(rawBytes); LogicalMeta.SetDirtyWriteTimeSeconds(dirtyTime.Seconds()); + LogicalMeta.SetSpecialKeysRawData(specialKeys.SerializeToString()); } }; diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 3ec637e7862..07e4c60c4af 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -25,4 +25,51 @@ bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* ds return true; } +bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) { + auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select(); + if (!rowset.IsReady()) + return false; + + while (!rowset.EndOfSet()) { + EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>(); + ui64 shardOrPlan = rowset.GetValue<InsertTable::ShardOrPlan>(); + ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>(); + ui64 pathId = rowset.GetValue<InsertTable::PathId>(); + TString dedupId = rowset.GetValue<InsertTable::DedupId>(); + TString strBlobId = rowset.GetValue<InsertTable::BlobId>(); + TString metaStr = rowset.GetValue<InsertTable::Meta>(); + + Y_VERIFY(rowset.HaveValue<InsertTable::IndexPlanStep>()); + ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>(); + ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>(); + const NOlap::TSnapshot indexSnapshot(indexPlanStep, indexTxId); + + TString error; + NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error); + Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str()); + + NKikimrTxColumnShard::TLogicalMetadata meta; + if (metaStr) { + Y_VERIFY(meta.ParseFromString(metaStr)); + } + TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, meta, indexSnapshot); + + switch (recType) { + case EInsertTableIds::Inserted: + insertTable.AddInserted(std::move(data), true); + break; + case EInsertTableIds::Committed: + insertTable.AddCommitted(std::move(data), true); + break; + case EInsertTableIds::Aborted: + insertTable.AddAborted(std::move(data), true); + break; + } + + if (!rowset.Next()) + return false; + } + return true; +} + } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index d4e734b7270..47017374ddc 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -438,19 +438,13 @@ struct Schema : NIceDb::Schema { // InsertTable activities static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { - if (data.GetSchemaSnapshot().Valid()) { - db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( - NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), - NIceDb::TUpdate<InsertTable::Meta>(data.Metadata), - NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), - NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) - ); - } else { - db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( - NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), - NIceDb::TUpdate<InsertTable::Meta>(data.Metadata) - ); - } + Y_VERIFY(data.GetSchemaSnapshot().Valid()); + db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( + NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), + NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()), + NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), + NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) + ); } static void InsertTable_Erase(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { @@ -484,56 +478,7 @@ struct Schema : NIceDb::Schema { static bool InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, - const TInstant& loadTime) { - auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>(); - ui64 shardOrPlan = rowset.GetValue<InsertTable::ShardOrPlan>(); - ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>(); - ui64 pathId = rowset.GetValue<InsertTable::PathId>(); - TString dedupId = rowset.GetValue<InsertTable::DedupId>(); - TString strBlobId = rowset.GetValue<InsertTable::BlobId>(); - TString metaStr = rowset.GetValue<InsertTable::Meta>(); - - std::optional<NOlap::TSnapshot> indexSnapshot; - if (rowset.HaveValue<InsertTable::IndexPlanStep>()) { - ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>(); - ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>(); - indexSnapshot = NOlap::TSnapshot(indexPlanStep, indexTxId); - } - - TString error; - NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error); - Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str()); - - TInstant writeTime = loadTime; - NKikimrTxColumnShard::TLogicalMetadata meta; - if (meta.ParseFromString(metaStr) && meta.HasDirtyWriteTimeSeconds()) { - writeTime = TInstant::Seconds(meta.GetDirtyWriteTimeSeconds()); - } - - TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, metaStr, writeTime, indexSnapshot); - - switch (recType) { - case EInsertTableIds::Inserted: - insertTable.AddInserted(std::move(data), true); - break; - case EInsertTableIds::Committed: - insertTable.AddCommitted(std::move(data), true); - break; - case EInsertTableIds::Aborted: - insertTable.AddAborted(std::move(data), true); - break; - } - - if (!rowset.Next()) - return false; - } - return true; - } + const TInstant& loadTime); // IndexGranules activities diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp index a3997488565..372e5a46f72 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.cpp +++ b/ydb/core/tx/columnshard/counters/columnshard.cpp @@ -34,6 +34,14 @@ TCSCounters::TCSCounters() SplitCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("SplitCompaction/Bytes"); SplitCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("SplitCompaction/PortionsCount"); + + HistogramSuccessWritePutBlobsDurationMs = TBase::GetHistogram("SuccessWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); + WritePutBlobsCount = TBase::GetValue("WritePutBlobs"); + WriteRequests = TBase::GetValue("WriteRequests"); + FailedWriteRequests = TBase::GetDeriviative("FailedWriteRequests"); + SuccessWriteRequests = TBase::GetDeriviative("SuccessWriteRequests"); } } diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index dea6ad256e9..546d45aaa6e 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -34,7 +34,47 @@ private: std::shared_ptr<TValueAggregationClient> SplitCompactionGranuleBytes; std::shared_ptr<TValueAggregationClient> SplitCompactionGranulePortionsCount; + + NMonitoring::THistogramPtr HistogramSuccessWritePutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs; + NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs; + NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount; + NMonitoring::TDynamicCounters::TCounterPtr WriteRequests; + NMonitoring::TDynamicCounters::TCounterPtr FailedWriteRequests; + NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests; public: + void OnStartWriteRequest() const { + WriteRequests->Add(1); + } + + void OnFailedWriteResponse() const { + WriteRequests->Sub(1); + FailedWriteRequests->Add(1); + } + + void OnSuccessWriteResponse() const { + WriteRequests->Sub(1); + SuccessWriteRequests->Add(1); + } + + void OnWritePutBlobsSuccess(const ui32 milliseconds) const { + HistogramSuccessWritePutBlobsDurationMs->Collect(milliseconds); + WritePutBlobsCount->Sub(1); + } + + void OnWritePutBlobsFail(const ui32 milliseconds) const { + HistogramFailedWritePutBlobsDurationMs->Collect(milliseconds); + WritePutBlobsCount->Sub(1); + } + + void OnWritePutBlobsStart() const { + WritePutBlobsCount->Add(1); + } + + void OnWriteTxComplete(const ui32 milliseconds) const { + HistogramWriteTxCompleteDurationMs->Collect(milliseconds); + } + void OnInternalCompactionInfo(const ui64 bytes, const ui32 portionsCount) const { InternalCompactionGranuleBytes->SetValue(bytes); InternalCompactionGranulePortionsCount->SetValue(portionsCount); diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt index f8834fe594a..3524a65489c 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt @@ -21,4 +21,5 @@ target_sources(columnshard-engines-insert_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt index 7762ca111a9..d7120e82291 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt @@ -22,4 +22,5 @@ target_sources(columnshard-engines-insert_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt index 7762ca111a9..d7120e82291 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt @@ -22,4 +22,5 @@ target_sources(columnshard-engines-insert_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt index f8834fe594a..3524a65489c 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt @@ -21,4 +21,5 @@ target_sources(columnshard-engines-insert_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h index a02cd61fb51..596dda3b174 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.h +++ b/ydb/core/tx/columnshard/engines/insert_table/data.h @@ -1,4 +1,5 @@ #pragma once +#include "meta.h" #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/engines/defs.h> #include <ydb/core/protos/tx_columnshard.pb.h> @@ -6,42 +7,45 @@ namespace NKikimr::NOlap { struct TInsertedData { +private: + TInsertedDataMeta Meta; public: + + const TInsertedDataMeta& GetMeta() const { + return Meta; + } + ui64 ShardOrPlan = 0; ui64 WriteTxId = 0; ui64 PathId = 0; TString DedupId; TUnifiedBlobId BlobId; - TString Metadata; - TInstant DirtyTime; TInsertedData() = delete; // avoid invalid TInsertedData anywhere - TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, - const NKikimrTxColumnShard::TLogicalMetadata& meta, const TInstant& writeTime, const TSnapshot& schemaVersion) - : WriteTxId(writeTxId) + TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, + const ui32 numRows, const ui64 rawBytes, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& columnNames, const TInstant writeTime, const TSnapshot& schemaVersion) + : Meta(writeTime, numRows, rawBytes, batch, columnNames) + , ShardOrPlan(shardOrPlan) + , WriteTxId(writeTxId) , PathId(pathId) , DedupId(dedupId) , BlobId(blobId) - , DirtyTime(writeTime) , SchemaVersion(schemaVersion) { - Y_VERIFY(meta.SerializeToString(&Metadata)); + Y_VERIFY(SchemaVersion.Valid()); } TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, - const TString& meta, const TInstant& writeTime, const std::optional<TSnapshot>& schemaVersion) - : ShardOrPlan(shardOrPlan) + const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) + : Meta(proto) + , ShardOrPlan(shardOrPlan) , WriteTxId(writeTxId) , PathId(pathId) , DedupId(dedupId) , BlobId(blobId) - , Metadata(meta) - , DirtyTime(writeTime) { - if (schemaVersion) { - SchemaVersion = *schemaVersion; - Y_VERIFY(SchemaVersion.Valid()); - } + , SchemaVersion(schemaVersion) { + Y_VERIFY(SchemaVersion.Valid()); } bool operator < (const TInsertedData& key) const { @@ -115,17 +119,27 @@ private: TUnifiedBlobId BlobId; TSnapshot CommitSnapshot; TSnapshot SchemaSnapshot; + YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First); + YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last); public: - TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot) + const NArrow::TReplaceKey& GetFirstVerified() const { + Y_VERIFY(First); + return *First; + } + + const NArrow::TReplaceKey& GetLastVerified() const { + Y_VERIFY(Last); + return *Last; + } + + TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last) : BlobId(blobId) , CommitSnapshot(snapshot) , SchemaSnapshot(schemaSnapshot) + , First(first) + , Last(last) {} - static TCommittedBlob BuildKeyBlob(const TUnifiedBlobId& blobId) { - return TCommittedBlob(blobId, TSnapshot::Zero(), TSnapshot::Zero()); - } - /// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only. /// So hash() and equality should depend on BlobId only. bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index a7b0402e40c..1988620a5bc 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -24,11 +24,8 @@ TInsertionSummary::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 plan std::optional<TInsertedData> data = Summary.ExtractInserted(writeId); Y_VERIFY(data, "Commit %" PRIu64 ":%" PRIu64 " : writeId %" PRIu64 " not found", planStep, txId, (ui64)writeId); - NKikimrTxColumnShard::TLogicalMetadata meta; - if (meta.ParseFromString(data->Metadata)) { - counters.Rows += meta.GetNumRows(); - counters.RawBytes += meta.GetRawBytes(); - } + counters.Rows += data->GetMeta().GetNumRows(); + counters.RawBytes += data->GetMeta().GetRawBytes(); counters.Bytes += data->BlobSize(); dbTable.EraseInserted(*data); @@ -108,22 +105,32 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant loadTime) { return dbTable.Load(*this, loadTime); } -std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const { +std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const { const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId); if (!pInfo) { return {}; } - std::vector<TCommittedBlob> ret; + std::vector<const TInsertedData*> ret; ret.reserve(pInfo->GetCommitted().size()); for (const auto& data : pInfo->GetCommitted()) { if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) { - ret.emplace_back(TCommittedBlob(data.BlobId, data.GetSnapshot(), data.GetSchemaSnapshot())); + ret.emplace_back(&data); } } + const auto pred = [pkSchema](const TInsertedData* l, const TInsertedData* r) { + return l->GetMeta().GetMin(pkSchema) < r->GetMeta().GetMin(pkSchema); + }; + std::sort(ret.begin(), ret.end(), pred); + + std::vector<TCommittedBlob> result; + result.reserve(ret.size()); + for (auto&& i : ret) { + result.emplace_back(TCommittedBlob(i->BlobId, i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema))); + } - return ret; + return result; } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index 6aaaa129e29..49bb8968439 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -61,7 +61,7 @@ public: THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId); void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); - std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const; + std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const; bool Load(IDbWrapper& dbTable, const TInstant loadTime); private: diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.cpp b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp new file mode 100644 index 00000000000..092e852abc0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp @@ -0,0 +1,28 @@ +#include "meta.h" + +namespace NKikimr::NOlap { + +bool TInsertedDataMeta::DeserializeFromProto(const NKikimrTxColumnShard::TLogicalMetadata& proto) { + if (proto.HasDirtyWriteTimeSeconds()) { + DirtyWriteTime = TInstant::Seconds(proto.GetDirtyWriteTimeSeconds()); + } + if (proto.HasRawBytes()) { + SpecialKeys = NArrow::TFirstLastSpecialKeys(proto.GetSpecialKeysRawData()); + } + NumRows = proto.GetNumRows(); + RawBytes = proto.GetRawBytes(); + return true; +} + +NKikimrTxColumnShard::TLogicalMetadata TInsertedDataMeta::SerializeToProto() const { + NKikimrTxColumnShard::TLogicalMetadata result; + result.SetDirtyWriteTimeSeconds(DirtyWriteTime.Seconds()); + if (SpecialKeys) { + result.SetSpecialKeysRawData(SpecialKeys->SerializeToString()); + } + result.SetNumRows(NumRows); + result.SetRawBytes(RawBytes); + return result; +} + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.h b/ydb/core/tx/columnshard/engines/insert_table/meta.h new file mode 100644 index 00000000000..44a0afbb0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/meta.h @@ -0,0 +1,53 @@ +#pragma once +#include <ydb/core/formats/arrow/special_keys.h> +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/engines/defs.h> +#include <ydb/core/protos/tx_columnshard.pb.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NOlap { + +class TInsertedDataMeta { +private: + YDB_READONLY_DEF(TInstant, DirtyWriteTime); + YDB_READONLY(ui32, NumRows, 0); + YDB_READONLY(ui64, RawBytes, 0); + std::optional<NArrow::TFirstLastSpecialKeys> SpecialKeys; + + bool DeserializeFromProto(const NKikimrTxColumnShard::TLogicalMetadata& proto); + +public: + TInsertedDataMeta(const NKikimrTxColumnShard::TLogicalMetadata& proto) { + Y_VERIFY(DeserializeFromProto(proto)); + } + + TInsertedDataMeta(const TInstant dirtyWriteTime, const ui32 numRows, const ui64 rawBytes, std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {}) + : DirtyWriteTime(dirtyWriteTime) + , NumRows(numRows) + , RawBytes(rawBytes) + { + if (batch) { + SpecialKeys = NArrow::TFirstLastSpecialKeys(batch, columnNames); + } + } + + std::optional<NArrow::TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const { + if (SpecialKeys) { + return SpecialKeys->GetMin(schema); + } else { + return {}; + } + } + std::optional<NArrow::TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const { + if (SpecialKeys) { + return SpecialKeys->GetMax(schema); + } else { + return {}; + } + } + + NKikimrTxColumnShard::TLogicalMetadata SerializeToProto() const; + +}; + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index a708cbdd3b5..0bc69d578ff 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -93,6 +93,11 @@ bool TInsertionSummary::IsOverloaded(const ui64 pathId) const { void TInsertionSummary::Clear() { StatsPrepared = {}; StatsCommitted = {}; + if (LocalInsertedCritical) { + --LocalInsertedCritical; + CriticalInserted.Dec(); + } + PathInfo.clear(); Priorities.clear(); Inserted.clear(); @@ -129,7 +134,7 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetInsertedByPathId(const THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetDeprecatedInsertions(const TInstant timeBorder) const { THashSet<TWriteId> toAbort; for (auto& [writeId, data] : Inserted) { - if (data.DirtyTime && data.DirtyTime < timeBorder) { + if (data.GetMeta().GetDirtyWriteTime() && data.GetMeta().GetDirtyWriteTime() < timeBorder) { toAbort.insert(writeId); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/ya.make b/ydb/core/tx/columnshard/engines/insert_table/ya.make index c38f51f528e..5f1d92bfb0e 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/ya.make +++ b/ydb/core/tx/columnshard/engines/insert_table/ya.make @@ -5,6 +5,7 @@ SRCS( rt_insertion.cpp data.cpp path_info.cpp + meta.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 5c46f9eefe6..c8c8317dbee 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -5,6 +5,7 @@ #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/columnshard__index_scan.h> #include <ydb/core/tx/columnshard/columnshard__stats_scan.h> +#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> #include <util/string/join.h> namespace NKikimr::NOlap { @@ -25,8 +26,9 @@ std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TR readDescription.PKRangesFilter); } -std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const { - return std::move(InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot())); +std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription, const std::shared_ptr<arrow::Schema>& pkSchema) const { + + return std::move(InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot(), pkSchema)); } std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(const NOlap::TReadContext& readContext) const { @@ -85,7 +87,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto AllColumns.insert(AllColumns.end(), auxiliaryColumns.begin(), auxiliaryColumns.end()); } - CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription); + CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription, ResultIndexSchema->GetIndexInfo().GetReplaceKey()); THashSet<ui32> columnIds; for (auto& columnId : AllColumns) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 355e0339687..cd582350cb8 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -2,6 +2,7 @@ #include "conveyor_task.h" #include "description.h" #include "read_context.h" +#include "read_filter_merger.h" #include <ydb/library/accessor/accessor.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/counters.h> @@ -63,7 +64,7 @@ public: TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertTable>& insertTable, const std::unique_ptr<NOlap::IColumnEngine>& index); std::shared_ptr<NOlap::TSelectInfo> Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const; - std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const; + std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription, const std::shared_ptr<arrow::Schema>& pkSchema) const; }; // Holds all metadata that is needed to perform read/scan diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index a4078db93b9..8c4dde51bf6 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -55,23 +55,22 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { TSnapshot indexSnapshot(1, 1); // insert, not commited - TInstant time = TInstant::Now(); - bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot)); + bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, indexSnapshot)); UNIT_ASSERT(ok); // insert the same blobId1 again - ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot)); + ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, indexSnapshot)); UNIT_ASSERT(!ok); // insert different blodId with the same writeId and dedupId TUnifiedBlobId blobId2(2222, 1, 2, 100, 1); - ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, time, indexSnapshot)); + ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, indexSnapshot)); UNIT_ASSERT(!ok); // read nothing - auto blobs = insertTable.Read(tableId, TSnapshot::Zero()); + auto blobs = insertTable.Read(tableId, TSnapshot::Zero(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); - blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); + blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); // commit @@ -84,15 +83,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1); // read old snapshot - blobs = insertTable.Read(tableId, TSnapshot::Zero()); + blobs = insertTable.Read(tableId, TSnapshot::Zero(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); - blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); + blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); // read new snapshot - blobs = insertTable.Read(tableId, TSnapshot(planStep, txId)); + blobs = insertTable.Read(tableId, TSnapshot(planStep, txId), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 1); - blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); + blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr); UNIT_ASSERT_EQUAL(blobs.size(), 0); } } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index fb4c91d3f82..c5284b9432f 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -412,8 +412,6 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { blobRanges.push_back(MakeBlobRange(2, testBlob.size())); // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] - TInstant writeTime = TInstant::Now(); - // load TColumnEngineForLogs engine(0); TSnapshot indexSnaphot(1, 1); @@ -422,8 +420,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { engine.Load(db, lostBlobs); std::vector<TInsertedData> dataToIndex = { - {1, 2, paths[0], "", blobRanges[0].BlobId, "", writeTime, indexSnaphot}, - {2, 1, paths[0], "", blobRanges[1].BlobId, "", writeTime, indexSnaphot} + TInsertedData(1, 2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot), + TInsertedData(2, 1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot) }; // write @@ -522,7 +520,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); + TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -622,7 +620,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); + TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot}); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); // first overload returns ok: it's a postcondition @@ -660,7 +658,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); + TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); bool overload = engine.GetOverloadedGranules(pathId); @@ -703,7 +701,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); + TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot)); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index cf1037451f0..fa1f3bbac99 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -25,7 +25,7 @@ IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext() bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { const auto& blobInfo = BlobsSplitted[CurrentIndex - 1]; - Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now()); + Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now()); return true; } diff --git a/ydb/core/tx/ev_write/write_data.cpp b/ydb/core/tx/ev_write/write_data.cpp index 4150cbd28ad..feb1e61e5fa 100644 --- a/ydb/core/tx/ev_write/write_data.cpp +++ b/ydb/core/tx/ev_write/write_data.cpp @@ -8,6 +8,8 @@ namespace NKikimr::NEvWrite { TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data) : WriteMeta(writeMeta) , Data(data) -{} +{ + Y_VERIFY(Data); +} } diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index bcdd83a0715..9d95b58b98d 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -2,8 +2,9 @@ #include <ydb/core/tx/long_tx_service/public/types.h> #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/library/accessor/accessor.h> - +#include <library/cpp/actors/core/monotonic.h> namespace NKikimr::NEvWrite { @@ -26,6 +27,7 @@ class TWriteMeta { YDB_ACCESSOR(ui64, WritePartId, 0); YDB_ACCESSOR_DEF(TString, DedupId); + YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now()); public: TWriteMeta(const ui64 writeId, const ui64 tableId, const NActors::TActorId& source) : WriteId(writeId) diff --git a/ydb/core/tx/ev_write/ya.make b/ydb/core/tx/ev_write/ya.make index 6b93ee539a5..21cc23242f6 100644 --- a/ydb/core/tx/ev_write/ya.make +++ b/ydb/core/tx/ev_write/ya.make @@ -16,7 +16,7 @@ PEERDIR( library/cpp/actors/core library/cpp/actors/wilson - # Temporary fix dep ydb/core/tx/columnshard + # Temporary fix dep ydb/core/tx/columnshard ydb/core/tablet_flat/protos ydb/core/tablet_flat ydb/core/blobstorage/vdisk/protos |