aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-08-31 19:15:10 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-08-31 19:47:25 +0300
commit2c70a8f92a317edc09a0b56352129057d1f5ea90 (patch)
tree597eb3c2a9707d4fbf3a00b689fe01f6eb75483a
parent0ea09be71af2c4edddf36d4bb55e743a96eebf28 (diff)
downloadydb-2c70a8f92a317edc09a0b56352129057d1f5ea90.tar.gz
KIKIMR-19218: insert table chunks with min/max
-rw-r--r--ydb/core/protos/tx_columnshard.proto3
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp34
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h3
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.cpp47
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h71
-rw-r--r--ydb/core/tx/columnshard/counters/columnshard.cpp8
-rw-r--r--ydb/core/tx/columnshard/counters/columnshard.h40
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/data.h54
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp25
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.h2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/meta.cpp28
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/meta.h53
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h3
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp19
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp2
-rw-r--r--ydb/core/tx/ev_write/write_data.cpp4
-rw-r--r--ydb/core/tx/ev_write/write_data.h4
-rw-r--r--ydb/core/tx/ev_write/ya.make2
27 files changed, 305 insertions, 135 deletions
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 3e6db3dd209..f9677baaf11 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -48,10 +48,9 @@ message TReadStats {
message TLogicalMetadata {
optional uint32 NumRows = 1;
- repeated NKikimrSSA.TProgram.TConstant FirstPkValue = 2; // It's min PK if batch is sorted
- repeated NKikimrSSA.TProgram.TConstant LastPkValue = 3; // It's max PK if batch is sorted
optional uint64 RawBytes = 4;
optional uint64 DirtyWriteTimeSeconds = 5;
+ optional string SpecialKeysRawData = 6;
}
message TMetadata {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 0f435cab595..247f05a5fee 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -58,7 +58,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion());
- NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, time, tableSchema->GetSnapshot());
+ NOlap::TInsertedData insertData(0, (ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, tableSchema->GetSnapshot());
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
@@ -112,7 +112,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
}
TVector<TWriteId> writeIds;
- for (auto blobData: PutBlobResult->Get()->GetBlobData()) {
+ for (auto blobData : PutBlobResult->Get()->GetBlobData()) {
auto writeId = TWriteId(writeMeta.GetWriteId());
if (operation) {
writeId = Self->BuildNextWriteId(txc);
@@ -144,10 +144,12 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
void TTxWrite::Complete(const TActorContext& ctx) {
Y_VERIFY(Result);
LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix());
+ Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds());
+ Self->CSCounters.OnSuccessWriteResponse();
ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release());
}
-void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
+void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
IncCounter(COUNTER_WRITE_FAIL);
switch (overloadReason) {
case EOverloadStatus::Disk:
@@ -203,6 +205,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto wg = WritesMonitor.FinishWrite(putResult.GetResourceUsage().SourceMemorySize);
if (putResult.GetPutStatus() != NKikimrProto::OK) {
+ CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds());
IncCounter(COUNTER_WRITE_FAIL);
auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
@@ -217,29 +220,33 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
if (writeMeta.HasLongTxId()) {
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
+ CSCounters.OnFailedWriteResponse();
} else {
auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
Y_VERIFY(operation);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails");
ctx.Send(writeMeta.GetSource(), result.release());
+ CSCounters.OnFailedWriteResponse();
}
- return;
- }
+ } else {
+ CSCounters.OnWritePutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds());
+ LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
+ << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());
- LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
- << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());
+ Execute(new TTxWrite(this, ev), ctx);
+ }
- Execute(new TTxWrite(this, ev), ctx);
}
void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+ CSCounters.OnStartWriteRequest();
LastAccessTime = TAppData::TimeProvider->Now();
const auto& record = Proto(ev->Get());
const ui64 tableId = record.GetTableId();
const ui64 writeId = record.GetWriteId();
const TString dedupId = record.GetDedupId();
- const auto source = ev->Get()->GetSource();
+ const auto source = ev->Sender;
NEvWrite::TWriteMeta writeMeta(writeId, tableId, source);
writeMeta.SetDedupId(dedupId);
@@ -253,7 +260,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
IncCounter(COUNTER_WRITE_FAIL);
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
- ctx.Send(ev->Get()->GetSource(), result.release());
+ ctx.Send(source, result.release());
+ CSCounters.OnFailedWriteResponse();
return;
}
@@ -264,7 +272,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< " at tablet " << TabletID());
IncCounter(COUNTER_WRITE_FAIL);
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
- ctx.Send(ev->Get()->GetSource(), result.release());
+ ctx.Send(source, result.release());
+ CSCounters.OnFailedWriteResponse();
return;
}
@@ -273,6 +282,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
+ CSCounters.OnFailedWriteResponse();
} else {
if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId()
@@ -284,6 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
ctx.Send(writeMeta.GetSource(), result.release());
+ CSCounters.OnFailedWriteResponse();
return;
}
@@ -295,6 +306,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< " at tablet " << TabletID());
auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData);
+ CSCounters.OnWritePutBlobsStart();
ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize));
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 60c962bb95c..4c5575cb355 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -12,6 +12,7 @@
#include "inflight_request_tracker.h"
#include "counters/columnshard.h"
+#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
@@ -212,7 +213,7 @@ public:
};
private:
- void OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
+ void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
protected:
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 4e6ea811193..c5601a38835 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -6,6 +6,7 @@
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/tx/columnshard/engines/writer/write_controller.h>
#include <ydb/core/tx/ev_write/write_data.h>
+#include <ydb/core/formats/arrow/special_keys.h>
namespace NKikimr::NColumnShard {
@@ -259,7 +260,7 @@ struct TEvPrivate {
public:
TPutBlobData() = default;
- TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime)
+ TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime)
: BlobId(blobId)
, BlobData(data)
, RowsCount(rowsCount)
@@ -268,6 +269,7 @@ struct TEvPrivate {
LogicalMeta.SetNumRows(rowsCount);
LogicalMeta.SetRawBytes(rawBytes);
LogicalMeta.SetDirtyWriteTimeSeconds(dirtyTime.Seconds());
+ LogicalMeta.SetSpecialKeysRawData(specialKeys.SerializeToString());
}
};
diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp
index 3ec637e7862..07e4c60c4af 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/columnshard_schema.cpp
@@ -25,4 +25,51 @@ bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* ds
return true;
}
+bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) {
+    // Replays every InsertTable row into insertTable; returns false if the rowset is not ready or fails to advance.
+    auto rows = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select();
+    if (!rows.IsReady()) {
+        return false;
+    }
+    while (!rows.EndOfSet()) {
+        const auto kind = static_cast<EInsertTableIds>(rows.GetValue<InsertTable::Committed>());
+        const ui64 shardOrPlan = rows.GetValue<InsertTable::ShardOrPlan>();
+        const ui64 writeTxId = rows.GetValueOrDefault<InsertTable::WriteTxId>();
+        const ui64 pathId = rows.GetValue<InsertTable::PathId>();
+        const TString dedupId = rows.GetValue<InsertTable::DedupId>();
+        const TString blobIdText = rows.GetValue<InsertTable::BlobId>();
+        const TString metaText = rows.GetValue<InsertTable::Meta>();
+        // IndexPlanStep must be present; together with IndexTxId it forms the schema snapshot of the row.
+        Y_VERIFY(rows.HaveValue<InsertTable::IndexPlanStep>());
+        const ui64 planStep = rows.GetValue<InsertTable::IndexPlanStep>();
+        const ui64 txId = rows.GetValue<InsertTable::IndexTxId>();
+        const NOlap::TSnapshot schemaSnapshot(planStep, txId);
+
+        TString parseError;
+        const NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(blobIdText, dsGroupSelector, parseError);
+        Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", parseError.c_str());
+
+        NKikimrTxColumnShard::TLogicalMetadata meta;
+        if (!metaText.empty()) { // an empty Meta column leaves the message default-initialized
+            Y_VERIFY(meta.ParseFromString(metaText));
+        }
+        TInsertedData record(shardOrPlan, writeTxId, pathId, dedupId, blobId, meta, schemaSnapshot);
+        switch (kind) { // route the record by the row's record type
+            case EInsertTableIds::Inserted:
+                insertTable.AddInserted(std::move(record), true);
+                break;
+            case EInsertTableIds::Committed:
+                insertTable.AddCommitted(std::move(record), true);
+                break;
+            case EInsertTableIds::Aborted:
+                insertTable.AddAborted(std::move(record), true);
+                break;
+        }
+        if (!rows.Next()) {
+            return false;
+        }
+    }
+    return true;
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index d4e734b7270..47017374ddc 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -438,19 +438,13 @@ struct Schema : NIceDb::Schema {
// InsertTable activities
static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) {
- if (data.GetSchemaSnapshot().Valid()) {
- db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update(
- NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()),
- NIceDb::TUpdate<InsertTable::Meta>(data.Metadata),
- NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()),
- NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId())
- );
- } else {
- db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update(
- NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()),
- NIceDb::TUpdate<InsertTable::Meta>(data.Metadata)
- );
- }
+ Y_VERIFY(data.GetSchemaSnapshot().Valid());
+ db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update(
+ NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()),
+ NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
+ NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()),
+ NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId())
+ );
}
static void InsertTable_Erase(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) {
@@ -484,56 +478,7 @@ struct Schema : NIceDb::Schema {
static bool InsertTable_Load(NIceDb::TNiceDb& db,
const IBlobGroupSelector* dsGroupSelector,
NOlap::TInsertTableAccessor& insertTable,
- const TInstant& loadTime) {
- auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>();
- ui64 shardOrPlan = rowset.GetValue<InsertTable::ShardOrPlan>();
- ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>();
- ui64 pathId = rowset.GetValue<InsertTable::PathId>();
- TString dedupId = rowset.GetValue<InsertTable::DedupId>();
- TString strBlobId = rowset.GetValue<InsertTable::BlobId>();
- TString metaStr = rowset.GetValue<InsertTable::Meta>();
-
- std::optional<NOlap::TSnapshot> indexSnapshot;
- if (rowset.HaveValue<InsertTable::IndexPlanStep>()) {
- ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>();
- ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>();
- indexSnapshot = NOlap::TSnapshot(indexPlanStep, indexTxId);
- }
-
- TString error;
- NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error);
- Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str());
-
- TInstant writeTime = loadTime;
- NKikimrTxColumnShard::TLogicalMetadata meta;
- if (meta.ParseFromString(metaStr) && meta.HasDirtyWriteTimeSeconds()) {
- writeTime = TInstant::Seconds(meta.GetDirtyWriteTimeSeconds());
- }
-
- TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, metaStr, writeTime, indexSnapshot);
-
- switch (recType) {
- case EInsertTableIds::Inserted:
- insertTable.AddInserted(std::move(data), true);
- break;
- case EInsertTableIds::Committed:
- insertTable.AddCommitted(std::move(data), true);
- break;
- case EInsertTableIds::Aborted:
- insertTable.AddAborted(std::move(data), true);
- break;
- }
-
- if (!rowset.Next())
- return false;
- }
- return true;
- }
+ const TInstant& loadTime);
// IndexGranules activities
diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp
index a3997488565..372e5a46f72 100644
--- a/ydb/core/tx/columnshard/counters/columnshard.cpp
+++ b/ydb/core/tx/columnshard/counters/columnshard.cpp
@@ -34,6 +34,14 @@ TCSCounters::TCSCounters()
SplitCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("SplitCompaction/Bytes");
SplitCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("SplitCompaction/PortionsCount");
+
+ HistogramSuccessWritePutBlobsDurationMs = TBase::GetHistogram("SuccessWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ WritePutBlobsCount = TBase::GetValue("WritePutBlobs");
+ WriteRequests = TBase::GetValue("WriteRequests");
+ FailedWriteRequests = TBase::GetDeriviative("FailedWriteRequests");
+ SuccessWriteRequests = TBase::GetDeriviative("SuccessWriteRequests");
}
}
diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h
index dea6ad256e9..546d45aaa6e 100644
--- a/ydb/core/tx/columnshard/counters/columnshard.h
+++ b/ydb/core/tx/columnshard/counters/columnshard.h
@@ -34,7 +34,47 @@ private:
std::shared_ptr<TValueAggregationClient> SplitCompactionGranuleBytes;
std::shared_ptr<TValueAggregationClient> SplitCompactionGranulePortionsCount;
+
+ NMonitoring::THistogramPtr HistogramSuccessWritePutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs;
+ NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount;
+ NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
+ NMonitoring::TDynamicCounters::TCounterPtr FailedWriteRequests;
+ NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;
public:
+ void OnStartWriteRequest() const {
+ WriteRequests->Add(1);
+ }
+
+ void OnFailedWriteResponse() const {
+ WriteRequests->Sub(1);
+ FailedWriteRequests->Add(1);
+ }
+
+ void OnSuccessWriteResponse() const {
+ WriteRequests->Sub(1);
+ SuccessWriteRequests->Add(1);
+ }
+
+ void OnWritePutBlobsSuccess(const ui64 milliseconds) const { // ui64: the caller passes TDuration::MilliSeconds() (presumably 64-bit - verify); ui32 would silently truncate it
+     HistogramSuccessWritePutBlobsDurationMs->Collect(milliseconds); // record the duration of a successful put-blobs stage
+     WritePutBlobsCount->Sub(1); // undo the Add(1) made by OnWritePutBlobsStart
+ }
+
+ void OnWritePutBlobsFail(const ui64 milliseconds) const { // ui64: the caller passes TDuration::MilliSeconds() (presumably 64-bit - verify); ui32 would silently truncate it
+     HistogramFailedWritePutBlobsDurationMs->Collect(milliseconds); // record the duration of a failed put-blobs stage
+     WritePutBlobsCount->Sub(1); // undo the Add(1) made by OnWritePutBlobsStart
+ }
+
+ void OnWritePutBlobsStart() const {
+ WritePutBlobsCount->Add(1);
+ }
+
+ void OnWriteTxComplete(const ui64 milliseconds) const { // ui64: the caller passes TDuration::MilliSeconds() (presumably 64-bit - verify); ui32 would silently truncate it
+     HistogramWriteTxCompleteDurationMs->Collect(milliseconds); // record the write duration measured when the write transaction completes
+ }
+
void OnInternalCompactionInfo(const ui64 bytes, const ui32 portionsCount) const {
InternalCompactionGranuleBytes->SetValue(bytes);
InternalCompactionGranulePortionsCount->SetValue(portionsCount);
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt
index f8834fe594a..3524a65489c 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt
@@ -21,4 +21,5 @@ target_sources(columnshard-engines-insert_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt
index 7762ca111a9..d7120e82291 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt
@@ -22,4 +22,5 @@ target_sources(columnshard-engines-insert_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt
index 7762ca111a9..d7120e82291 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt
@@ -22,4 +22,5 @@ target_sources(columnshard-engines-insert_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt
index f8834fe594a..3524a65489c 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt
@@ -21,4 +21,5 @@ target_sources(columnshard-engines-insert_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h
index a02cd61fb51..596dda3b174 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/data.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.h
@@ -1,4 +1,5 @@
#pragma once
+#include "meta.h"
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/engines/defs.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
@@ -6,42 +7,45 @@
namespace NKikimr::NOlap {
struct TInsertedData {
+private:
+ TInsertedDataMeta Meta;
public:
+
+ const TInsertedDataMeta& GetMeta() const {
+ return Meta;
+ }
+
ui64 ShardOrPlan = 0;
ui64 WriteTxId = 0;
ui64 PathId = 0;
TString DedupId;
TUnifiedBlobId BlobId;
- TString Metadata;
- TInstant DirtyTime;
TInsertedData() = delete; // avoid invalid TInsertedData anywhere
- TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
- const NKikimrTxColumnShard::TLogicalMetadata& meta, const TInstant& writeTime, const TSnapshot& schemaVersion)
- : WriteTxId(writeTxId)
+ TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
+ const ui32 numRows, const ui64 rawBytes, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& columnNames, const TInstant writeTime, const TSnapshot& schemaVersion)
+ : Meta(writeTime, numRows, rawBytes, batch, columnNames)
+ , ShardOrPlan(shardOrPlan)
+ , WriteTxId(writeTxId)
, PathId(pathId)
, DedupId(dedupId)
, BlobId(blobId)
- , DirtyTime(writeTime)
, SchemaVersion(schemaVersion)
{
- Y_VERIFY(meta.SerializeToString(&Metadata));
+ Y_VERIFY(SchemaVersion.Valid());
}
TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
- const TString& meta, const TInstant& writeTime, const std::optional<TSnapshot>& schemaVersion)
- : ShardOrPlan(shardOrPlan)
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion)
+ : Meta(proto)
+ , ShardOrPlan(shardOrPlan)
, WriteTxId(writeTxId)
, PathId(pathId)
, DedupId(dedupId)
, BlobId(blobId)
- , Metadata(meta)
- , DirtyTime(writeTime) {
- if (schemaVersion) {
- SchemaVersion = *schemaVersion;
- Y_VERIFY(SchemaVersion.Valid());
- }
+ , SchemaVersion(schemaVersion) {
+ Y_VERIFY(SchemaVersion.Valid());
}
bool operator < (const TInsertedData& key) const {
@@ -115,17 +119,27 @@ private:
TUnifiedBlobId BlobId;
TSnapshot CommitSnapshot;
TSnapshot SchemaSnapshot;
+ YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First);
+ YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last);
public:
- TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot)
+ const NArrow::TReplaceKey& GetFirstVerified() const {
+ Y_VERIFY(First);
+ return *First;
+ }
+
+ const NArrow::TReplaceKey& GetLastVerified() const {
+ Y_VERIFY(Last);
+ return *Last;
+ }
+
+ TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last)
: BlobId(blobId)
, CommitSnapshot(snapshot)
, SchemaSnapshot(schemaSnapshot)
+ , First(first)
+ , Last(last)
{}
- static TCommittedBlob BuildKeyBlob(const TUnifiedBlobId& blobId) {
- return TCommittedBlob(blobId, TSnapshot::Zero(), TSnapshot::Zero());
- }
-
/// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only.
/// So hash() and equality should depend on BlobId only.
bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; }
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index a7b0402e40c..1988620a5bc 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -24,11 +24,8 @@ TInsertionSummary::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 plan
std::optional<TInsertedData> data = Summary.ExtractInserted(writeId);
Y_VERIFY(data, "Commit %" PRIu64 ":%" PRIu64 " : writeId %" PRIu64 " not found", planStep, txId, (ui64)writeId);
- NKikimrTxColumnShard::TLogicalMetadata meta;
- if (meta.ParseFromString(data->Metadata)) {
- counters.Rows += meta.GetNumRows();
- counters.RawBytes += meta.GetRawBytes();
- }
+ counters.Rows += data->GetMeta().GetNumRows();
+ counters.RawBytes += data->GetMeta().GetRawBytes();
counters.Bytes += data->BlobSize();
dbTable.EraseInserted(*data);
@@ -108,22 +105,32 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant loadTime) {
return dbTable.Load(*this, loadTime);
}
-std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const {
+std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const {
const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId);
if (!pInfo) {
return {};
}
- std::vector<TCommittedBlob> ret;
+ std::vector<const TInsertedData*> ret;
ret.reserve(pInfo->GetCommitted().size());
for (const auto& data : pInfo->GetCommitted()) {
if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) {
- ret.emplace_back(TCommittedBlob(data.BlobId, data.GetSnapshot(), data.GetSchemaSnapshot()));
+ ret.emplace_back(&data);
}
}
+ const auto pred = [pkSchema](const TInsertedData* l, const TInsertedData* r) {
+ return l->GetMeta().GetMin(pkSchema) < r->GetMeta().GetMin(pkSchema);
+ };
+ std::sort(ret.begin(), ret.end(), pred);
+
+ std::vector<TCommittedBlob> result;
+ result.reserve(ret.size());
+ for (auto&& i : ret) {
+ result.emplace_back(TCommittedBlob(i->BlobId, i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema)));
+ }
- return ret;
+ return result;
}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
index 6aaaa129e29..49bb8968439 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h
@@ -61,7 +61,7 @@ public:
THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key);
void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key);
- std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const;
+ std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const;
bool Load(IDbWrapper& dbTable, const TInstant loadTime);
private:
diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.cpp b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
new file mode 100644
index 00000000000..092e852abc0
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp
@@ -0,0 +1,28 @@
+#include "meta.h"
+
+namespace NKikimr::NOlap {
+
+bool TInsertedDataMeta::DeserializeFromProto(const NKikimrTxColumnShard::TLogicalMetadata& proto) { // Fills this meta from the protobuf message; always returns true.
+    if (proto.HasDirtyWriteTimeSeconds()) {
+        DirtyWriteTime = TInstant::Seconds(proto.GetDirtyWriteTimeSeconds());
+    }
+    if (proto.HasSpecialKeysRawData()) { // guard on the field actually read: a message with RawBytes but no serialized keys must leave SpecialKeys unset
+        SpecialKeys = NArrow::TFirstLastSpecialKeys(proto.GetSpecialKeysRawData());
+    }
+    NumRows = proto.GetNumRows(); // getters yield the field default when the field is absent
+    RawBytes = proto.GetRawBytes();
+    return true;
+}
+
+NKikimrTxColumnShard::TLogicalMetadata TInsertedDataMeta::SerializeToProto() const { // Builds a TLogicalMetadata message from the fields of this meta.
+    NKikimrTxColumnShard::TLogicalMetadata proto;
+    proto.SetNumRows(NumRows);
+    proto.SetRawBytes(RawBytes);
+    proto.SetDirtyWriteTimeSeconds(DirtyWriteTime.Seconds());
+    if (SpecialKeys.has_value()) { // the keys are optional; the field stays unset without them
+        proto.SetSpecialKeysRawData(SpecialKeys->SerializeToString());
+    }
+    return proto;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.h b/ydb/core/tx/columnshard/engines/insert_table/meta.h
new file mode 100644
index 00000000000..44a0afbb0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/insert_table/meta.h
@@ -0,0 +1,53 @@
+#pragma once
+#include <ydb/core/formats/arrow/special_keys.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/engines/defs.h>
+#include <ydb/core/protos/tx_columnshard.pb.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap {
+
+class TInsertedDataMeta { // Metadata of inserted data: dirty write time, row count, raw byte size and optional first/last special keys.
+private:
+ YDB_READONLY_DEF(TInstant, DirtyWriteTime); // Read through GetDirtyWriteTime() elsewhere in this change.
+ YDB_READONLY(ui32, NumRows, 0);
+ YDB_READONLY(ui64, RawBytes, 0);
+ std::optional<NArrow::TFirstLastSpecialKeys> SpecialKeys; // Set only by the batch constructor (non-null batch) or by DeserializeFromProto.
+
+ bool DeserializeFromProto(const NKikimrTxColumnShard::TLogicalMetadata& proto);
+
+public:
+ TInsertedDataMeta(const NKikimrTxColumnShard::TLogicalMetadata& proto) { // Restores the meta from its protobuf form; the result of deserialization is checked with Y_VERIFY.
+ Y_VERIFY(DeserializeFromProto(proto));
+ }
+
+ TInsertedDataMeta(const TInstant dirtyWriteTime, const ui32 numRows, const ui64 rawBytes, std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {}) // Builds the meta from explicit values; batch may be null.
+ : DirtyWriteTime(dirtyWriteTime)
+ , NumRows(numRows)
+ , RawBytes(rawBytes)
+ {
+ if (batch) { // A null batch leaves SpecialKeys unset.
+ SpecialKeys = NArrow::TFirstLastSpecialKeys(batch, columnNames);
+ }
+ }
+
+ std::optional<NArrow::TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const { // Returns SpecialKeys->GetMin(schema), or an empty optional when SpecialKeys is unset.
+ if (SpecialKeys) {
+ return SpecialKeys->GetMin(schema);
+ } else {
+ return {};
+ }
+ }
+ std::optional<NArrow::TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const { // Returns SpecialKeys->GetMax(schema), or an empty optional when SpecialKeys is unset.
+ if (SpecialKeys) {
+ return SpecialKeys->GetMax(schema);
+ } else {
+ return {};
+ }
+ }
+
+ NKikimrTxColumnShard::TLogicalMetadata SerializeToProto() const; // Inverse direction of the proto constructor; defined in meta.cpp.
+
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
index a708cbdd3b5..0bc69d578ff 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp
@@ -93,6 +93,11 @@ bool TInsertionSummary::IsOverloaded(const ui64 pathId) const {
void TInsertionSummary::Clear() {
StatsPrepared = {};
StatsCommitted = {};
+ if (LocalInsertedCritical) {
+ --LocalInsertedCritical;
+ CriticalInserted.Dec();
+ }
+
PathInfo.clear();
Priorities.clear();
Inserted.clear();
@@ -129,7 +134,7 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetInsertedByPathId(const
THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetDeprecatedInsertions(const TInstant timeBorder) const {
THashSet<TWriteId> toAbort;
for (auto& [writeId, data] : Inserted) {
- if (data.DirtyTime && data.DirtyTime < timeBorder) {
+ if (data.GetMeta().GetDirtyWriteTime() && data.GetMeta().GetDirtyWriteTime() < timeBorder) {
toAbort.insert(writeId);
}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/ya.make b/ydb/core/tx/columnshard/engines/insert_table/ya.make
index c38f51f528e..5f1d92bfb0e 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/ya.make
+++ b/ydb/core/tx/columnshard/engines/insert_table/ya.make
@@ -5,6 +5,7 @@ SRCS(
rt_insertion.cpp
data.cpp
path_info.cpp
+ meta.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index 5c46f9eefe6..c8c8317dbee 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
#include <util/string/join.h>
namespace NKikimr::NOlap {
@@ -25,8 +26,9 @@ std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TR
readDescription.PKRangesFilter);
}
-std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const {
- return std::move(InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot()));
+std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription, const std::shared_ptr<arrow::Schema>& pkSchema) const { // Reads committed blobs for the described path and snapshot from the insert table, forwarding pkSchema.
+
+ return InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot(), pkSchema); // Read() returns by value, so returning it directly permits copy elision; wrapping it in std::move would only force an extra move.
}
std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(const NOlap::TReadContext& readContext) const {
@@ -85,7 +87,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto
AllColumns.insert(AllColumns.end(), auxiliaryColumns.begin(), auxiliaryColumns.end());
}
- CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription);
+ CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription, ResultIndexSchema->GetIndexInfo().GetReplaceKey());
THashSet<ui32> columnIds;
for (auto& columnId : AllColumns) {
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 355e0339687..cd582350cb8 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -2,6 +2,7 @@
#include "conveyor_task.h"
#include "description.h"
#include "read_context.h"
+#include "read_filter_merger.h"
#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/counters.h>
@@ -63,7 +64,7 @@ public:
TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertTable>& insertTable,
const std::unique_ptr<NOlap::IColumnEngine>& index);
std::shared_ptr<NOlap::TSelectInfo> Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const;
- std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const;
+ std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription, const std::shared_ptr<arrow::Schema>& pkSchema) const; // pkSchema is passed through to the insert table's Read() by the definition in read_metadata.cpp.
};
// Holds all metadata that is needed to perform read/scan
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index a4078db93b9..8c4dde51bf6 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -55,23 +55,22 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
TSnapshot indexSnapshot(1, 1);
// insert, not commited
- TInstant time = TInstant::Now();
- bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot));
+ bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, indexSnapshot));
UNIT_ASSERT(ok);
// insert the same blobId1 again
- ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot));
+ ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, indexSnapshot));
UNIT_ASSERT(!ok);
// insert different blodId with the same writeId and dedupId
TUnifiedBlobId blobId2(2222, 1, 2, 100, 1);
- ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, time, indexSnapshot));
+ ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, indexSnapshot));
UNIT_ASSERT(!ok);
// read nothing
- auto blobs = insertTable.Read(tableId, TSnapshot::Zero());
+ auto blobs = insertTable.Read(tableId, TSnapshot::Zero(), nullptr);
UNIT_ASSERT_EQUAL(blobs.size(), 0);
- blobs = insertTable.Read(tableId+1, TSnapshot::Zero());
+ blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr);
UNIT_ASSERT_EQUAL(blobs.size(), 0);
// commit
@@ -84,15 +83,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1);
// read old snapshot
- blobs = insertTable.Read(tableId, TSnapshot::Zero());
+ blobs = insertTable.Read(tableId, TSnapshot::Zero(), nullptr);
UNIT_ASSERT_EQUAL(blobs.size(), 0);
- blobs = insertTable.Read(tableId+1, TSnapshot::Zero());
+ blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr);
UNIT_ASSERT_EQUAL(blobs.size(), 0);
// read new snapshot
- blobs = insertTable.Read(tableId, TSnapshot(planStep, txId));
+ blobs = insertTable.Read(tableId, TSnapshot(planStep, txId), nullptr);
UNIT_ASSERT_EQUAL(blobs.size(), 1);
- blobs = insertTable.Read(tableId+1, TSnapshot::Zero());
+ blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr);
UNIT_ASSERT_EQUAL(blobs.size(), 0);
}
}
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index fb4c91d3f82..c5284b9432f 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -412,8 +412,6 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
blobRanges.push_back(MakeBlobRange(2, testBlob.size()));
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
- TInstant writeTime = TInstant::Now();
-
// load
TColumnEngineForLogs engine(0);
TSnapshot indexSnaphot(1, 1);
@@ -422,8 +420,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
engine.Load(db, lostBlobs);
std::vector<TInsertedData> dataToIndex = {
- {1, 2, paths[0], "", blobRanges[0].BlobId, "", writeTime, indexSnaphot},
- {2, 1, paths[0], "", blobRanges[1].BlobId, "", writeTime, indexSnaphot}
+ TInsertedData(1, 2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot),
+ TInsertedData(2, 1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot)
};
// write
@@ -522,7 +520,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
+ TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -622,7 +620,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
+ TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
// first overload returns ok: it's a postcondition
@@ -660,7 +658,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
+ TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
bool overload = engine.GetOverloadedGranules(pathId);
@@ -703,7 +701,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot});
+ TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index cf1037451f0..fa1f3bbac99 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -25,7 +25,7 @@ IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext()
bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
const auto& blobInfo = BlobsSplitted[CurrentIndex - 1];
- Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now());
+ Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now()); // Same record as before, now also carrying blobInfo.GetSpecialKeys().
return true;
}
diff --git a/ydb/core/tx/ev_write/write_data.cpp b/ydb/core/tx/ev_write/write_data.cpp
index 4150cbd28ad..feb1e61e5fa 100644
--- a/ydb/core/tx/ev_write/write_data.cpp
+++ b/ydb/core/tx/ev_write/write_data.cpp
@@ -8,6 +8,8 @@ namespace NKikimr::NEvWrite {
TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data)
: WriteMeta(writeMeta)
, Data(data)
-{}
+{
+ Y_VERIFY(Data); // The data container must be non-null once the object is constructed.
+}
}
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index bcdd83a0715..9d95b58b98d 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -2,8 +2,9 @@
#include <ydb/core/tx/long_tx_service/public/types.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/library/accessor/accessor.h>
-
+#include <library/cpp/actors/core/monotonic.h>
namespace NKikimr::NEvWrite {
@@ -26,6 +27,7 @@ class TWriteMeta {
YDB_ACCESSOR(ui64, WritePartId, 0);
YDB_ACCESSOR_DEF(TString, DedupId);
+ YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now());
public:
TWriteMeta(const ui64 writeId, const ui64 tableId, const NActors::TActorId& source)
: WriteId(writeId)
diff --git a/ydb/core/tx/ev_write/ya.make b/ydb/core/tx/ev_write/ya.make
index 6b93ee539a5..21cc23242f6 100644
--- a/ydb/core/tx/ev_write/ya.make
+++ b/ydb/core/tx/ev_write/ya.make
@@ -16,7 +16,7 @@ PEERDIR(
library/cpp/actors/core
library/cpp/actors/wilson
- # Temporary fix dep ydb/core/tx/columnshard
+ # Temporary fix dep ydb/core/tx/columnshard
ydb/core/tablet_flat/protos
ydb/core/tablet_flat
ydb/core/blobstorage/vdisk/protos