diff options
author | nsofya <nsofya@yandex-team.com> | 2023-08-03 12:19:56 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-08-03 12:19:56 +0300 |
commit | 0a9a054350987d557ba4e8e2fb2317e216210e0f (patch) | |
tree | ffa3e66335a366634c0a67bce19d87655f9016d7 | |
parent | 0905c24307d707354dc6cc258de1b2b7b509de4e (diff) | |
download | ydb-0a9a054350987d557ba4e8e2fb2317e216210e0f.tar.gz |
KIKIMR-18876: Split data for EvWrite
- Добавила разбиение данных по MaxBlobSize (теперь на одну операцию может приходить несколько InternalWriteIds)
- Для старого EvWrite разбивка пока осталась снаружи, но есть проверки, что блобы приходят правильного размера (на входе) + перед записью, что мы дополнительно ничего не разбили (старый тест на ошибку при большом размере прошел)
- Дочистила MetaShard в TWriteMeta
-rw-r--r-- | ydb/core/formats/arrow/arrow_helpers.cpp | 12 | ||||
-rw-r--r-- | ydb/core/formats/arrow/size_calcer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/formats/arrow/size_calcer.h | 8 | ||||
-rw-r--r-- | ydb/core/protos/tx_columnshard.proto | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.h | 18 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 92 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_private_events.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_schema.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp | 59 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h | 15 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/write.cpp | 48 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/write.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/operations/write_data.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 165 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/write_actor.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/ev_write/write_data.h | 3 |
16 files changed, 341 insertions, 122 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index b757020f47f..b330b7f368c 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -212,23 +212,27 @@ std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr< std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) { auto res = arrow::Table::FromRecordBatches(batches); if (!res.ok()) { - return {}; + return nullptr; } res = (*res)->CombineChunks(); if (!res.ok()) { - return {}; + return nullptr; } - return *res; + return res.ValueOrDie(); } std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) { + if (batches.empty()) { + return nullptr; + } auto table = CombineInTable(batches); - return ToBatch(table); + return table ? ToBatch(table) : nullptr; } std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& table) { + Y_VERIFY(table); std::vector<std::shared_ptr<arrow::Array>> columns; columns.reserve(table->num_columns()); for (auto& col : table->columns()) { diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp index 26e39be2ba1..9f9e98fc721 100644 --- a/ydb/core/formats/arrow/size_calcer.cpp +++ b/ydb/core/formats/arrow/size_calcer.cpp @@ -214,7 +214,7 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) { } NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch) { - return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows()); + return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch)); } bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) { diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h index bc24be01982..f17566c85cd 100644 --- a/ydb/core/formats/arrow/size_calcer.h +++ b/ydb/core/formats/arrow/size_calcer.h @@ -50,6 +50,7 @@ private: YDB_READONLY_DEF(TString, SchemaData); YDB_READONLY_DEF(TString, Data); YDB_READONLY(ui32, RowsCount, 0); + YDB_READONLY(ui32, RawBytes, 0); public: size_t GetSize() const { return Data.size(); @@ -59,10 +60,11 @@ public: static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage); static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch); - TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount) + TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes) : SchemaData(schemaData) , Data(data) , RowsCount(rowsCount) + , RawBytes(rawBytes) { } @@ -84,6 +86,10 @@ public: bool operator!() const { return !!ErrorMessage; } + + std::vector<TSerializedBatch>&& ReleaseResult() { + return std::move(Result); + } }; TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 sizeLimit); diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 5c4a2907a5a..3e6db3dd209 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -319,3 +319,7 @@ message TEvReadBlobRangesResult { optional uint64 TabletId = 1; repeated TResult Results = 2; } + +message TInternalOperationData { + repeated uint64 InternalWriteIds = 1; +} diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 50899858f2a..96ced7b1781 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -271,7 +271,7 @@ struct TEvColumnShard { TEvWriteResult(ui64 origin, const NEvWrite::TWriteMeta& writeMeta, const i64 writeId, ui32 status) { Record.SetOrigin(origin); - Record.SetTxInitiator(writeMeta.GetMetaShard()); + Record.SetTxInitiator(0); Record.SetWriteId(writeId); Record.SetTableId(writeMeta.GetTableId()); Record.SetDedupId(writeMeta.GetDedupId()); @@ -319,6 +319,22 @@ struct TEvColumnShard { TEvReadResult(const TEvReadResult& ev) { Record.CopyFrom(ev.Record); } + + std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const { + const auto& scheme = Record.GetMeta().GetSchema(); + if (scheme.empty() || Record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) { + return nullptr; + } + const auto arrowSchema = NArrow::DeserializeSchema(scheme); + if (Record.GetData().empty()) { + return NArrow::MakeEmptyBatch(arrowSchema); + } + return NArrow::DeserializeBatch(Record.GetData(), arrowSchema); + } + + bool HasMore() const { + return !Record.GetFinished(); + } }; using TEvScan = TEvDataShard::TEvKqpScan; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 84eef1582f8..a3f5e4e9980 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -28,6 +28,8 @@ private: const ui32 TabletTxNo; std::unique_ptr<NActors::IEventBase> Result; + bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId); + TStringBuilder TxPrefix() const { return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] "; } @@ -37,17 +39,7 @@ private: } }; - -bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { - LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix()); - - const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); - const auto& blobData(PutBlobResult->Get()->GetBlobData()); - - txc.DB.NoMoreReadsForTx(); - NIceDb::TNiceDb db(txc.DB); - - auto writeId = writeMeta.GetWriteId(); +bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId) { const TString data = blobData.GetBlobData(); NKikimrTxColumnShard::TLogicalMetadata meta; @@ -56,20 +48,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { const auto& logoBlobId = blobData.GetBlobId(); Y_VERIFY(logoBlobId.IsValid()); - Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); - - TWriteOperation::TPtr operation; - if (writeMeta.HasLongTxId()) { - Y_VERIFY(writeMeta.GetMetaShard() == 0); - writeId = (ui64)Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); - } else { - operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); - if (operation) { - Y_VERIFY(operation->GetStatus() == EOperationStatus::Started); - writeId = (ui64)Self->BuildNextWriteId(txc); - } - } - ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds(); TInstant time = TInstant::Seconds(writeUnixTime); @@ -77,10 +55,13 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - NOlap::TInsertedData insertData(writeMeta.GetMetaShard(), (ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, blobData.GetLogicalMeta(), time, PutBlobResult->Get()->GetSnapshot()); + const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); + + NOlap::TInsertedData insertData(0, (ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, blobData.GetLogicalMeta(), time, PutBlobResult->Get()->GetSnapshot()); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); + NIceDb::TNiceDb db(txc.DB); Self->TryAbortWrites(db, dbTable, std::move(writesToAbort)); // TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them. @@ -107,22 +88,54 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_WRITE_SUCCESS); Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb); + return true; + } + return false; +} + +bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { + LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix()); + + const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); + Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + + txc.DB.NoMoreReadsForTx(); + TWriteOperation::TPtr operation; + if (writeMeta.HasLongTxId()) { + Y_VERIFY_S(PutBlobResult->Get()->GetBlobData().size() == 1, TStringBuilder() << "Blobs count: " << PutBlobResult->Get()->GetBlobData().size()); + } else { + operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_VERIFY(operation); + Y_VERIFY(operation->GetStatus() == EOperationStatus::Started); + } + + TVector<TWriteId> writeIds; + for (auto blobData: PutBlobResult->Get()->GetBlobData()) { + auto writeId = TWriteId(writeMeta.GetWriteId()); if (operation) { - operation->OnWriteFinish(txc, writeId); - auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); - Y_UNUSED(txInfo); + writeId = Self->BuildNextWriteId(txc); + } else { + NIceDb::TNiceDb db(txc.DB); + writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); } - } else { - LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix()); - Self->IncCounter(COUNTER_WRITE_DUPLICATE); + + if (!InsertOneBlob(txc, blobData, writeId)) { + LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); + Self->IncCounter(COUNTER_WRITE_DUPLICATE); + } + writeIds.push_back(writeId); } if (operation) { + operation->OnWriteFinish(txc, writeIds); + auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); + Y_UNUSED(txInfo); NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController.GetCoordinatorInfo(operation->GetTxId()); Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(operation->GetTxId(), tInfo); } else { - Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS); + Y_VERIFY(writeIds.size() == 1); + Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS); } return true; } @@ -200,20 +213,19 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; } - auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); - if (operation) { - auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails"); + if (writeMeta.HasLongTxId()) { + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode); ctx.Send(writeMeta.GetSource(), result.release()); } else { - auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode); + auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); + Y_VERIFY(operation); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails"); ctx.Send(writeMeta.GetSource(), result.release()); } return; } - const auto& blobData = ev->Get()->GetBlobData(); - Y_VERIFY(blobData.GetBlobId().IsValid()); - LOG_S_DEBUG("Write (record) " << blobData.GetBlobData().size() << " bytes into pathId " << writeMeta.GetTableId() + LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); Execute(new TTxWrite(this, ev), ctx); diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index fa62a2478ec..7fd7da79e3a 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -255,12 +255,16 @@ struct TEvPrivate { YDB_READONLY_DEF(TUnifiedBlobId, BlobId); YDB_READONLY_DEF(TString, BlobData); YDB_ACCESSOR_DEF(TString, LogicalMeta); + YDB_ACCESSOR(ui64, RowsCount, 0); + YDB_ACCESSOR(ui64, RawBytes, 0); public: TPutBlobData() = default; - TPutBlobData(const TUnifiedBlobId& blobId, const TString& data) + TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, ui64 rowsCount, ui64 rawBytes) : BlobId(blobId) , BlobData(data) + , RowsCount(rowsCount) + , RawBytes(rawBytes) {} }; @@ -272,13 +276,13 @@ struct TEvPrivate { Y_VERIFY(PutResult); } - TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TPutBlobData&& blobData, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot) + TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot) : TEvWriteBlobsResult(putResult, writeMeta, snapshot) { BlobData = std::move(blobData); } - const TPutBlobData& GetBlobData() const { + const TVector<TPutBlobData>& GetBlobData() const { return BlobData; } @@ -300,7 +304,7 @@ struct TEvPrivate { private: NColumnShard::TBlobPutResult::TPtr PutResult; - TPutBlobData BlobData; + TVector<TPutBlobData> BlobData; NEvWrite::TWriteMeta WriteMeta; NOlap::TSnapshot Snapshot; }; diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index d119e4a3bf6..06578fafc5d 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -254,9 +254,10 @@ struct Schema : NIceDb::Schema { struct Status : Column<3, NScheme::NTypeIds::Uint32> {}; struct CreatedAt : Column<4, NScheme::NTypeIds::Uint64> {}; struct GlobalWriteId : Column<5, NScheme::NTypeIds::Uint64> {}; + struct Metadata : Column<6, NScheme::NTypeIds::String> {}; using TKey = TableKey<WriteId>; - using TColumns = TableColumns<TxId, WriteId, Status, CreatedAt, GlobalWriteId>; + using TColumns = TableColumns<TxId, WriteId, Status, CreatedAt, GlobalWriteId, Metadata>; }; using TTables = SchemaTables< @@ -665,10 +666,15 @@ struct Schema : NIceDb::Schema { // Operations static void Operations_Write(NIceDb::TNiceDb& db, const TWriteOperation& operation) { + TString metadata; + NKikimrTxColumnShard::TInternalOperationData proto; + operation.ToProto(proto); + Y_VERIFY(proto.SerializeToString(&metadata)); + db.Table<Operations>().Key((ui64)operation.GetWriteId()).Update( NIceDb::TUpdate<Operations::Status>((ui32)operation.GetStatus()), NIceDb::TUpdate<Operations::CreatedAt>(operation.GetCreatedAt().Seconds()), - NIceDb::TUpdate<Operations::GlobalWriteId>(operation.GetGlobalWriteId()), + NIceDb::TUpdate<Operations::Metadata>(metadata), NIceDb::TUpdate<Operations::TxId>(operation.GetTxId()) ); } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index 164661a4c1d..6e62e700576 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -12,44 +12,46 @@ TIndexedWriteController::TBlobConstructor::TBlobConstructor(NOlap::ISnapshotSche {} const TString& TIndexedWriteController::TBlobConstructor::GetBlob() const { - return DataPrepared; + Y_VERIFY(CurrentIndex > 0); + return BlobsSplitted[CurrentIndex - 1].GetData(); } IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext() { - if (!!DataPrepared) { + if (CurrentIndex == BlobsSplitted.size()) { return EStatus::Finished; } + CurrentIndex++; + return EStatus::Ok; +} +bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { + const auto& blobInfo = BlobsSplitted[CurrentIndex - 1]; + Owner.AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData(blobId, blobInfo.GetData(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes())); + return true; +} + +bool TIndexedWriteController::TBlobConstructor::Init() { const auto& writeMeta = Owner.WriteData.GetWriteMeta(); const ui64 tableId = writeMeta.GetTableId(); const ui64 writeId = writeMeta.GetWriteId(); - // Heavy operations inside. We cannot run them in tablet event handler. + std::shared_ptr<arrow::RecordBatch> batch; { NColumnShard::TCpuGuard guard(Owner.ResourceUsage); - Batch = Owner.WriteData.GetData().GetArrowBatch(); + batch = Owner.WriteData.GetData().GetArrowBatch(); } - if (!Batch) { + if (!batch) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId); - return EStatus::Error; + return false; } - { - NColumnShard::TCpuGuard guard(Owner.ResourceUsage); - DataPrepared = NArrow::SerializeBatchNoCompression(Batch); + auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize()); + if (!splitResult) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage()); + return false; } - - if (DataPrepared.size() > NColumnShard::TLimits::GetMaxBlobSize()) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_data_too_big")("write_id", writeId)("table_id", tableId); - return EStatus::Error; - } - return EStatus::Ok; -} - -bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { - Y_VERIFY(blobId.BlobSize() == DataPrepared.size()); - Owner.AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData(blobId, DataPrepared), Batch->num_rows(), NArrow::GetBatchDataSize(Batch)); + BlobsSplitted = splitResult.ReleaseResult(); return true; } @@ -63,8 +65,7 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) { if (putResult->GetPutStatus() == NKikimrProto::OK) { - Y_VERIFY(BlobData.size() == 1); - auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData[0]), WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot()); + auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot()); ctx.Send(DstActor, result.release()); } else { auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot()); @@ -72,14 +73,20 @@ void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, } } -void TIndexedWriteController::AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data, const ui64 numRows, const ui64 batchSize) { - Y_VERIFY(BlobData.empty()); +NOlap::IBlobConstructor::TPtr TIndexedWriteController::GetBlobConstructor() { + if (!BlobConstructor->Init()) { + return nullptr; + } + return BlobConstructor; +} +void TIndexedWriteController::AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data) { ui64 dirtyTime = AppData()->TimeProvider->Now().Seconds(); Y_VERIFY(dirtyTime); + NKikimrTxColumnShard::TLogicalMetadata outMeta; - outMeta.SetNumRows(numRows); - outMeta.SetRawBytes(batchSize); + outMeta.SetNumRows(data.GetRowsCount()); + outMeta.SetRawBytes(data.GetRawBytes()); outMeta.SetDirtyWriteTimeSeconds(dirtyTime); TString metaString; diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 659420c714a..f76187e450b 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -7,6 +7,7 @@ #include <ydb/core/tx/columnshard/engines/portion_info.h> #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <ydb/core/formats/arrow/size_calcer.h> namespace NKikimr::NOlap { @@ -16,10 +17,9 @@ private: class TBlobConstructor : public IBlobConstructor { TIndexedWriteController& Owner; NOlap::ISnapshotSchema::TPtr SnapshotSchema; + std::vector<NArrow::TSerializedBatch> BlobsSplitted; - TString DataPrepared; - std::shared_ptr<arrow::RecordBatch> Batch; - + ui64 CurrentIndex = 0; public: TBlobConstructor(NOlap::ISnapshotSchema::TPtr snapshotSchema, TIndexedWriteController& owner); @@ -29,11 +29,12 @@ private: const NOlap::TSnapshot& GetSnapshot() const { return SnapshotSchema->GetSnapshot(); } + bool Init(); }; NEvWrite::TWriteData WriteData; std::shared_ptr<TBlobConstructor> BlobConstructor; - std::vector<NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData> BlobData; + TVector<NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData> BlobData; TActorId DstActor; public: @@ -41,12 +42,10 @@ public: void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; - NOlap::IBlobConstructor::TPtr GetBlobConstructor() override { - return BlobConstructor; - } + NOlap::IBlobConstructor::TPtr GetBlobConstructor() override; public: - void AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data, const ui64 numRows, const ui64 batchSize); + void AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data); }; } diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index 59878c0e7d7..cefb4b481bd 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -10,12 +10,11 @@ namespace NKikimr::NColumnShard { - TWriteOperation::TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt, const ui64 globalWriteId) + TWriteOperation::TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt) : Status(status) , CreatedAt(createdAt) , WriteId(writeId) , TxId(txId) - , GlobalWriteId(globalWriteId) { } @@ -35,34 +34,50 @@ namespace NKikimr::NColumnShard { TBlobGroupSelector dsGroupSelector(owner.Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - auto pathExists = [&](ui64 pathId) { - return owner.TablesManager.HasTable(pathId); - }; + for (auto gWriteId : GlobalWriteIds) { + auto pathExists = [&](ui64 pathId) { + return owner.TablesManager.HasTable(pathId); + }; - auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), 0, { WriteId }, - pathExists); + auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), 0, { gWriteId }, + pathExists); - owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); - owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); - owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); + owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); + owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); + owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); + } owner.UpdateInsertTableCounters(); } - void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId) { + void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds) { Y_VERIFY(Status == EOperationStatus::Started); Status = EOperationStatus::Prepared; - GlobalWriteId = globalWriteId; + GlobalWriteIds = globalWriteIds; NIceDb::TNiceDb db(txc.DB); Schema::Operations_Write(db, *this); } + void TWriteOperation::ToProto(NKikimrTxColumnShard::TInternalOperationData& proto) const { + for (auto&& writeId : GlobalWriteIds) { + proto.AddInternalWriteIds((ui64)writeId); + } + } + + void TWriteOperation::FromProto(const NKikimrTxColumnShard::TInternalOperationData& proto) { + for (auto&& writeId : proto.GetInternalWriteIds()) { + GlobalWriteIds.push_back(TWriteId(writeId)); + } + } + void TWriteOperation::Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const { Y_VERIFY(Status == EOperationStatus::Prepared); TBlobGroupSelector dsGroupSelector(owner.Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - owner.InsertTable->Abort(dbTable, 0, {WriteId}); + THashSet<TWriteId> writeIds; + writeIds.insert(GlobalWriteIds.begin(), GlobalWriteIds.end()); + owner.InsertTable->Abort(dbTable, 0, writeIds); TBlobManagerDb blobManagerDb(txc.DB); auto allAborted = owner.InsertTable->GetAborted(); @@ -83,10 +98,13 @@ namespace NKikimr::NColumnShard { const TWriteId writeId = (TWriteId) rowset.GetValue<Schema::Operations::WriteId>(); const ui64 createdAtSec = rowset.GetValue<Schema::Operations::CreatedAt>(); const ui64 txId = rowset.GetValue<Schema::Operations::TxId>(); - const ui64 globalWriteId = rowset.GetValue<Schema::Operations::GlobalWriteId>(); + const TString metadata = rowset.GetValue<Schema::Operations::Metadata>(); + NKikimrTxColumnShard::TInternalOperationData metaProto; + Y_VERIFY(metaProto.ParseFromString(metadata)); const EOperationStatus status = (EOperationStatus) rowset.GetValue<Schema::Operations::Status>(); - auto operation = std::make_shared<TWriteOperation>(writeId, txId, status, TInstant::Seconds(createdAtSec), globalWriteId); + auto operation = std::make_shared<TWriteOperation>(writeId, txId, status, TInstant::Seconds(createdAtSec)); + operation->FromProto(metaProto); Y_VERIFY(operation->GetStatus() != EOperationStatus::Draft); diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h index d50b4cfcce6..cb90eaaa235 100644 --- a/ydb/core/tx/columnshard/operations/write.h +++ b/ydb/core/tx/columnshard/operations/write.h @@ -3,6 +3,7 @@ #include <ydb/core/tx/ev_write/write_data.h> #include <ydb/core/tx/columnshard/common/snapshot.h> #include <ydb/core/tx/columnshard/engines/defs.h> +#include <ydb/core/protos/tx_columnshard.pb.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/library/accessor/accessor.h> @@ -32,21 +33,24 @@ namespace NKikimr::NColumnShard { YDB_READONLY_DEF(TInstant, CreatedAt); YDB_READONLY_DEF(TWriteId, WriteId); YDB_READONLY(ui64, TxId, 0); - YDB_READONLY(ui64, GlobalWriteId, 0); + YDB_READONLY_DEF(TVector<TWriteId>, GlobalWriteIds); public: using TPtr = std::shared_ptr<TWriteOperation>; - TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt, const ui64 globalWriteId = 0); + TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt); void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, const TActorContext& ctx); - void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId); + void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds); void Commit(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const; void Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const; void Out(IOutputStream& out) const { out << "write_id=" << (ui64) WriteId << ";tx_id=" << TxId; } + + void ToProto(NKikimrTxColumnShard::TInternalOperationData& proto) const; + void FromProto(const NKikimrTxColumnShard::TInternalOperationData& proto); }; class TOperationsManager { diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp index 27f022590a6..b994b096847 100644 --- a/ydb/core/tx/columnshard/operations/write_data.cpp +++ b/ydb/core/tx/columnshard/operations/write_data.cpp @@ -13,7 +13,7 @@ bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPa columns.emplace_back(columnId); } BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns); - return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize(); + return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty(); } std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const { diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 2dd54ea1f48..7d63330a47a 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -10,6 +10,9 @@ #include <ydb/core/tx/columnshard/engines/changes/cleanup.h> #include <ydb/core/tx/columnshard/operations/write_data.h> #include <library/cpp/actors/protos/unittests.pb.h> +#include <ydb/core/formats/arrow/simple_builder/filler.h> +#include <ydb/core/formats/arrow/simple_builder/array.h> +#include <ydb/core/formats/arrow/simple_builder/batch.h> namespace NKikimr { @@ -1859,37 +1862,171 @@ Y_UNIT_TEST_SUITE(EvWrite) { } }; + void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, TTypeInfo>>& schema) { + CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + TestTableDescription tableDescription; + tableDescription.Schema = schema; + tableDescription.Pk = { schema[0] }; + TActorId sender = runtime.AllocateEdgeActor(); + SetupSchema(runtime, sender, tableId, tableDescription); + } + + std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, TTypeInfo>>& schema) { + TActorId sender = runtime.AllocateEdgeActor(); + + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, + new TEvColumnShard::TEvRead(sender, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId)); + + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + while(true) { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); + UNIT_ASSERT(event); + auto b = event->GetArrowBatch(); + if (b) { + batches.push_back(b); + } + if (!event->HasMore()) { + break; + } + } + auto res = NArrow::CombineBatches(batches); + return res ? res : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema)); + } + Y_UNIT_TEST(WriteInTransaction) { + using namespace NArrow; + TTestBasicRuntime runtime; TTester::Setup(runtime); + const ui64 tableId = 1; + const std::vector<std::pair<TString, TTypeInfo>> schema = { + {"key", TTypeInfo(NTypeIds::Uint64) }, + {"field", TTypeInfo(NTypeIds::Utf8) } + }; + PrepareTablet(runtime, tableId, schema); + const ui64 txId = 111; + + NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key"); + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(8, 100)); + + auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); + TString blobData = NArrow::SerializeBatchNoCompression(batch); + UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); + + auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); + auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + evWrite->AddReplaceOp(tableId, dataPtr); + TActorId sender = runtime.AllocateEdgeActor(); - CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); - TDispatchOptions options; - options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); - runtime.DispatchEvents(options); + { + TAutoPtr<NActors::IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle); + UNIT_ASSERT(event); + UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED); + + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema); + UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0); + + PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); + } + + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); + UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048); + } + + Y_UNIT_TEST(AbotrInTransaction) { + using namespace NArrow; + + TTestBasicRuntime runtime; + TTester::Setup(runtime); const ui64 tableId = 1; - const TestTableDescription table; - SetupSchema(runtime, sender, tableId, table); + const std::vector<std::pair<TString, TTypeInfo>> schema = { + {"key", TTypeInfo(NTypeIds::Uint64) }, + {"field", TTypeInfo(NTypeIds::Utf8) } + }; + PrepareTablet(runtime, tableId, schema); + const ui64 txId = 111; + + NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key"); + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(8, 100)); + + auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); + TString blobData = NArrow::SerializeBatchNoCompression(batch); + UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); + + auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); + auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + evWrite->AddReplaceOp(tableId, dataPtr); + + TActorId sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); + + ui64 outdatedStep = 11; + { + TAutoPtr<NActors::IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle); + UNIT_ASSERT(event); + UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED); + + outdatedStep = event->Record.GetMaxStep() + 1; + PlanWriteTx(runtime, sender, NOlap::TSnapshot(outdatedStep, txId + 1), false); + } + + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(outdatedStep, txId), schema); + UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0); + } + + Y_UNIT_TEST(WriteWithSplit) { + using namespace NArrow; + + TTestBasicRuntime runtime; + TTester::Setup(runtime); + const ui64 tableId = 1; + const std::vector<std::pair<TString, TTypeInfo>> schema = { + {"key", TTypeInfo(NTypeIds::Uint64) }, + {"field", TTypeInfo(NTypeIds::Utf8) } + }; + PrepareTablet(runtime, tableId, schema); const ui64 txId = 111; - const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema; - TString blobData = MakeTestBlob({0, 100}, ydbSchema); + NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key"); + NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>( + "field", NConstruction::TStringPoolFiller(8, TLimits::GetMaxBlobSize() / 1024)); + + auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); + TString blobData = NArrow::SerializeBatchNoCompression(batch); + UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize()); auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); - auto dataPtr = std::make_shared<TArrowData>(ydbSchema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); evWrite->AddReplaceOp(tableId, dataPtr); + TActorId sender = runtime.AllocateEdgeActor(); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); - TAutoPtr<NActors::IEventHandle> handle; - auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_EQUAL(event->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::PREPARED); - PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); + { + TAutoPtr<NActors::IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle); + UNIT_ASSERT(event); + UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED); + PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); + } + + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); + UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048); } } diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 194ece3ea31..9b020e09276 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -94,6 +94,9 @@ public: } auto blobsConstructor = WriteController->GetBlobConstructor(); + if (!blobsConstructor) { + return SendResultAndDie(ctx, NKikimrProto::ERROR); + } auto status = NOlap::IBlobConstructor::EStatus::Finished; while (true) { status = blobsConstructor->BuildNext(); diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index d15a59af45d..54f7ee05259 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -21,9 +21,8 @@ class TWriteMeta { YDB_ACCESSOR_DEF(NActors::TActorId, Source); // Long Tx logic - YDB_OPT(NLongTxService::TLongTxId, LongTxId) + YDB_OPT(NLongTxService::TLongTxId, LongTxId); YDB_ACCESSOR(ui64, WritePartId, 0); - YDB_READONLY(ui64, MetaShard, 0); YDB_ACCESSOR_DEF(TString, DedupId); public: |