diff options
| author | ivanmorozov <[email protected]> | 2023-08-31 19:15:10 +0300 | 
|---|---|---|
| committer | ivanmorozov <[email protected]> | 2023-08-31 19:47:25 +0300 | 
| commit | 2c70a8f92a317edc09a0b56352129057d1f5ea90 (patch) | |
| tree | 597eb3c2a9707d4fbf3a00b689fe01f6eb75483a | |
| parent | 0ea09be71af2c4edddf36d4bb55e743a96eebf28 (diff) | |
KIKIMR-19218: insert table chunks with min/max
27 files changed, 305 insertions, 135 deletions
| diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 3e6db3dd209..f9677baaf11 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -48,10 +48,9 @@ message TReadStats {  message TLogicalMetadata {      optional uint32 NumRows = 1; -    repeated NKikimrSSA.TProgram.TConstant FirstPkValue = 2; // It's min PK if batch is sorted -    repeated NKikimrSSA.TProgram.TConstant LastPkValue = 3; // It's max PK if batch is sorted      optional uint64 RawBytes = 4;      optional uint64 DirtyWriteTimeSeconds = 5; +    optional string SpecialKeysRawData = 6;  }  message TMetadata { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 0f435cab595..247f05a5fee 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -58,7 +58,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit      auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion()); -    NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, time, tableSchema->GetSnapshot()); +    NOlap::TInsertedData insertData(0, (ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, meta, tableSchema->GetSnapshot());      bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));      if (ok) {          THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); @@ -112,7 +112,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {      }      TVector<TWriteId> writeIds; -    for (auto blobData: PutBlobResult->Get()->GetBlobData()) { +    for (auto blobData : PutBlobResult->Get()->GetBlobData()) {          auto writeId = TWriteId(writeMeta.GetWriteId());          if (operation) {              writeId = Self->BuildNextWriteId(txc); @@ -144,10 +144,12 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {  void TTxWrite::Complete(const TActorContext& ctx) {      Y_VERIFY(Result);      LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix()); +    Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds()); +    Self->CSCounters.OnSuccessWriteResponse();      ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release());  } -void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) { +void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {      IncCounter(COUNTER_WRITE_FAIL);      switch (overloadReason) {          case EOverloadStatus::Disk: @@ -203,6 +205,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo      auto wg = WritesMonitor.FinishWrite(putResult.GetResourceUsage().SourceMemorySize);      if (putResult.GetPutStatus() != NKikimrProto::OK) { +        CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds());          IncCounter(COUNTER_WRITE_FAIL);          auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; @@ -217,29 +220,33 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo          if (writeMeta.HasLongTxId()) {              auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);              ctx.Send(writeMeta.GetSource(), result.release()); +            CSCounters.OnFailedWriteResponse();          } else {              auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());              Y_VERIFY(operation);              auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails");              ctx.Send(writeMeta.GetSource(), result.release()); +            CSCounters.OnFailedWriteResponse();          } -        return; -    } +    } else { +        CSCounters.OnWritePutBlobsSuccess((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds()); +        LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() +            << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); -    LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() -        << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); +        Execute(new TTxWrite(this, ev), ctx); +    } -    Execute(new TTxWrite(this, ev), ctx);  }  void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { +    CSCounters.OnStartWriteRequest();      LastAccessTime = TAppData::TimeProvider->Now();      const auto& record = Proto(ev->Get());      const ui64 tableId = record.GetTableId();      const ui64 writeId = record.GetWriteId();      const TString dedupId = record.GetDedupId(); -    const auto source = ev->Get()->GetSource(); +    const auto source = ev->Sender;      NEvWrite::TWriteMeta writeMeta(writeId, tableId, source);      writeMeta.SetDedupId(dedupId); @@ -253,7 +260,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex          IncCounter(COUNTER_WRITE_FAIL);          auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); -        ctx.Send(ev->Get()->GetSource(), result.release()); +        ctx.Send(source, result.release()); +        CSCounters.OnFailedWriteResponse();          return;      } @@ -264,7 +272,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex              << " at tablet " << TabletID());          IncCounter(COUNTER_WRITE_FAIL);          auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); -        ctx.Send(ev->Get()->GetSource(), result.release()); +        ctx.Send(source, result.release()); +        CSCounters.OnFailedWriteResponse();          return;      } @@ -273,6 +282,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex      if (overloadStatus != EOverloadStatus::None) {          std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);          OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); +        CSCounters.OnFailedWriteResponse();      } else {          if (ui64 writeId = (ui64) HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {              LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() @@ -284,6 +294,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex              auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(                  TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);              ctx.Send(writeMeta.GetSource(), result.release()); +            CSCounters.OnFailedWriteResponse();              return;          } @@ -295,6 +306,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex              << " at tablet " << TabletID());          auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData); +        CSCounters.OnWritePutBlobsStart();          ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize));      }  } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 60c962bb95c..4c5575cb355 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -12,6 +12,7 @@  #include "inflight_request_tracker.h"  #include "counters/columnshard.h" +#include <ydb/core/base/tablet_pipecache.h>  #include <ydb/core/tablet/tablet_counters.h>  #include <ydb/core/tablet/tablet_pipe_client_cache.h>  #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -212,7 +213,7 @@ public:      };  private: -    void OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx); +    void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);      EOverloadStatus CheckOverloaded(const ui64 tableId) const;  protected: diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 4e6ea811193..c5601a38835 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -6,6 +6,7 @@  #include <ydb/core/protos/counters_columnshard.pb.h>  #include <ydb/core/tx/columnshard/engines/writer/write_controller.h>  #include <ydb/core/tx/ev_write/write_data.h> +#include <ydb/core/formats/arrow/special_keys.h>  namespace NKikimr::NColumnShard { @@ -259,7 +260,7 @@ struct TEvPrivate {          public:              TPutBlobData() = default; -            TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) +            TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime)                  : BlobId(blobId)                  , BlobData(data)                  , RowsCount(rowsCount) @@ -268,6 +269,7 @@ struct TEvPrivate {                  LogicalMeta.SetNumRows(rowsCount);                  LogicalMeta.SetRawBytes(rawBytes);                  LogicalMeta.SetDirtyWriteTimeSeconds(dirtyTime.Seconds()); +                LogicalMeta.SetSpecialKeysRawData(specialKeys.SerializeToString());              }          }; diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 3ec637e7862..07e4c60c4af 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -25,4 +25,51 @@ bool Schema::IndexColumns_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* ds      return true;  } +bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsGroupSelector, NOlap::TInsertTableAccessor& insertTable, const TInstant& /*loadTime*/) { +    auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select(); +    if (!rowset.IsReady()) +        return false; + +    while (!rowset.EndOfSet()) { +        EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>(); +        ui64 shardOrPlan = rowset.GetValue<InsertTable::ShardOrPlan>(); +        ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>(); +        ui64 pathId = rowset.GetValue<InsertTable::PathId>(); +        TString dedupId = rowset.GetValue<InsertTable::DedupId>(); +        TString strBlobId = rowset.GetValue<InsertTable::BlobId>(); +        TString metaStr = rowset.GetValue<InsertTable::Meta>(); + +        Y_VERIFY(rowset.HaveValue<InsertTable::IndexPlanStep>()); +        ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>(); +        ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>(); +        const NOlap::TSnapshot indexSnapshot(indexPlanStep, indexTxId); + +        TString error; +        NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error); +        Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str()); + +        NKikimrTxColumnShard::TLogicalMetadata meta; +        if (metaStr) { +            Y_VERIFY(meta.ParseFromString(metaStr)); +        } +        TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, meta, indexSnapshot); + +        switch (recType) { +            case EInsertTableIds::Inserted: +                insertTable.AddInserted(std::move(data), true); +                break; +            case EInsertTableIds::Committed: +                insertTable.AddCommitted(std::move(data), true); +                break; +            case EInsertTableIds::Aborted: +                insertTable.AddAborted(std::move(data), true); +                break; +        } + +        if (!rowset.Next()) +            return false; +    } +    return true; +} +  } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index d4e734b7270..47017374ddc 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -438,19 +438,13 @@ struct Schema : NIceDb::Schema {      // InsertTable activities      static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { -        if (data.GetSchemaSnapshot().Valid()) { -            db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( -                NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), -                NIceDb::TUpdate<InsertTable::Meta>(data.Metadata), -                NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), -                NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) -            ); -        } else { -            db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( -                NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), -                NIceDb::TUpdate<InsertTable::Meta>(data.Metadata) -            ); -        } +        Y_VERIFY(data.GetSchemaSnapshot().Valid()); +        db.Table<InsertTable>().Key((ui8)recType, data.ShardOrPlan, data.WriteTxId, data.PathId, data.DedupId).Update( +            NIceDb::TUpdate<InsertTable::BlobId>(data.BlobId.ToStringLegacy()), +            NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()), +            NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), +            NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) +        );      }      static void InsertTable_Erase(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { @@ -484,56 +478,7 @@ struct Schema : NIceDb::Schema {      static bool InsertTable_Load(NIceDb::TNiceDb& db,                                   const IBlobGroupSelector* dsGroupSelector,                                   NOlap::TInsertTableAccessor& insertTable, -                                 const TInstant& loadTime) { -        auto rowset = db.Table<InsertTable>().GreaterOrEqual(0, 0, 0, 0, "").Select(); -        if (!rowset.IsReady()) -            return false; - -        while (!rowset.EndOfSet()) { -            EInsertTableIds recType = (EInsertTableIds)rowset.GetValue<InsertTable::Committed>(); -            ui64 shardOrPlan = rowset.GetValue<InsertTable::ShardOrPlan>(); -            ui64 writeTxId = rowset.GetValueOrDefault<InsertTable::WriteTxId>(); -            ui64 pathId = rowset.GetValue<InsertTable::PathId>(); -            TString dedupId = rowset.GetValue<InsertTable::DedupId>(); -            TString strBlobId = rowset.GetValue<InsertTable::BlobId>(); -            TString metaStr = rowset.GetValue<InsertTable::Meta>(); - -            std::optional<NOlap::TSnapshot> indexSnapshot; -            if (rowset.HaveValue<InsertTable::IndexPlanStep>()) { -                ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>(); -                ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>(); -                indexSnapshot = NOlap::TSnapshot(indexPlanStep, indexTxId); -            } - -            TString error; -            NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error); -            Y_VERIFY(blobId.IsValid(), "Failied to parse blob id: %s", error.c_str()); - -            TInstant writeTime = loadTime; -            NKikimrTxColumnShard::TLogicalMetadata meta; -            if (meta.ParseFromString(metaStr) && meta.HasDirtyWriteTimeSeconds()) { -                writeTime = TInstant::Seconds(meta.GetDirtyWriteTimeSeconds()); -            } - -            TInsertedData data(shardOrPlan, writeTxId, pathId, dedupId, blobId, metaStr, writeTime, indexSnapshot); - -            switch (recType) { -                case EInsertTableIds::Inserted: -                    insertTable.AddInserted(std::move(data), true); -                    break; -                case EInsertTableIds::Committed: -                    insertTable.AddCommitted(std::move(data), true); -                    break; -                case EInsertTableIds::Aborted: -                    insertTable.AddAborted(std::move(data), true); -                    break; -            } - -            if (!rowset.Next()) -                return false; -        } -        return true; -    } +                                 const TInstant& loadTime);      // IndexGranules activities diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp index a3997488565..372e5a46f72 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.cpp +++ b/ydb/core/tx/columnshard/counters/columnshard.cpp @@ -34,6 +34,14 @@ TCSCounters::TCSCounters()      SplitCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("SplitCompaction/Bytes");      SplitCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("SplitCompaction/PortionsCount"); + +    HistogramSuccessWritePutBlobsDurationMs = TBase::GetHistogram("SuccessWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); +    HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); +    HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5)); +    WritePutBlobsCount = TBase::GetValue("WritePutBlobs"); +    WriteRequests = TBase::GetValue("WriteRequests"); +    FailedWriteRequests = TBase::GetDeriviative("FailedWriteRequests"); +    SuccessWriteRequests = TBase::GetDeriviative("SuccessWriteRequests");  }  } diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index dea6ad256e9..546d45aaa6e 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -34,7 +34,47 @@ private:      std::shared_ptr<TValueAggregationClient> SplitCompactionGranuleBytes;      std::shared_ptr<TValueAggregationClient> SplitCompactionGranulePortionsCount; + +    NMonitoring::THistogramPtr HistogramSuccessWritePutBlobsDurationMs; +    NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs; +    NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs; +    NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount; +    NMonitoring::TDynamicCounters::TCounterPtr WriteRequests; +    NMonitoring::TDynamicCounters::TCounterPtr FailedWriteRequests; +    NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;  public: +    void OnStartWriteRequest() const { +        WriteRequests->Add(1); +    } + +    void OnFailedWriteResponse() const { +        WriteRequests->Sub(1); +        FailedWriteRequests->Add(1); +    } + +    void OnSuccessWriteResponse() const { +        WriteRequests->Sub(1); +        SuccessWriteRequests->Add(1); +    } + +    void OnWritePutBlobsSuccess(const ui32 milliseconds) const { +        HistogramSuccessWritePutBlobsDurationMs->Collect(milliseconds); +        WritePutBlobsCount->Sub(1); +    } + +    void OnWritePutBlobsFail(const ui32 milliseconds) const { +        HistogramFailedWritePutBlobsDurationMs->Collect(milliseconds); +        WritePutBlobsCount->Sub(1); +    } + +    void OnWritePutBlobsStart() const { +        WritePutBlobsCount->Add(1); +    } + +    void OnWriteTxComplete(const ui32 milliseconds) const { +        HistogramWriteTxCompleteDurationMs->Collect(milliseconds); +    } +      void OnInternalCompactionInfo(const ui64 bytes, const ui32 portionsCount) const {          InternalCompactionGranuleBytes->SetValue(bytes);          InternalCompactionGranulePortionsCount->SetValue(portionsCount); diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt index f8834fe594a..3524a65489c 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt @@ -21,4 +21,5 @@ target_sources(columnshard-engines-insert_table PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp  ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt index 7762ca111a9..d7120e82291 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt @@ -22,4 +22,5 @@ target_sources(columnshard-engines-insert_table PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp  ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt index 7762ca111a9..d7120e82291 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt @@ -22,4 +22,5 @@ target_sources(columnshard-engines-insert_table PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp  ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt index f8834fe594a..3524a65489c 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/insert_table/CMakeLists.windows-x86_64.txt @@ -21,4 +21,5 @@ target_sources(columnshard-engines-insert_table PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/data.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/path_info.cpp +  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table/meta.cpp  ) diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h index a02cd61fb51..596dda3b174 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.h +++ b/ydb/core/tx/columnshard/engines/insert_table/data.h @@ -1,4 +1,5 @@  #pragma once +#include "meta.h"  #include <ydb/core/tx/columnshard/blob.h>  #include <ydb/core/tx/columnshard/engines/defs.h>  #include <ydb/core/protos/tx_columnshard.pb.h> @@ -6,42 +7,45 @@  namespace NKikimr::NOlap {  struct TInsertedData { +private: +    TInsertedDataMeta Meta;  public: + +    const TInsertedDataMeta& GetMeta() const { +        return Meta; +    } +      ui64 ShardOrPlan = 0;      ui64 WriteTxId = 0;      ui64 PathId = 0;      TString DedupId;      TUnifiedBlobId BlobId; -    TString Metadata; -    TInstant DirtyTime;      TInsertedData() = delete; // avoid invalid TInsertedData anywhere -    TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, -                  const NKikimrTxColumnShard::TLogicalMetadata& meta, const TInstant& writeTime, const TSnapshot& schemaVersion) -        : WriteTxId(writeTxId) +    TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, +                  const ui32 numRows, const ui64 rawBytes, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& columnNames, const TInstant writeTime, const TSnapshot& schemaVersion) +        : Meta(writeTime, numRows, rawBytes, batch, columnNames) +        , ShardOrPlan(shardOrPlan) +        , WriteTxId(writeTxId)          , PathId(pathId)          , DedupId(dedupId)          , BlobId(blobId) -        , DirtyTime(writeTime)          , SchemaVersion(schemaVersion)      { -        Y_VERIFY(meta.SerializeToString(&Metadata)); +        Y_VERIFY(SchemaVersion.Valid());      }      TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, -                  const TString& meta, const TInstant& writeTime, const std::optional<TSnapshot>& schemaVersion) -        : ShardOrPlan(shardOrPlan) +        const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion) +        : Meta(proto) +        , ShardOrPlan(shardOrPlan)          , WriteTxId(writeTxId)          , PathId(pathId)          , DedupId(dedupId)          , BlobId(blobId) -        , Metadata(meta) -        , DirtyTime(writeTime)    { -        if (schemaVersion) { -            SchemaVersion = *schemaVersion; -            Y_VERIFY(SchemaVersion.Valid()); -        } +        , SchemaVersion(schemaVersion) { +        Y_VERIFY(SchemaVersion.Valid());      }      bool operator < (const TInsertedData& key) const { @@ -115,17 +119,27 @@ private:      TUnifiedBlobId BlobId;      TSnapshot CommitSnapshot;      TSnapshot SchemaSnapshot; +    YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First); +    YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last);  public: -    TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot) +    const NArrow::TReplaceKey& GetFirstVerified() const { +        Y_VERIFY(First); +        return *First; +    } + +    const NArrow::TReplaceKey& GetLastVerified() const { +        Y_VERIFY(Last); +        return *Last; +    } + +    TCommittedBlob(const TUnifiedBlobId& blobId, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last)          : BlobId(blobId)          , CommitSnapshot(snapshot)          , SchemaSnapshot(schemaSnapshot) +        , First(first) +        , Last(last)      {} -    static TCommittedBlob BuildKeyBlob(const TUnifiedBlobId& blobId) { -        return TCommittedBlob(blobId, TSnapshot::Zero(), TSnapshot::Zero()); -    } -      /// It uses trick then we place key wtih planStep:txId in container and find them later by BlobId only.      /// So hash() and equality should depend on BlobId only.      bool operator == (const TCommittedBlob& key) const { return BlobId == key.BlobId; } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index a7b0402e40c..1988620a5bc 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -24,11 +24,8 @@ TInsertionSummary::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 plan          std::optional<TInsertedData> data = Summary.ExtractInserted(writeId);          Y_VERIFY(data, "Commit %" PRIu64 ":%" PRIu64 " : writeId %" PRIu64 " not found", planStep, txId, (ui64)writeId); -        NKikimrTxColumnShard::TLogicalMetadata meta; -        if (meta.ParseFromString(data->Metadata)) { -            counters.Rows += meta.GetNumRows(); -            counters.RawBytes += meta.GetRawBytes(); -        } +        counters.Rows += data->GetMeta().GetNumRows(); +        counters.RawBytes += data->GetMeta().GetRawBytes();          counters.Bytes += data->BlobSize();          dbTable.EraseInserted(*data); @@ -108,22 +105,32 @@ bool TInsertTable::Load(IDbWrapper& dbTable, const TInstant loadTime) {      return dbTable.Load(*this, loadTime);  } -std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot) const { +std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const {      const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId);      if (!pInfo) {          return {};      } -    std::vector<TCommittedBlob> ret; +    std::vector<const TInsertedData*> ret;      ret.reserve(pInfo->GetCommitted().size());      for (const auto& data : pInfo->GetCommitted()) {          if (std::less_equal<TSnapshot>()(data.GetSnapshot(), snapshot)) { -            ret.emplace_back(TCommittedBlob(data.BlobId, data.GetSnapshot(), data.GetSchemaSnapshot())); +            ret.emplace_back(&data);          }      } +    const auto pred = [pkSchema](const TInsertedData* l, const TInsertedData* r) { +        return l->GetMeta().GetMin(pkSchema) < r->GetMeta().GetMin(pkSchema); +    }; +    std::sort(ret.begin(), ret.end(), pred); + +    std::vector<TCommittedBlob> result; +    result.reserve(ret.size()); +    for (auto&& i : ret) { +        result.emplace_back(TCommittedBlob(i->BlobId, i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema))); +    } -    return ret; +    return result;  }  } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index 6aaaa129e29..49bb8968439 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -61,7 +61,7 @@ public:      THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);      void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key);      void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); -    std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot) const; +    std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const;      bool Load(IDbWrapper& dbTable, const TInstant loadTime);  private: diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.cpp b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp new file mode 100644 index 00000000000..092e852abc0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/meta.cpp @@ -0,0 +1,28 @@ +#include "meta.h" + +namespace NKikimr::NOlap { + +bool TInsertedDataMeta::DeserializeFromProto(const NKikimrTxColumnShard::TLogicalMetadata& proto) { +    if (proto.HasDirtyWriteTimeSeconds()) { +        DirtyWriteTime = TInstant::Seconds(proto.GetDirtyWriteTimeSeconds()); +    } +    if (proto.HasRawBytes()) { +        SpecialKeys = NArrow::TFirstLastSpecialKeys(proto.GetSpecialKeysRawData()); +    } +    NumRows = proto.GetNumRows(); +    RawBytes = proto.GetRawBytes(); +    return true; +} + +NKikimrTxColumnShard::TLogicalMetadata TInsertedDataMeta::SerializeToProto() const { +    NKikimrTxColumnShard::TLogicalMetadata result; +    result.SetDirtyWriteTimeSeconds(DirtyWriteTime.Seconds()); +    if (SpecialKeys) { +        result.SetSpecialKeysRawData(SpecialKeys->SerializeToString()); +    } +    result.SetNumRows(NumRows); +    result.SetRawBytes(RawBytes); +    return result; +} + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/meta.h b/ydb/core/tx/columnshard/engines/insert_table/meta.h new file mode 100644 index 00000000000..44a0afbb0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/insert_table/meta.h @@ -0,0 +1,53 @@ +#pragma once +#include <ydb/core/formats/arrow/special_keys.h> +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/engines/defs.h> +#include <ydb/core/protos/tx_columnshard.pb.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NOlap { + +class TInsertedDataMeta { +private: +    YDB_READONLY_DEF(TInstant, DirtyWriteTime); +    YDB_READONLY(ui32, NumRows, 0); +    YDB_READONLY(ui64, RawBytes, 0); +    std::optional<NArrow::TFirstLastSpecialKeys> SpecialKeys; + +    bool DeserializeFromProto(const NKikimrTxColumnShard::TLogicalMetadata& proto); + +public: +    TInsertedDataMeta(const NKikimrTxColumnShard::TLogicalMetadata& proto) { +        Y_VERIFY(DeserializeFromProto(proto)); +    } + +    TInsertedDataMeta(const TInstant dirtyWriteTime, const ui32 numRows, const ui64 rawBytes, std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {}) +        : DirtyWriteTime(dirtyWriteTime) +        , NumRows(numRows) +        , RawBytes(rawBytes) +    { +        if (batch) { +            SpecialKeys = NArrow::TFirstLastSpecialKeys(batch, columnNames); +        } +    } + +    std::optional<NArrow::TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const { +        if (SpecialKeys) { +            return SpecialKeys->GetMin(schema); +        } else { +            return {}; +        } +    } +    std::optional<NArrow::TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const { +        if (SpecialKeys) { +            return SpecialKeys->GetMax(schema); +        } else { +            return {}; +        } +    } + +    NKikimrTxColumnShard::TLogicalMetadata SerializeToProto() const; + +}; + +} diff --git a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp index a708cbdd3b5..0bc69d578ff 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp @@ -93,6 +93,11 @@ bool TInsertionSummary::IsOverloaded(const ui64 pathId) const {  void TInsertionSummary::Clear() {      StatsPrepared = {};      StatsCommitted = {}; +    if (LocalInsertedCritical) { +        --LocalInsertedCritical; +        CriticalInserted.Dec(); +    } +      PathInfo.clear();      Priorities.clear();      Inserted.clear(); @@ -129,7 +134,7 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetInsertedByPathId(const  THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetDeprecatedInsertions(const TInstant timeBorder) const {      THashSet<TWriteId> toAbort;      for (auto& [writeId, data] : Inserted) { -        if (data.DirtyTime && data.DirtyTime < timeBorder) { +        if (data.GetMeta().GetDirtyWriteTime() && data.GetMeta().GetDirtyWriteTime() < timeBorder) {              toAbort.insert(writeId);          }      } diff --git a/ydb/core/tx/columnshard/engines/insert_table/ya.make b/ydb/core/tx/columnshard/engines/insert_table/ya.make index c38f51f528e..5f1d92bfb0e 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/ya.make +++ b/ydb/core/tx/columnshard/engines/insert_table/ya.make @@ -5,6 +5,7 @@ SRCS(      rt_insertion.cpp      data.cpp      path_info.cpp +    meta.cpp  )  PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 5c46f9eefe6..c8c8317dbee 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -5,6 +5,7 @@  #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>  #include <ydb/core/tx/columnshard/columnshard__index_scan.h>  #include <ydb/core/tx/columnshard/columnshard__stats_scan.h> +#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>  #include <util/string/join.h>  namespace NKikimr::NOlap { @@ -25,8 +26,9 @@ std::shared_ptr<NOlap::TSelectInfo> TDataStorageAccessor::Select(const NOlap::TR                              readDescription.PKRangesFilter);  } -std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const { -    return std::move(InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot())); +std::vector<NOlap::TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const NOlap::TReadDescription& readDescription, const std::shared_ptr<arrow::Schema>& pkSchema) const { +     +    return std::move(InsertTable->Read(readDescription.PathId, readDescription.GetSnapshot(), pkSchema));  }  std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(const NOlap::TReadContext& readContext) const { @@ -85,7 +87,7 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto          AllColumns.insert(AllColumns.end(), auxiliaryColumns.begin(), auxiliaryColumns.end());      } -    CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription); +    CommittedBlobs = dataAccessor.GetCommitedBlobs(readDescription, ResultIndexSchema->GetIndexInfo().GetReplaceKey());      THashSet<ui32> columnIds;      for (auto& columnId : AllColumns) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 355e0339687..cd582350cb8 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -2,6 +2,7 @@  #include "conveyor_task.h"  #include "description.h"  #include "read_context.h" +#include "read_filter_merger.h"  #include <ydb/library/accessor/accessor.h>  #include <ydb/core/tx/columnshard/blob.h>  #include <ydb/core/tx/columnshard/counters.h> @@ -63,7 +64,7 @@ public:      TDataStorageAccessor(const std::unique_ptr<NOlap::TInsertTable>& insertTable,                                   const std::unique_ptr<NOlap::IColumnEngine>& index);      std::shared_ptr<NOlap::TSelectInfo> Select(const NOlap::TReadDescription& readDescription, const THashSet<ui32>& columnIds) const; -    std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription) const; +    std::vector<NOlap::TCommittedBlob> GetCommitedBlobs(const NOlap::TReadDescription& readDescription, const std::shared_ptr<arrow::Schema>& pkSchema) const;  };  // Holds all metadata that is needed to perform read/scan diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index a4078db93b9..8c4dde51bf6 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -55,23 +55,22 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {          TSnapshot indexSnapshot(1, 1);           // insert, not commited -        TInstant time = TInstant::Now(); -        bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot)); +        bool ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, indexSnapshot));          UNIT_ASSERT(ok);          // insert the same blobId1 again -        ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, time, indexSnapshot)); +        ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId1, {}, indexSnapshot));          UNIT_ASSERT(!ok);          // insert different blodId with the same writeId and dedupId          TUnifiedBlobId blobId2(2222, 1, 2, 100, 1); -        ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, time, indexSnapshot)); +        ok = insertTable.Insert(dbTable, TInsertedData(metaShard, writeId, tableId, dedupId, blobId2, {}, indexSnapshot));          UNIT_ASSERT(!ok);          // read nothing -        auto blobs = insertTable.Read(tableId, TSnapshot::Zero()); +        auto blobs = insertTable.Read(tableId, TSnapshot::Zero(), nullptr);          UNIT_ASSERT_EQUAL(blobs.size(), 0); -        blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); +        blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr);          UNIT_ASSERT_EQUAL(blobs.size(), 0);          // commit @@ -84,15 +83,15 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {          UNIT_ASSERT_EQUAL((*insertTable.GetPathPriorities().begin()->second.begin())->GetCommitted().size(), 1);          // read old snapshot -        blobs = insertTable.Read(tableId, TSnapshot::Zero()); +        blobs = insertTable.Read(tableId, TSnapshot::Zero(), nullptr);          UNIT_ASSERT_EQUAL(blobs.size(), 0); -        blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); +        blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr);          UNIT_ASSERT_EQUAL(blobs.size(), 0);          // read new snapshot -        blobs = insertTable.Read(tableId, TSnapshot(planStep, txId)); +        blobs = insertTable.Read(tableId, TSnapshot(planStep, txId), nullptr);          UNIT_ASSERT_EQUAL(blobs.size(), 1); -        blobs = insertTable.Read(tableId+1, TSnapshot::Zero()); +        blobs = insertTable.Read(tableId+1, TSnapshot::Zero(), nullptr);          UNIT_ASSERT_EQUAL(blobs.size(), 0);      }  } diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index fb4c91d3f82..c5284b9432f 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -412,8 +412,6 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {          blobRanges.push_back(MakeBlobRange(2, testBlob.size()));          // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] -        TInstant writeTime = TInstant::Now(); -          // load          TColumnEngineForLogs engine(0);          TSnapshot indexSnaphot(1, 1); @@ -422,8 +420,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {          engine.Load(db, lostBlobs);          std::vector<TInsertedData> dataToIndex = { -            {1, 2, paths[0], "", blobRanges[0].BlobId, "", writeTime, indexSnaphot}, -            {2, 1, paths[0], "", blobRanges[1].BlobId, "", writeTime, indexSnaphot} +            TInsertedData(1, 2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot), +            TInsertedData(2, 1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot)          };          // write @@ -522,7 +520,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {              // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]              std::vector<TInsertedData> dataToIndex;              dataToIndex.push_back( -                TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); +                TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});              bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);              UNIT_ASSERT(ok); @@ -622,7 +620,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {              // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]              std::vector<TInsertedData> dataToIndex;              dataToIndex.push_back( -                TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); +                TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot});              bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);              // first overload returns ok: it's a postcondition @@ -660,7 +658,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {              // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]              std::vector<TInsertedData> dataToIndex;              dataToIndex.push_back( -                TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); +                TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));              bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);              bool overload = engine.GetOverloadedGranules(pathId); @@ -703,7 +701,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {              // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]              std::vector<TInsertedData> dataToIndex;              dataToIndex.push_back( -                TInsertedData{planStep, txId, pathId, "", blobRange.BlobId, "", TInstant::Now(), indexSnapshot}); +                TInsertedData(planStep, txId, pathId, "", blobRange.BlobId, {}, indexSnapshot));              bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);              UNIT_ASSERT(ok); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index cf1037451f0..fa1f3bbac99 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -25,7 +25,7 @@ IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext()  bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {      const auto& blobInfo = BlobsSplitted[CurrentIndex - 1]; -    Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now()); +    Owner.BlobData.emplace_back(blobId, blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now());      return true;  } diff --git a/ydb/core/tx/ev_write/write_data.cpp b/ydb/core/tx/ev_write/write_data.cpp index 4150cbd28ad..feb1e61e5fa 100644 --- a/ydb/core/tx/ev_write/write_data.cpp +++ b/ydb/core/tx/ev_write/write_data.cpp @@ -8,6 +8,8 @@ namespace NKikimr::NEvWrite {  TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data)      : WriteMeta(writeMeta)      , Data(data) -{} +{ +    Y_VERIFY(Data); +}  } diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index bcdd83a0715..9d95b58b98d 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -2,8 +2,9 @@  #include <ydb/core/tx/long_tx_service/public/types.h>  #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/library/accessor/accessor.h> - +#include <library/cpp/actors/core/monotonic.h>  namespace NKikimr::NEvWrite { @@ -26,6 +27,7 @@ class TWriteMeta {      YDB_ACCESSOR(ui64, WritePartId, 0);      YDB_ACCESSOR_DEF(TString, DedupId); +    YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now());  public:      TWriteMeta(const ui64 writeId, const ui64 tableId, const NActors::TActorId& source)          : WriteId(writeId) diff --git a/ydb/core/tx/ev_write/ya.make b/ydb/core/tx/ev_write/ya.make index 6b93ee539a5..21cc23242f6 100644 --- a/ydb/core/tx/ev_write/ya.make +++ b/ydb/core/tx/ev_write/ya.make @@ -16,7 +16,7 @@ PEERDIR(      library/cpp/actors/core      library/cpp/actors/wilson -    # Temporary fix dep ydb/core/tx/columnshard   +    # Temporary fix dep ydb/core/tx/columnshard      ydb/core/tablet_flat/protos      ydb/core/tablet_flat      ydb/core/blobstorage/vdisk/protos | 
