about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author nsofya <nsofya@yandex-team.com> 2023-08-03 12:19:56 +0300
committer nsofya <nsofya@yandex-team.com> 2023-08-03 12:19:56 +0300
commit 0a9a054350987d557ba4e8e2fb2317e216210e0f (patch)
tree ffa3e66335a366634c0a67bce19d87655f9016d7
parent 0905c24307d707354dc6cc258de1b2b7b509de4e (diff)
download ydb-0a9a054350987d557ba4e8e2fb2317e216210e0f.tar.gz
KIKIMR-18876: Split data for EvWrite
- Добавила разбиение данных по MaxBlobSize (теперь на одну операцию может приходить несколько InternalWriteIds)
- Для старого EvWrite разбивка пока осталась снаружи, но есть проверки, что блобы приходят правильного размера (на входе) + перед записью, что мы дополнительно ничего не разбили (старый тест на ошибку при большом размере прошел)
- Дочистила MetaShard в TWriteMeta
-rw-r--r-- ydb/core/formats/arrow/arrow_helpers.cpp 12
-rw-r--r-- ydb/core/formats/arrow/size_calcer.cpp 2
-rw-r--r-- ydb/core/formats/arrow/size_calcer.h 8
-rw-r--r-- ydb/core/protos/tx_columnshard.proto 4
-rw-r--r-- ydb/core/tx/columnshard/columnshard.h 18
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write.cpp 92
-rw-r--r-- ydb/core/tx/columnshard/columnshard_private_events.h 12
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.h 10
-rw-r--r-- ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp 59
-rw-r--r-- ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h 15
-rw-r--r-- ydb/core/tx/columnshard/operations/write.cpp 48
-rw-r--r-- ydb/core/tx/columnshard/operations/write.h 10
-rw-r--r-- ydb/core/tx/columnshard/operations/write_data.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp 165
-rw-r--r-- ydb/core/tx/columnshard/write_actor.cpp 3
-rw-r--r-- ydb/core/tx/ev_write/write_data.h 3
16 files changed, 341 insertions, 122 deletions
diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp
index b757020f47f..b330b7f368c 100644
--- a/ydb/core/formats/arrow/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow/arrow_helpers.cpp
@@ -212,23 +212,27 @@ std::shared_ptr<arrow::RecordBatch> ExtractExistedColumns(const std::shared_ptr<
std::shared_ptr<arrow::Table> CombineInTable(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
auto res = arrow::Table::FromRecordBatches(batches);
if (!res.ok()) {
- return {};
+ return nullptr;
}
res = (*res)->CombineChunks();
if (!res.ok()) {
- return {};
+ return nullptr;
}
- return *res;
+ return res.ValueOrDie();
}
std::shared_ptr<arrow::RecordBatch> CombineBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
+ if (batches.empty()) {
+ return nullptr;
+ }
auto table = CombineInTable(batches);
- return ToBatch(table);
+ return table ? ToBatch(table) : nullptr;
}
std::shared_ptr<arrow::RecordBatch> ToBatch(const std::shared_ptr<arrow::Table>& table) {
+ Y_VERIFY(table);
std::vector<std::shared_ptr<arrow::Array>> columns;
columns.reserve(table->num_columns());
for (auto& col : table->columns()) {
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp
index 26e39be2ba1..9f9e98fc721 100644
--- a/ydb/core/formats/arrow/size_calcer.cpp
+++ b/ydb/core/formats/arrow/size_calcer.cpp
@@ -214,7 +214,7 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
}
NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch) {
- return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows());
+ return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch));
}
bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) {
diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h
index bc24be01982..f17566c85cd 100644
--- a/ydb/core/formats/arrow/size_calcer.h
+++ b/ydb/core/formats/arrow/size_calcer.h
@@ -50,6 +50,7 @@ private:
YDB_READONLY_DEF(TString, SchemaData);
YDB_READONLY_DEF(TString, Data);
YDB_READONLY(ui32, RowsCount, 0);
+ YDB_READONLY(ui32, RawBytes, 0);
public:
size_t GetSize() const {
return Data.size();
@@ -59,10 +60,11 @@ public:
static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage);
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch);
- TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount)
+ TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes)
: SchemaData(schemaData)
, Data(data)
, RowsCount(rowsCount)
+ , RawBytes(rawBytes)
{
}
@@ -84,6 +86,10 @@ public:
bool operator!() const {
return !!ErrorMessage;
}
+
+ std::vector<TSerializedBatch>&& ReleaseResult() {
+ return std::move(Result);
+ }
};
TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 sizeLimit);
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 5c4a2907a5a..3e6db3dd209 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -319,3 +319,7 @@ message TEvReadBlobRangesResult {
optional uint64 TabletId = 1;
repeated TResult Results = 2;
}
+
+message TInternalOperationData {
+ repeated uint64 InternalWriteIds = 1;
+}
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 50899858f2a..96ced7b1781 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -271,7 +271,7 @@ struct TEvColumnShard {
TEvWriteResult(ui64 origin, const NEvWrite::TWriteMeta& writeMeta, const i64 writeId, ui32 status) {
Record.SetOrigin(origin);
- Record.SetTxInitiator(writeMeta.GetMetaShard());
+ Record.SetTxInitiator(0);
Record.SetWriteId(writeId);
Record.SetTableId(writeMeta.GetTableId());
Record.SetDedupId(writeMeta.GetDedupId());
@@ -319,6 +319,22 @@ struct TEvColumnShard {
TEvReadResult(const TEvReadResult& ev) {
Record.CopyFrom(ev.Record);
}
+
+ std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const {
+ const auto& scheme = Record.GetMeta().GetSchema();
+ if (scheme.empty() || Record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) {
+ return nullptr;
+ }
+ const auto arrowSchema = NArrow::DeserializeSchema(scheme);
+ if (Record.GetData().empty()) {
+ return NArrow::MakeEmptyBatch(arrowSchema);
+ }
+ return NArrow::DeserializeBatch(Record.GetData(), arrowSchema);
+ }
+
+ bool HasMore() const {
+ return !Record.GetFinished();
+ }
};
using TEvScan = TEvDataShard::TEvKqpScan;
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 84eef1582f8..a3f5e4e9980 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -28,6 +28,8 @@ private:
const ui32 TabletTxNo;
std::unique_ptr<NActors::IEventBase> Result;
+ bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId);
+
TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
}
@@ -37,17 +39,7 @@ private:
}
};
-
-bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
- LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix());
-
- const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
- const auto& blobData(PutBlobResult->Get()->GetBlobData());
-
- txc.DB.NoMoreReadsForTx();
- NIceDb::TNiceDb db(txc.DB);
-
- auto writeId = writeMeta.GetWriteId();
+bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId) {
const TString data = blobData.GetBlobData();
NKikimrTxColumnShard::TLogicalMetadata meta;
@@ -56,20 +48,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
const auto& logoBlobId = blobData.GetBlobId();
Y_VERIFY(logoBlobId.IsValid());
- Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
-
- TWriteOperation::TPtr operation;
- if (writeMeta.HasLongTxId()) {
- Y_VERIFY(writeMeta.GetMetaShard() == 0);
- writeId = (ui64)Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
- } else {
- operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
- if (operation) {
- Y_VERIFY(operation->GetStatus() == EOperationStatus::Started);
- writeId = (ui64)Self->BuildNextWriteId(txc);
- }
- }
-
ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds();
TInstant time = TInstant::Seconds(writeUnixTime);
@@ -77,10 +55,13 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- NOlap::TInsertedData insertData(writeMeta.GetMetaShard(), (ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, blobData.GetLogicalMeta(), time, PutBlobResult->Get()->GetSnapshot());
+ const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
+
+ NOlap::TInsertedData insertData(0, (ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), logoBlobId, blobData.GetLogicalMeta(), time, PutBlobResult->Get()->GetSnapshot());
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
+ NIceDb::TNiceDb db(txc.DB);
Self->TryAbortWrites(db, dbTable, std::move(writesToAbort));
// TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them.
@@ -107,22 +88,54 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Self->IncCounter(COUNTER_WRITE_SUCCESS);
Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb);
+ return true;
+ }
+ return false;
+}
+
+bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
+ LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix());
+
+ const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
+ Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
+
+ txc.DB.NoMoreReadsForTx();
+ TWriteOperation::TPtr operation;
+ if (writeMeta.HasLongTxId()) {
+ Y_VERIFY_S(PutBlobResult->Get()->GetBlobData().size() == 1, TStringBuilder() << "Blobs count: " << PutBlobResult->Get()->GetBlobData().size());
+ } else {
+ operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
+ Y_VERIFY(operation);
+ Y_VERIFY(operation->GetStatus() == EOperationStatus::Started);
+ }
+
+ TVector<TWriteId> writeIds;
+ for (auto blobData: PutBlobResult->Get()->GetBlobData()) {
+ auto writeId = TWriteId(writeMeta.GetWriteId());
if (operation) {
- operation->OnWriteFinish(txc, writeId);
- auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
- Y_UNUSED(txInfo);
+ writeId = Self->BuildNextWriteId(txc);
+ } else {
+ NIceDb::TNiceDb db(txc.DB);
+ writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
}
- } else {
- LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix());
- Self->IncCounter(COUNTER_WRITE_DUPLICATE);
+
+ if (!InsertOneBlob(txc, blobData, writeId)) {
+ LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
+ Self->IncCounter(COUNTER_WRITE_DUPLICATE);
+ }
+ writeIds.push_back(writeId);
}
if (operation) {
+ operation->OnWriteFinish(txc, writeIds);
+ auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
+ Y_UNUSED(txInfo);
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController.GetCoordinatorInfo(operation->GetTxId());
Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(operation->GetTxId(), tInfo);
} else {
- Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ Y_VERIFY(writeIds.size() == 1);
+ Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
}
return true;
}
@@ -200,20 +213,19 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
}
- auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
- if (operation) {
- auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails");
+ if (writeMeta.HasLongTxId()) {
+ auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
} else {
- auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
+ auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
+ Y_VERIFY(operation);
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails");
ctx.Send(writeMeta.GetSource(), result.release());
}
return;
}
- const auto& blobData = ev->Get()->GetBlobData();
- Y_VERIFY(blobData.GetBlobId().IsValid());
- LOG_S_DEBUG("Write (record) " << blobData.GetBlobData().size() << " bytes into pathId " << writeMeta.GetTableId()
+ LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());
Execute(new TTxWrite(this, ev), ctx);
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index fa62a2478ec..7fd7da79e3a 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -255,12 +255,16 @@ struct TEvPrivate {
YDB_READONLY_DEF(TUnifiedBlobId, BlobId);
YDB_READONLY_DEF(TString, BlobData);
YDB_ACCESSOR_DEF(TString, LogicalMeta);
+ YDB_ACCESSOR(ui64, RowsCount, 0);
+ YDB_ACCESSOR(ui64, RawBytes, 0);
public:
TPutBlobData() = default;
- TPutBlobData(const TUnifiedBlobId& blobId, const TString& data)
+ TPutBlobData(const TUnifiedBlobId& blobId, const TString& data, ui64 rowsCount, ui64 rawBytes)
: BlobId(blobId)
, BlobData(data)
+ , RowsCount(rowsCount)
+ , RawBytes(rawBytes)
{}
};
@@ -272,13 +276,13 @@ struct TEvPrivate {
Y_VERIFY(PutResult);
}
- TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TPutBlobData&& blobData, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot)
+ TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const NOlap::TSnapshot& snapshot)
: TEvWriteBlobsResult(putResult, writeMeta, snapshot)
{
BlobData = std::move(blobData);
}
- const TPutBlobData& GetBlobData() const {
+ const TVector<TPutBlobData>& GetBlobData() const {
return BlobData;
}
@@ -300,7 +304,7 @@ struct TEvPrivate {
private:
NColumnShard::TBlobPutResult::TPtr PutResult;
- TPutBlobData BlobData;
+ TVector<TPutBlobData> BlobData;
NEvWrite::TWriteMeta WriteMeta;
NOlap::TSnapshot Snapshot;
};
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index d119e4a3bf6..06578fafc5d 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -254,9 +254,10 @@ struct Schema : NIceDb::Schema {
struct Status : Column<3, NScheme::NTypeIds::Uint32> {};
struct CreatedAt : Column<4, NScheme::NTypeIds::Uint64> {};
struct GlobalWriteId : Column<5, NScheme::NTypeIds::Uint64> {};
+ struct Metadata : Column<6, NScheme::NTypeIds::String> {};
using TKey = TableKey<WriteId>;
- using TColumns = TableColumns<TxId, WriteId, Status, CreatedAt, GlobalWriteId>;
+ using TColumns = TableColumns<TxId, WriteId, Status, CreatedAt, GlobalWriteId, Metadata>;
};
using TTables = SchemaTables<
@@ -665,10 +666,15 @@ struct Schema : NIceDb::Schema {
// Operations
static void Operations_Write(NIceDb::TNiceDb& db, const TWriteOperation& operation) {
+ TString metadata;
+ NKikimrTxColumnShard::TInternalOperationData proto;
+ operation.ToProto(proto);
+ Y_VERIFY(proto.SerializeToString(&metadata));
+
db.Table<Operations>().Key((ui64)operation.GetWriteId()).Update(
NIceDb::TUpdate<Operations::Status>((ui32)operation.GetStatus()),
NIceDb::TUpdate<Operations::CreatedAt>(operation.GetCreatedAt().Seconds()),
- NIceDb::TUpdate<Operations::GlobalWriteId>(operation.GetGlobalWriteId()),
+ NIceDb::TUpdate<Operations::Metadata>(metadata),
NIceDb::TUpdate<Operations::TxId>(operation.GetTxId())
);
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index 164661a4c1d..6e62e700576 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -12,44 +12,46 @@ TIndexedWriteController::TBlobConstructor::TBlobConstructor(NOlap::ISnapshotSche
{}
const TString& TIndexedWriteController::TBlobConstructor::GetBlob() const {
- return DataPrepared;
+ Y_VERIFY(CurrentIndex > 0);
+ return BlobsSplitted[CurrentIndex - 1].GetData();
}
IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext() {
- if (!!DataPrepared) {
+ if (CurrentIndex == BlobsSplitted.size()) {
return EStatus::Finished;
}
+ CurrentIndex++;
+ return EStatus::Ok;
+}
+bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
+ const auto& blobInfo = BlobsSplitted[CurrentIndex - 1];
+ Owner.AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData(blobId, blobInfo.GetData(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes()));
+ return true;
+}
+
+bool TIndexedWriteController::TBlobConstructor::Init() {
const auto& writeMeta = Owner.WriteData.GetWriteMeta();
const ui64 tableId = writeMeta.GetTableId();
const ui64 writeId = writeMeta.GetWriteId();
- // Heavy operations inside. We cannot run them in tablet event handler.
+ std::shared_ptr<arrow::RecordBatch> batch;
{
NColumnShard::TCpuGuard guard(Owner.ResourceUsage);
- Batch = Owner.WriteData.GetData().GetArrowBatch();
+ batch = Owner.WriteData.GetData().GetArrowBatch();
}
- if (!Batch) {
+ if (!batch) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId);
- return EStatus::Error;
+ return false;
}
- {
- NColumnShard::TCpuGuard guard(Owner.ResourceUsage);
- DataPrepared = NArrow::SerializeBatchNoCompression(Batch);
+ auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize());
+ if (!splitResult) {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage());
+ return false;
}
-
- if (DataPrepared.size() > NColumnShard::TLimits::GetMaxBlobSize()) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_data_too_big")("write_id", writeId)("table_id", tableId);
- return EStatus::Error;
- }
- return EStatus::Ok;
-}
-
-bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
- Y_VERIFY(blobId.BlobSize() == DataPrepared.size());
- Owner.AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData(blobId, DataPrepared), Batch->num_rows(), NArrow::GetBatchDataSize(Batch));
+ BlobsSplitted = splitResult.ReleaseResult();
return true;
}
@@ -63,8 +65,7 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const
void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) {
if (putResult->GetPutStatus() == NKikimrProto::OK) {
- Y_VERIFY(BlobData.size() == 1);
- auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData[0]), WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot());
+ auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot());
ctx.Send(DstActor, result.release());
} else {
auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta(), BlobConstructor->GetSnapshot());
@@ -72,14 +73,20 @@ void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx,
}
}
-void TIndexedWriteController::AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data, const ui64 numRows, const ui64 batchSize) {
- Y_VERIFY(BlobData.empty());
+NOlap::IBlobConstructor::TPtr TIndexedWriteController::GetBlobConstructor() {
+ if (!BlobConstructor->Init()) {
+ return nullptr;
+ }
+ return BlobConstructor;
+}
+void TIndexedWriteController::AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data) {
ui64 dirtyTime = AppData()->TimeProvider->Now().Seconds();
Y_VERIFY(dirtyTime);
+
NKikimrTxColumnShard::TLogicalMetadata outMeta;
- outMeta.SetNumRows(numRows);
- outMeta.SetRawBytes(batchSize);
+ outMeta.SetNumRows(data.GetRowsCount());
+ outMeta.SetRawBytes(data.GetRawBytes());
outMeta.SetDirtyWriteTimeSeconds(dirtyTime);
TString metaString;
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
index 659420c714a..f76187e450b 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
@@ -7,6 +7,7 @@
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
namespace NKikimr::NOlap {
@@ -16,10 +17,9 @@ private:
class TBlobConstructor : public IBlobConstructor {
TIndexedWriteController& Owner;
NOlap::ISnapshotSchema::TPtr SnapshotSchema;
+ std::vector<NArrow::TSerializedBatch> BlobsSplitted;
- TString DataPrepared;
- std::shared_ptr<arrow::RecordBatch> Batch;
-
+ ui64 CurrentIndex = 0;
public:
TBlobConstructor(NOlap::ISnapshotSchema::TPtr snapshotSchema, TIndexedWriteController& owner);
@@ -29,11 +29,12 @@ private:
const NOlap::TSnapshot& GetSnapshot() const {
return SnapshotSchema->GetSnapshot();
}
+ bool Init();
};
NEvWrite::TWriteData WriteData;
std::shared_ptr<TBlobConstructor> BlobConstructor;
- std::vector<NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData> BlobData;
+ TVector<NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData> BlobData;
TActorId DstActor;
public:
@@ -41,12 +42,10 @@ public:
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
- NOlap::IBlobConstructor::TPtr GetBlobConstructor() override {
- return BlobConstructor;
- }
+ NOlap::IBlobConstructor::TPtr GetBlobConstructor() override;
public:
- void AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data, const ui64 numRows, const ui64 batchSize);
+ void AddBlob(NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData&& data);
};
}
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index 59878c0e7d7..cefb4b481bd 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -10,12 +10,11 @@
namespace NKikimr::NColumnShard {
- TWriteOperation::TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt, const ui64 globalWriteId)
+ TWriteOperation::TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt)
: Status(status)
, CreatedAt(createdAt)
, WriteId(writeId)
, TxId(txId)
- , GlobalWriteId(globalWriteId)
{
}
@@ -35,34 +34,50 @@ namespace NKikimr::NColumnShard {
TBlobGroupSelector dsGroupSelector(owner.Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- auto pathExists = [&](ui64 pathId) {
- return owner.TablesManager.HasTable(pathId);
- };
+ for (auto gWriteId : GlobalWriteIds) {
+ auto pathExists = [&](ui64 pathId) {
+ return owner.TablesManager.HasTable(pathId);
+ };
- auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), 0, { WriteId },
- pathExists);
+ auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), 0, { gWriteId },
+ pathExists);
- owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
- owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
- owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes);
+ owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
+ owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
+ owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes);
+ }
owner.UpdateInsertTableCounters();
}
- void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId) {
+ void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds) {
Y_VERIFY(Status == EOperationStatus::Started);
Status = EOperationStatus::Prepared;
- GlobalWriteId = globalWriteId;
+ GlobalWriteIds = globalWriteIds;
NIceDb::TNiceDb db(txc.DB);
Schema::Operations_Write(db, *this);
}
+ void TWriteOperation::ToProto(NKikimrTxColumnShard::TInternalOperationData& proto) const {
+ for (auto&& writeId : GlobalWriteIds) {
+ proto.AddInternalWriteIds((ui64)writeId);
+ }
+ }
+
+ void TWriteOperation::FromProto(const NKikimrTxColumnShard::TInternalOperationData& proto) {
+ for (auto&& writeId : proto.GetInternalWriteIds()) {
+ GlobalWriteIds.push_back(TWriteId(writeId));
+ }
+ }
+
void TWriteOperation::Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const {
Y_VERIFY(Status == EOperationStatus::Prepared);
TBlobGroupSelector dsGroupSelector(owner.Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- owner.InsertTable->Abort(dbTable, 0, {WriteId});
+ THashSet<TWriteId> writeIds;
+ writeIds.insert(GlobalWriteIds.begin(), GlobalWriteIds.end());
+ owner.InsertTable->Abort(dbTable, 0, writeIds);
TBlobManagerDb blobManagerDb(txc.DB);
auto allAborted = owner.InsertTable->GetAborted();
@@ -83,10 +98,13 @@ namespace NKikimr::NColumnShard {
const TWriteId writeId = (TWriteId) rowset.GetValue<Schema::Operations::WriteId>();
const ui64 createdAtSec = rowset.GetValue<Schema::Operations::CreatedAt>();
const ui64 txId = rowset.GetValue<Schema::Operations::TxId>();
- const ui64 globalWriteId = rowset.GetValue<Schema::Operations::GlobalWriteId>();
+ const TString metadata = rowset.GetValue<Schema::Operations::Metadata>();
+ NKikimrTxColumnShard::TInternalOperationData metaProto;
+ Y_VERIFY(metaProto.ParseFromString(metadata));
const EOperationStatus status = (EOperationStatus) rowset.GetValue<Schema::Operations::Status>();
- auto operation = std::make_shared<TWriteOperation>(writeId, txId, status, TInstant::Seconds(createdAtSec), globalWriteId);
+ auto operation = std::make_shared<TWriteOperation>(writeId, txId, status, TInstant::Seconds(createdAtSec));
+ operation->FromProto(metaProto);
Y_VERIFY(operation->GetStatus() != EOperationStatus::Draft);
diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h
index d50b4cfcce6..cb90eaaa235 100644
--- a/ydb/core/tx/columnshard/operations/write.h
+++ b/ydb/core/tx/columnshard/operations/write.h
@@ -3,6 +3,7 @@
#include <ydb/core/tx/ev_write/write_data.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/tx/columnshard/engines/defs.h>
+#include <ydb/core/protos/tx_columnshard.pb.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/library/accessor/accessor.h>
@@ -32,21 +33,24 @@ namespace NKikimr::NColumnShard {
YDB_READONLY_DEF(TInstant, CreatedAt);
YDB_READONLY_DEF(TWriteId, WriteId);
YDB_READONLY(ui64, TxId, 0);
- YDB_READONLY(ui64, GlobalWriteId, 0);
+ YDB_READONLY_DEF(TVector<TWriteId>, GlobalWriteIds);
public:
using TPtr = std::shared_ptr<TWriteOperation>;
- TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt, const ui64 globalWriteId = 0);
+ TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt);
void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, const TActorContext& ctx);
- void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId);
+ void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds);
void Commit(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const;
void Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const;
void Out(IOutputStream& out) const {
out << "write_id=" << (ui64) WriteId << ";tx_id=" << TxId;
}
+
+ void ToProto(NKikimrTxColumnShard::TInternalOperationData& proto) const;
+ void FromProto(const NKikimrTxColumnShard::TInternalOperationData& proto);
};
class TOperationsManager {
diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp
index 27f022590a6..b994b096847 100644
--- a/ydb/core/tx/columnshard/operations/write_data.cpp
+++ b/ydb/core/tx/columnshard/operations/write_data.cpp
@@ -13,7 +13,7 @@ bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPa
columns.emplace_back(columnId);
}
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
- return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize();
+ return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty();
}
std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const {
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 2dd54ea1f48..7d63330a47a 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -10,6 +10,9 @@
#include <ydb/core/tx/columnshard/engines/changes/cleanup.h>
#include <ydb/core/tx/columnshard/operations/write_data.h>
#include <library/cpp/actors/protos/unittests.pb.h>
+#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/formats/arrow/simple_builder/array.h>
+#include <ydb/core/formats/arrow/simple_builder/batch.h>
namespace NKikimr {
@@ -1859,37 +1862,171 @@ Y_UNIT_TEST_SUITE(EvWrite) {
}
};
+ // Test helper: boots a ColumnShard tablet on TTestTxConfig::TxTablet0, waits for TEvTablet::EvBoot
+ // and then calls SetupSchema for `tableId` with a table description built from `schema`.
+ //   runtime - test runtime that hosts the tablet
+ //   tableId - id passed through to SetupSchema
+ //   schema  - (column name, type) pairs; must be non-empty, the first column becomes the primary key
+ void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, TTypeInfo>>& schema) {
+     // schema[0] is used as the primary key below; fail the test explicitly instead of indexing an empty vector.
+     UNIT_ASSERT(!schema.empty());
+
+     CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+     // Block until the tablet reports TEvTablet::EvBoot.
+     TDispatchOptions options;
+     options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+     runtime.DispatchEvents(options);
+
+     TestTableDescription tableDescription;
+     tableDescription.Schema = schema;
+     tableDescription.Pk = { schema[0] };
+     TActorId sender = runtime.AllocateEdgeActor();
+     SetupSchema(runtime, sender, tableId, tableDescription);
+ }
+
+ // Test helper: sends a TEvRead for `tableId` at `snapshot` to the tablet TTestTxConfig::TxTablet0 and
+ // gathers every non-null arrow batch from the TEvReadResult events until one reports !HasMore().
+ // Returns the combined batch, or an empty batch built from `schema` when CombineBatches yields nothing.
+ std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, TTypeInfo>>& schema) {
+     TActorId edgeActor = runtime.AllocateEdgeActor();
+
+     auto* readRequest = new TEvColumnShard::TEvRead(edgeActor, TTestTxConfig::TxTablet1, snapshot.GetPlanStep(), snapshot.GetTxId(), tableId);
+     ForwardToTablet(runtime, TTestTxConfig::TxTablet0, edgeActor, readRequest);
+
+     std::vector<std::shared_ptr<arrow::RecordBatch>> collected;
+     bool hasMore = true;
+     while (hasMore) {
+         // The handle is declared per iteration and passed to GrabEdgeEvent together with the result pointer.
+         TAutoPtr<IEventHandle> handle;
+         auto readResult = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+         UNIT_ASSERT(readResult);
+         if (auto chunk = readResult->GetArrowBatch()) {
+             collected.push_back(chunk);
+         }
+         hasMore = readResult->HasMore();
+     }
+
+     if (auto combined = NArrow::CombineBatches(collected)) {
+         return combined;
+     }
+     return NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
+ }
+
Y_UNIT_TEST(WriteInTransaction) {
+ // Sends one TEvWrite whose payload is smaller than TLimits::GetMaxBlobSize() and checks that
+ // the 2048 written rows are not returned by a read at snapshot (10, txId) before PlanWriteTx,
+ // and are returned by a read at snapshot (11, txId) after PlanWriteTx(…, TSnapshot(11, txId)).
+ using namespace NArrow;
+
TTestBasicRuntime runtime;
TTester::Setup(runtime);
+ const ui64 tableId = 1;
+ const std::vector<std::pair<TString, TTypeInfo>> schema = {
+ {"key", TTypeInfo(NTypeIds::Uint64) },
+ {"field", TTypeInfo(NTypeIds::Utf8) }
+ };
+ PrepareTablet(runtime, tableId, schema);
+ const ui64 txId = 111;
+
+ // Build a two-column batch matching `schema`.
+ // NOTE(review): TStringPoolFiller(8, 100) presumably means a pool of 8 strings of length 100 - confirm against the filler's constructor.
+ NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(8, 100));
+
+ auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
+ TString blobData = NArrow::SerializeBatchNoCompression(batch);
+ // Precondition of this test: the serialized batch is below the max blob size.
+ UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
+
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ evWrite->AddReplaceOp(tableId, dataPtr);
+
TActorId sender = runtime.AllocateEdgeActor();
- CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
- TDispatchOptions options;
- options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
- runtime.DispatchEvents(options);
+ {
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED);
+
+ // The write is only PREPARED at this point: a read at step 10 is expected to return no rows.
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0);
+
+ PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
+ }
+
+ // After planning at step 11 every written row is expected to be returned.
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048);
+ }
+
+ // NOTE(review): the test name looks like a typo for "AbortInTransaction"; renaming it would change the test's filter name.
+ Y_UNIT_TEST(AbotrInTransaction) {
+ // Sends one TEvWrite (payload below TLimits::GetMaxBlobSize()), expects PREPARED, then calls
+ // PlanWriteTx with step GetMaxStep() + 1 and checks that a read at that step returns no rows.
+ using namespace NArrow;
+
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
const ui64 tableId = 1;
- const TestTableDescription table;
- SetupSchema(runtime, sender, tableId, table);
+ const std::vector<std::pair<TString, TTypeInfo>> schema = {
+ {"key", TTypeInfo(NTypeIds::Uint64) },
+ {"field", TTypeInfo(NTypeIds::Utf8) }
+ };
+ PrepareTablet(runtime, tableId, schema);
+ const ui64 txId = 111;
+
+ // Build a two-column batch matching `schema`.
+ // NOTE(review): TStringPoolFiller(8, 100) presumably means a pool of 8 strings of length 100 - confirm against the filler's constructor.
+ NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(8, 100));
+
+ auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
+ TString blobData = NArrow::SerializeBatchNoCompression(batch);
+ // Precondition of this test: the serialized batch is below the max blob size.
+ UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize());
+
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ evWrite->AddReplaceOp(tableId, dataPtr);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
+
+ // The initial value 11 is a placeholder: it is overwritten from the write result below.
+ ui64 outdatedStep = 11;
+ {
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED);
+
+ // Plan at a step one past the MaxStep reported in the write result.
+ // NOTE(review): the plan uses txId + 1 rather than txId, and the trailing `false` presumably
+ // tells PlanWriteTx not to expect a successful result - confirm both against PlanWriteTx.
+ outdatedStep = event->Record.GetMaxStep() + 1;
+ PlanWriteTx(runtime, sender, NOlap::TSnapshot(outdatedStep, txId + 1), false);
+ }
+
+ // None of the written rows are expected to be returned at the outdated step.
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(outdatedStep, txId), schema);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0);
+ }
+
+ Y_UNIT_TEST(WriteWithSplit) {
+ // Sends one TEvWrite whose serialized payload is larger than TLimits::GetMaxBlobSize() and checks
+ // that the write is still PREPARED and that all 2048 rows are returned after PlanWriteTx at step 11.
+ // NOTE(review): presumably the shard splits the payload into several blobs internally - this test only observes the row count.
+ using namespace NArrow;
+
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+ const ui64 tableId = 1;
+ const std::vector<std::pair<TString, TTypeInfo>> schema = {
+ {"key", TTypeInfo(NTypeIds::Uint64) },
+ {"field", TTypeInfo(NTypeIds::Utf8) }
+ };
+ PrepareTablet(runtime, tableId, schema);
const ui64 txId = 111;
- const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema;
- TString blobData = MakeTestBlob({0, 100}, ydbSchema);
+ // The second TStringPoolFiller argument is scaled from the max blob size so that the
+ // 2048-row batch serializes to more than TLimits::GetMaxBlobSize() (asserted below).
+ NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::UInt64Type>>>("key");
+ NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
+ "field", NConstruction::TStringPoolFiller(8, TLimits::GetMaxBlobSize() / 1024));
+
+ auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048);
+ TString blobData = NArrow::SerializeBatchNoCompression(batch);
+ // Precondition of this test: the payload exceeds the max blob size.
+ UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize());
auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
- auto dataPtr = std::make_shared<TArrowData>(ydbSchema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ auto dataPtr = std::make_shared<TArrowData>(schema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
evWrite->AddReplaceOp(tableId, dataPtr);
+ TActorId sender = runtime.AllocateEdgeActor();
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
- TAutoPtr<NActors::IEventHandle> handle;
- auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
- UNIT_ASSERT(event);
- UNIT_ASSERT_EQUAL(event->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::PREPARED);
- PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
+ {
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::PREPARED);
+ PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
+ }
+
+ // All rows of the oversized payload are expected to be returned after planning.
+ auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
+ UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048);
}
}
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index 194ece3ea31..9b020e09276 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -94,6 +94,9 @@ public:
}
auto blobsConstructor = WriteController->GetBlobConstructor();
+ if (!blobsConstructor) {
+ return SendResultAndDie(ctx, NKikimrProto::ERROR);
+ }
auto status = NOlap::IBlobConstructor::EStatus::Finished;
while (true) {
status = blobsConstructor->BuildNext();
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index d15a59af45d..54f7ee05259 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -21,9 +21,8 @@ class TWriteMeta {
YDB_ACCESSOR_DEF(NActors::TActorId, Source);
// Long Tx logic
- YDB_OPT(NLongTxService::TLongTxId, LongTxId)
+ YDB_OPT(NLongTxService::TLongTxId, LongTxId);
YDB_ACCESSOR(ui64, WritePartId, 0);
- YDB_READONLY(ui64, MetaShard, 0);
YDB_ACCESSOR_DEF(TString, DedupId);
public: