diff options
author | nsofya <nsofya@yandex-team.com> | 2023-07-27 16:23:24 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-07-27 16:23:24 +0300 |
commit | 9417fcf6262cf70a06d9739e54a16390e6edea61 (patch) | |
tree | 338b1e5347efa85b924b5ef6e28a255c863809a1 | |
parent | 2519745782969ed01a5c6dda3fcae5a151bdac3c (diff) | |
download | ydb-9417fcf6262cf70a06d9739e54a16390e6edea61.tar.gz |
EvWrite implementation for columnshard
Это драфт. Поэтому тесты здесь базовые исключительно на happy path. Как финализируем протокол, я добавлю больше.
Поведение описала примерно в ev_write.proto (видела, что везде пишете на английском, но так как это драфт, не тратила время не перевод)
Хочется получить обратную связь, что я могла упустить, или где хотелось другого поведения. Возможно, формат данных хочется другой или можно что-то переиспользовать. Принимаю также комментарии по тому, где разместить код)
Места где я прямо ожидаю комментарии выделю прямо в коде
44 files changed, 987 insertions, 88 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 59686ca51a8..f81a444a7f7 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -162,7 +162,8 @@ struct TKikimrEvents : TEvents { ES_EXT_INDEX, ES_CONVEYOR, ES_KQP_SCAN_EXCHANGE, - ES_IC_NODE_CACHE + ES_IC_NODE_CACHE, + ES_DATA_OPERATIONS }; }; diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt index df648322943..b34e66bd7ac 100644 --- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1575,6 +1587,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt index c7f97d48857..27fe911721d 100644 --- a/ydb/core/protos/CMakeLists.linux-aarch64.txt +++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1576,6 +1588,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt index c7f97d48857..27fe911721d 100644 --- a/ydb/core/protos/CMakeLists.linux-x86_64.txt +++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1576,6 +1588,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt index df648322943..b34e66bd7ac 100644 --- a/ydb/core/protos/CMakeLists.windows-x86_64.txt +++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt @@ -1490,6 +1490,18 @@ get_built_tool_path( cpp_styleguide ) get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) +get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency tools/enum_parser/enum_parser @@ -1575,6 +1587,7 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto + ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto diff --git a/ydb/core/protos/ev_write.proto b/ydb/core/protos/ev_write.proto new file mode 100644 index 00000000000..8953f5f2f25 --- /dev/null +++ b/ydb/core/protos/ev_write.proto @@ -0,0 +1,50 @@ +import "ydb/core/scheme/protos/type_info.proto"; +import "ydb/core/protos/tx_datashard.proto"; +import "ydb/public/api/protos/ydb_issue_message.proto"; + +package NKikimrDataEvents; +option java_package = "ru.yandex.kikimr.proto"; + + +message TOperationData { + message TArrowData { + optional uint64 PayloadIndex = 1; + } + + repeated uint32 ColumnIds = 1 [packed = true]; + oneof Data { + TArrowData ArrowData = 900; + } +} + +message TEvWrite { + optional NKikimrTxDataShard.TTableId TableId = 1; + optional uint64 TxId = 2; + + oneof Operation { + TOperationData Replace = 900; + } +} + +message TEvWriteResult { + enum EOperationStatus { + UNSPECIFIED = 0; + PREPARED = 1; + COMPLETED = 2; + ABORTED = 3; + ERROR = 4; + OVERLOADED = 6; + BAD_REQUEST = 7; + } + + optional EOperationStatus Status = 1; + optional Ydb.Issue.IssueMessage IssueMessage = 2; + + optional uint64 Origin = 4; + optional uint64 TxId = 5; + + // For Tx planner + optional uint64 MinStep = 6; + optional uint64 MaxStep = 7; + repeated fixed64 DomainCoordinators = 8; +} diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 37abe8a8497..5c4a2907a5a 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -126,6 +126,7 @@ enum ETransactionKind { TX_KIND_COMMIT = 2; TX_KIND_TTL = 3; // Immediate (not planned) TX_KIND_DATA = 4; + TX_KIND_COMMIT_WRITE = 5; } enum ETransactionFlag { diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index b5982bcc5ee..d68c5f5b7ac 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -55,6 +55,7 @@ SRCS( database_basic_sausage_metainfo.proto datashard_load.proto drivemodel.proto + ev_write.proto export.proto external_sources.proto flat_tx_scheme.proto diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 07c0eaea85c..9d3ca29f9bd 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(operations) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -50,6 +51,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-counters tx-columnshard-common tx-columnshard-splitter + tx-columnshard-operations core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 14cdf344f20..41ca866dcff 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(operations) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -51,6 +52,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-counters tx-columnshard-common tx-columnshard-splitter + tx-columnshard-operations core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 14cdf344f20..41ca866dcff 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(operations) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -51,6 +52,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-counters tx-columnshard-common tx-columnshard-splitter + tx-columnshard-operations core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 07c0eaea85c..9d3ca29f9bd 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -10,6 +10,7 @@ add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) +add_subdirectory(operations) add_subdirectory(resources) add_subdirectory(splitter) add_subdirectory(ut_rw) @@ -50,6 +51,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-counters tx-columnshard-common tx-columnshard-splitter + tx-columnshard-operations core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 505720e530a..d50d97d00eb 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -5,6 +5,7 @@ #include "blob_manager_db.h" #include <ydb/core/tablet/tablet_exception.h> +#include <ydb/core/tx/columnshard/operations/write.h> namespace NKikimr::NColumnShard { @@ -86,6 +87,10 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) return false; } + if (!Self->OperationsManager.Init(txc)) { + return false; + } + Self->TablesManager.InitFromDB(db, Self->TabletID()); Self->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size()); Self->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size()); diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 175b89b6b0f..ece932faebe 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -1,5 +1,7 @@ #include "columnshard_impl.h" #include "columnshard_schema.h" + +#include <ydb/core/tx/columnshard/operations/write.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> namespace NKikimr::NColumnShard { @@ -27,11 +29,16 @@ private: , Status(status) {} - THolder<IEventBase> MakeEvent(ui64 tabletId) const { - auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>( + std::unique_ptr<IEventBase> MakeEvent(ui64 tabletId) const { + if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) { + auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(TxInfo.TxId); + return result; + } else { + auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>( tabletId, TxInfo.TxKind, TxInfo.TxId, Status); - result->Record.SetStep(TxInfo.PlanStep); - return result; + result->Record.SetStep(TxInfo.PlanStep); + return result; + } } }; @@ -121,6 +128,12 @@ public: Trigger = ETriggerActivities::POST_INSERT; break; } + case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: { + NOlap::TSnapshot snapshot(step, txId); + Y_VERIFY(Self->OperationsManager.CommitTransaction(*Self, txId, txc, snapshot)); + Trigger = ETriggerActivities::POST_INSERT; + break; + } default: { Y_FAIL("Unexpected TxKind"); } @@ -151,7 +164,7 @@ public: Self->ProgressTxController.CompleteRunningTx(TTxController::TPlanQueueItem(res.TxInfo.PlanStep, res.TxInfo.TxId)); auto event = res.MakeEvent(Self->TabletID()); - ctx.Send(res.TxInfo.Source, event.Release(), 0, res.TxInfo.Cookie); + ctx.Send(res.TxInfo.Source, event.release(), 0, res.TxInfo.Cookie); } Self->ScheduleNextGC(ctx); diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 615c5ea9c16..e713d87e0a1 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -195,7 +195,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex } } - const auto& txInfo = Self->ProgressTxController.RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); + const auto& txInfo = Self->ProgressTxController.RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc); minStep = txInfo.MinStep; maxStep = txInfo.MaxStep; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index c0156436e9e..c61206b037d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -4,6 +4,8 @@ #include "blob_cache.h" #include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h> +#include <ydb/core/tx/columnshard/operations/write.h> +#include <ydb/core/tx/columnshard/operations/write_data.h> namespace NKikimr::NColumnShard { @@ -51,14 +53,21 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { NKikimrTxColumnShard::TLogicalMetadata meta; Y_VERIFY(meta.ParseFromString(blobData.GetLogicalMeta())); - ui32 status = NKikimrTxColumnShard::EResultStatus::SUCCESS; const auto& logoBlobId = blobData.GetBlobId(); Y_VERIFY(logoBlobId.IsValid()); Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId())); + + TWriteOperation::TPtr operation; if (writeMeta.HasLongTxId()) { Y_VERIFY(writeMeta.GetMetaShard() == 0); writeId = (ui64)Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); + } else { + operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); + if (operation) { + Y_VERIFY(operation->GetStatus() == EOperationStatus::Started); + writeId = (ui64)Self->BuildNextWriteId(txc); + } } ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds(); @@ -102,18 +111,23 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_WRITE_SUCCESS); Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb); + + if (operation) { + operation->OnWriteFinish(txc, writeId); + auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc); + Y_UNUSED(txInfo); + } } else { LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix()); + Self->IncCounter(COUNTER_WRITE_DUPLICATE); } - // Return EResultStatus::SUCCESS for dups - Self->IncCounter(COUNTER_WRITE_DUPLICATE); - - if (status != NKikimrTxColumnShard::EResultStatus::SUCCESS) { - Self->IncCounter(COUNTER_WRITE_FAIL); + if (operation) { + NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController.GetCoordinatorInfo(operation->GetTxId()); + Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(operation->GetTxId(), tInfo); + } else { + Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS); } - - Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, writeId, status); return true; } @@ -123,7 +137,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release()); } -void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, const TActorContext& ctx) { +void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) { IncCounter(COUNTER_WRITE_FAIL); switch (overloadReason) { case EOverloadStatus::Disk: @@ -149,8 +163,7 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, cons << " overload reason: [" << overloadReason << "]" << " at tablet " << TabletID()); - auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); - ctx.Send(writeData.GetWriteMeta().GetSource(), result.release()); + ctx.Send(writeData.GetWriteMeta().GetSource(), event.release()); } TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) const { @@ -191,8 +204,14 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; } - auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode); - ctx.Send(writeMeta.GetSource(), result.release()); + auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId()); + if (operation) { + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails"); + ctx.Send(writeMeta.GetSource(), result.release()); + } else { + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode); + ctx.Send(writeMeta.GetSource(), result.release()); + } return; } @@ -223,27 +242,32 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex writeMeta.SetWritePartId(record.GetWritePartId()); } - auto arrowData = std::make_shared<NEvWrite::TArrowData>(); - if (!arrowData->ParseFromProto(record)) { - LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() + if (!TablesManager.IsReadyForWrite(tableId)) { + LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") << " at tablet " << TabletID()); IncCounter(COUNTER_WRITE_FAIL); + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); ctx.Send(ev->Get()->GetSource(), result.release()); return; } - NEvWrite::TWriteData writeData(writeMeta, arrowData); - auto overloadStatus = CheckOverloaded(tableId); - if (!TablesManager.IsReadyForWrite(tableId)) { - LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") + const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); + auto arrowData = std::make_shared<TProtoArrowData>(snapshotSchema); + if (!arrowData->ParseFromProto(record)) { + LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID()); IncCounter(COUNTER_WRITE_FAIL); - auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); ctx.Send(ev->Get()->GetSource(), result.release()); - } else if (overloadStatus != EOverloadStatus::None) { - OverloadWriteFail(overloadStatus, writeData, ctx); + return; + } + + NEvWrite::TWriteData writeData(writeMeta, arrowData); + auto overloadStatus = CheckOverloaded(tableId); + if (overloadStatus != EOverloadStatus::None) { + std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); + OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); } else { if (writeMeta.HasLongTxId()) { // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId) @@ -261,17 +285,50 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex } } - auto wg = WritesMonitor.RegisterWrite(writeData.GetSize()); + WritesMonitor.RegisterWrite(writeData.GetSize()); LOG_S_DEBUG("Write (blob) " << writeData.GetSize() << " bytes into pathId " << writeMeta.GetTableId() << (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ") << WritesMonitor.DebugString() << " at tablet " << TabletID()); - const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData, snapshotSchema); ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize)); } } +void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; + const auto& tableId = record.GetTableId().GetTableId(); + const ui64 txId = ev->Get()->GetTxId(); + const auto source = ev->Sender; + + if (!TablesManager.IsReadyForWrite(tableId)) { + IncCounter(COUNTER_WRITE_FAIL); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::ERROR, "table not writable"); + ctx.Send(source, result.release()); + return; + } + + auto arrowData = std::make_shared<TArrowData>(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()); + if (!arrowData->Parse(record.GetReplace(), TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) { + IncCounter(COUNTER_WRITE_FAIL); + auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "parsing data error"); + ctx.Send(source, result.release()); + } + + auto overloadStatus = CheckOverloaded(tableId); + if (overloadStatus != EOverloadStatus::None) { + NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData); + std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::OVERLOADED, "overload data error"); + OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx); + return; + } + + auto wg = WritesMonitor.RegisterWrite(arrowData->GetData().size()); + auto operation = OperationsManager.RegisterOperation(txId); + Y_VERIFY(operation); + operation->Start(*this, tableId, arrowData, source, ctx); +} + } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index e33fd497c91..ff0be97c5eb 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -294,16 +294,25 @@ TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService: it = LongTxWritesByUniqueId.emplace(longTxId.UniqueId, TPartsForLTXShard()).first; } - TWriteId writeId = ++LastWriteId; + TWriteId writeId = BuildNextWriteId(db); auto& lw = LongTxWrites[writeId]; lw.WriteId = (ui64)writeId; lw.WritePartId = partId; lw.LongTxId = longTxId; it->second[partId] = &lw; - Schema::SaveSpecialValue(db, Schema::EValueIds::LastWriteId, (ui64)writeId); Schema::SaveLongTxWrite(db, writeId, partId, longTxId); + return writeId; +} + +TWriteId TColumnShard::BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc) { + NIceDb::TNiceDb db(txc.DB); + return BuildNextWriteId(db); +} +TWriteId TColumnShard::BuildNextWriteId(NIceDb::TNiceDb& db) { + TWriteId writeId = ++LastWriteId; + Schema::SaveSpecialValue(db, Schema::EValueIds::LastWriteId, (ui64)writeId); return writeId; } @@ -362,6 +371,12 @@ bool TColumnShard::AbortTx(const ui64 txId, const NKikimrTxColumnShard::ETransac } break; } + case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: { + if (!OperationsManager.AbortTransaction(*this, txId, txc)) { + return false; + } + break; + } default: { Y_FAIL("Unsupported TxKind"); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index b5ffb7b7d8b..9c2b425dcf9 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -15,6 +15,7 @@ #include <ydb/core/tablet/tablet_pipe_client_cache.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tx/ev_write/events.h> #include <ydb/core/tx/tiering/common.h> #include <ydb/core/tx/tiering/manager.h> #include <ydb/core/tx/time_cast/time_cast.h> @@ -33,6 +34,8 @@ class TInsertColumnEngineChanges; namespace NKikimr::NColumnShard { +class TOperationsManager; + extern bool gAllowLogBatchingDefaultValue; IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters); @@ -146,6 +149,9 @@ class TColumnShard friend class TTxController; + friend class TOperationsManager; + friend class TWriteOperation; + class TTxProgressTx; class TTxProposeCancel; @@ -174,6 +180,7 @@ class TColumnShard void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev); + void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx); ITransaction* CreateTxInitSchema(); ITransaction* CreateTxRunGc(); @@ -235,7 +242,7 @@ public: }; private: - void OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, const TActorContext& ctx); + void OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx); EOverloadStatus CheckOverloaded(const ui64 tableId) const; protected: @@ -285,6 +292,7 @@ protected: HFunc(TEvPrivate::TEvScanStats, Handle); HFunc(TEvPrivate::TEvReadFinished, Handle); HFunc(TEvPrivate::TEvPeriodicWakeup, Handle); + HFunc(NEvents::TDataEvents::TEvWrite, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { LOG_S_WARN("TColumnShard.StateWork at " << TabletID() @@ -297,6 +305,7 @@ protected: private: TTxController ProgressTxController; + TOperationsManager OperationsManager; struct TAlterMeta { NKikimrTxColumnShard::TSchemaTxBody Body; @@ -543,6 +552,9 @@ private: bool LoadTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody); void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort); + TWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc); + TWriteId BuildNextWriteId(NIceDb::TNiceDb& db); + void EnqueueProgressTx(const TActorContext& ctx); void EnqueueBackgroundActivities(bool periodic = false, TBackgroundActivity activity = TBackgroundActivity::All()); void CleanForgottenBlobs(const TActorContext& ctx, const THashSet<TUnifiedBlobId>& allowList = {}); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 8ba55007c15..d119e4a3bf6 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -8,6 +8,7 @@ #include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h> #include <ydb/core/tx/columnshard/engines/granules_table.h> #include <ydb/core/tx/columnshard/engines/columns_table.h> +#include <ydb/core/tx/columnshard/operations/write.h> #include <type_traits> @@ -34,6 +35,7 @@ struct Schema : NIceDb::Schema { GranulesTableId, ColumnsTableId, CountersTableId, + OperationsTableId, }; enum class EValueIds : ui32 { @@ -246,6 +248,17 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<Index, Counter, ValueUI64>; }; + struct Operations : NIceDb::Schema::Table<OperationsTableId> { + struct WriteId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct TxId : Column<2, NScheme::NTypeIds::Uint64> {}; + struct Status : Column<3, NScheme::NTypeIds::Uint32> {}; + struct CreatedAt : Column<4, NScheme::NTypeIds::Uint64> {}; + struct GlobalWriteId : Column<5, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<WriteId>; + using TColumns = TableColumns<TxId, WriteId, Status, CreatedAt, GlobalWriteId>; + }; + using TTables = SchemaTables< Value, TxInfo, @@ -263,7 +276,8 @@ struct Schema : NIceDb::Schema { IndexColumns, IndexCounters, SmallBlobs, - OneToOneEvictedBlobs + OneToOneEvictedBlobs, + Operations >; // @@ -648,6 +662,21 @@ struct Schema : NIceDb::Schema { } return true; } + + // Operations + static void Operations_Write(NIceDb::TNiceDb& db, const TWriteOperation& operation) { + db.Table<Operations>().Key((ui64)operation.GetWriteId()).Update( + NIceDb::TUpdate<Operations::Status>((ui32)operation.GetStatus()), + NIceDb::TUpdate<Operations::CreatedAt>(operation.GetCreatedAt().Seconds()), + NIceDb::TUpdate<Operations::GlobalWriteId>(operation.GetGlobalWriteId()), + NIceDb::TUpdate<Operations::TxId>(operation.GetTxId()) + ); + } + + static void Operations_Erase(NIceDb::TNiceDb& db, const TWriteId writeId) { + db.Table<Operations>().Key((ui64)writeId).Delete(); + } + }; } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 345dbe751a3..02f8f5df731 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -75,6 +75,22 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS); } +void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) { + auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0); + auto tx = plan->Record.AddTransactions(); + tx->SetTxId(snap.GetTxId()); + ActorIdToProto(sender, tx->MutableAckTo()); + + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, plan.release()); + UNIT_ASSERT(runtime.GrabEdgeEvent<TEvTxProcessing::TEvPlanStepAck>(sender)); + if (waitResult) { + auto ev = runtime.GrabEdgeEvent<NEvents::TDataEvents::TEvWriteResult>(sender); + const auto& res = ev->Get()->Record; + UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId()); + UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::COMPLETED); + } +} + bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, const TString& data, std::shared_ptr<arrow::Schema> schema, bool waitResult) { const TString dedupId = ToString(writeId); diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 6f994b27dc2..264fc89dee9 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -411,6 +411,7 @@ struct TTestSchema { bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap); void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot); void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap); +void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true); bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId, const TString& data, std::shared_ptr<arrow::Schema> schema = {}, bool waitResult = true); std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId, diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h index 7f39d3c376c..a485cca6978 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h @@ -39,6 +39,7 @@ public: virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0; virtual const TIndexInfo& GetIndexInfo() const = 0; virtual const TSnapshot& GetSnapshot() const = 0; + virtual ui32 GetColumnsCount() const = 0; std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const; std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp index c635623fb09..31b81351385 100644 --- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp @@ -59,4 +59,8 @@ const TSnapshot& TFilteredSnapshotSchema::GetSnapshot() const { return OriginalSnapshot->GetSnapshot(); } +ui32 TFilteredSnapshotSchema::GetColumnsCount() const { + return Schema->num_fields(); +} + } diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h index b1187db6d1b..00e754e587f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h @@ -22,6 +22,7 @@ public: const std::shared_ptr<arrow::Schema>& GetSchema() const override; const TIndexInfo& GetIndexInfo() const override; const TSnapshot& GetSnapshot() const override; + ui32 GetColumnsCount() const override; }; } diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp index 69f2d4e9a12..52d4b7d3281 100644 --- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp @@ -42,4 +42,8 @@ const TSnapshot& TSnapshotSchema::GetSnapshot() const { return Snapshot; } +ui32 TSnapshotSchema::GetColumnsCount() const { + return Schema->num_fields(); +} + } diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h index 20603eb983b..b72819a3119 100644 --- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h @@ -22,6 +22,7 @@ public: const std::shared_ptr<arrow::Schema>& GetSchema() const override; const TIndexInfo& GetIndexInfo() const override; const TSnapshot& GetSnapshot() const override; + ui32 GetColumnsCount() const override; }; } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index e91d06c1db2..20170d772e1 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -25,14 +25,13 @@ IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext() const ui64 writeId = writeMeta.GetWriteId(); // Heavy operations inside. We cannot run them in tablet event handler. - TString strError; { NColumnShard::TCpuGuard guard(Owner.ResourceUsage); - Batch = SnapshotSchema->PrepareForInsert(Owner.WriteData.GetData().GetData(), Owner.WriteData.GetData().GetArrowSchema(), strError); + Batch = Owner.WriteData.GetData().GetArrowBatch(); } if (!Batch) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId)("error", strError); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId); return EStatus::Error; } diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..48f4f0d3050 --- /dev/null +++ b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-operations) +target_link_libraries(tx-columnshard-operations PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + core-tx-ev_write + ydb-services-metadata +) +target_sources(tx-columnshard-operations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp +) diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..4dba1ab769b --- /dev/null +++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-operations) +target_link_libraries(tx-columnshard-operations PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + core-tx-ev_write + ydb-services-metadata +) +target_sources(tx-columnshard-operations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp +) diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..4dba1ab769b --- /dev/null +++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-operations) +target_link_libraries(tx-columnshard-operations PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + core-tx-ev_write + ydb-services-metadata +) +target_sources(tx-columnshard-operations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp +) diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.txt b/ydb/core/tx/columnshard/operations/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/tx/columnshard/operations/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..48f4f0d3050 --- /dev/null +++ b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-operations) +target_link_libraries(tx-columnshard-operations PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + core-tx-ev_write + ydb-services-metadata +) +target_sources(tx-columnshard-operations PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp +) diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp new file mode 100644 index 00000000000..b7d01e4b55a --- /dev/null +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -0,0 +1,180 @@ +#include "write.h" + +#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/blob_manager_db.h> +#include <ydb/core/tx/columnshard/columnshard_impl.h> +#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h> + +#include <ydb/core/tablet_flat/tablet_flat_executor.h> + + +namespace NKikimr::NColumnShard { + + TWriteOperation::TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt, const ui64 globalWriteId) + : Status(status) + , CreatedAt(createdAt) + , WriteId(writeId) + , TxId(txId) + , GlobalWriteId(globalWriteId) + { + } + + void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, const TActorContext& ctx) { + Y_VERIFY(Status == EOperationStatus::Draft); + + NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source); + const auto& snapshotSchema = owner.TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); + auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, NEvWrite::TWriteData(writeMeta, data), snapshotSchema); + ctx.Register(CreateWriteActor(owner.TabletID(), writeController, owner.BlobManager->StartBlobBatch(), TInstant::Max(), owner.Settings.MaxSmallBlobSize)); + Status = EOperationStatus::Started; + } + + void TWriteOperation::Commit(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const { + Y_VERIFY(Status == EOperationStatus::Prepared); + + TBlobGroupSelector dsGroupSelector(owner.Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + + owner.BatchCache.Commit(WriteId); + + auto pathExists = [&](ui64 pathId) { + return owner.TablesManager.HasTable(pathId); + }; + + auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), 0, { WriteId }, + pathExists); + + owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); + owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); + owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); + owner.UpdateInsertTableCounters(); + } + + void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId) { + Y_VERIFY(Status == EOperationStatus::Started); + Status = EOperationStatus::Prepared; + GlobalWriteId = globalWriteId; + NIceDb::TNiceDb db(txc.DB); + Schema::Operations_Write(db, *this); + } + + void TWriteOperation::Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const { + Y_VERIFY(Status == EOperationStatus::Prepared); + owner.BatchCache.EraseInserted(WriteId); + + TBlobGroupSelector dsGroupSelector(owner.Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + + owner.InsertTable->Abort(dbTable, 0, {WriteId}); + + TBlobManagerDb blobManagerDb(txc.DB); + auto allAborted = owner.InsertTable->GetAborted(); + for (auto& [abortedWriteId, abortedData] : allAborted) { + owner.InsertTable->EraseAborted(dbTable, abortedData); + owner.BlobManager->DeleteBlob(abortedData.BlobId, blobManagerDb); + } + } + + bool TOperationsManager::Init(NTabletFlatExecutor::TTransactionContext& txc) { + NIceDb::TNiceDb db(txc.DB); + auto rowset = db.Table<Schema::Operations>().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + const TWriteId writeId = (TWriteId) rowset.GetValue<Schema::Operations::WriteId>(); + const ui64 createdAtSec = rowset.GetValue<Schema::Operations::CreatedAt>(); + const ui64 txId = rowset.GetValue<Schema::Operations::TxId>(); + const ui64 globalWriteId = rowset.GetValue<Schema::Operations::GlobalWriteId>(); + const EOperationStatus status = (EOperationStatus) rowset.GetValue<Schema::Operations::Status>(); + + auto operation = std::make_shared<TWriteOperation>(writeId, txId, status, TInstant::Seconds(createdAtSec), globalWriteId); + + Y_VERIFY(operation->GetStatus() != EOperationStatus::Draft); + + auto [_, isOk] = Operations.emplace(operation->GetWriteId(), operation); + if (!isOk) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "duplicated_operation")("operation", *operation); + return false; + } + Transactions[txId].push_back(operation->GetWriteId()); + LastWriteId = std::max(LastWriteId, operation->GetWriteId()); + if (!rowset.Next()) { + return false; + } + } + return true; + } + + bool TOperationsManager::CommitTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) { + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN)("tx_id", txId)("event", "transaction_commit_fails")); + auto tIt = Transactions.find(txId); + if (tIt == Transactions.end()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("details", "skip_unknown_transaction"); + return true; + } + + TVector<TWriteOperation::TPtr> commited; + for (auto&& opId : tIt->second) { + auto opPtr = Operations.FindPtr(opId); + (*opPtr)->Commit(owner, txc, snapshot); + commited.emplace_back(*opPtr); + } + + Transactions.erase(txId); + for (auto&& op: commited) { + RemoveOperation(op, txc); + } + return true; + } + + bool TOperationsManager::AbortTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { + TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN)("tx_id", txId)("event", "transaction_abort_fails")); + auto tIt = Transactions.find(txId); + if (tIt == Transactions.end()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("details", "unknown_transaction"); + return true; + } + + TVector<TWriteOperation::TPtr> aborted; + for (auto&& opId : tIt->second) { + auto opPtr = Operations.FindPtr(opId); + (*opPtr)->Abort(owner, txc); + aborted.emplace_back(*opPtr); + } + + Transactions.erase(txId); + for (auto&& op: aborted) { + RemoveOperation(op, txc); + } + return true; + } + + TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) const { + auto it = Operations.find(writeId); + if (it == Operations.end()) { + return nullptr; + } + return it->second; + } + + void TOperationsManager::RemoveOperation(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc) { + NIceDb::TNiceDb db(txc.DB); + Operations.erase(op->GetWriteId()); + Schema::Operations_Erase(db, op->GetWriteId()); + } + + TWriteId TOperationsManager::BuildNextWriteId() { + return ++LastWriteId; + } + + TWriteOperation::TPtr TOperationsManager::RegisterOperation(const ui64 txId) { + auto writeId = BuildNextWriteId(); + auto operation = std::make_shared<TWriteOperation>(writeId, txId, EOperationStatus::Draft, AppData()->TimeProvider->Now()); + Y_VERIFY(Operations.emplace(operation->GetWriteId(), operation).second); + + Transactions[operation->GetTxId()].push_back(operation->GetWriteId()); + return operation; + } +} diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h new file mode 100644 index 00000000000..d50b4cfcce6 --- /dev/null +++ b/ydb/core/tx/columnshard/operations/write.h @@ -0,0 +1,74 @@ +#pragma once + +#include <ydb/core/tx/ev_write/write_data.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> +#include <ydb/core/tx/columnshard/engines/defs.h> + +#include <ydb/core/tablet_flat/flat_cxx_database.h> +#include <ydb/library/accessor/accessor.h> + +#include <util/generic/map.h> +#include <tuple> + + +namespace NKikimr::NTabletFlatExecutor { + class TTransactionContext; +} + +namespace NKikimr::NColumnShard { + + class TColumnShard; + + using TWriteId = NOlap::TWriteId; + + enum class EOperationStatus : ui32 { + Draft = 1, + Started = 2, + Prepared = 3 + }; + + class TWriteOperation { + YDB_READONLY(EOperationStatus, Status, EOperationStatus::Draft); + YDB_READONLY_DEF(TInstant, CreatedAt); + YDB_READONLY_DEF(TWriteId, WriteId); + YDB_READONLY(ui64, TxId, 0); + YDB_READONLY(ui64, GlobalWriteId, 0); + + public: + using TPtr = std::shared_ptr<TWriteOperation>; + + TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt, const ui64 globalWriteId = 0); + + void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, const TActorContext& ctx); + void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId); + void Commit(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const; + void Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const; + + void Out(IOutputStream& out) const { + out << "write_id=" << (ui64) WriteId << ";tx_id=" << TxId; + } + }; + + class TOperationsManager { + TMap<ui64, TVector<TWriteId>> Transactions; + TMap<TWriteId, TWriteOperation::TPtr> Operations; + TWriteId LastWriteId = TWriteId(0); + + public: + bool Init(NTabletFlatExecutor::TTransactionContext& txc); + + TWriteOperation::TPtr GetOperation(const TWriteId writeId) const; + bool CommitTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot); + bool AbortTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); + + TWriteOperation::TPtr RegisterOperation(const ui64 txId); + private: + TWriteId BuildNextWriteId(); + void RemoveOperation(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc); + }; +} + +template <> +inline void Out<NKikimr::NColumnShard::TWriteOperation>(IOutputStream& o, const NKikimr::NColumnShard::TWriteOperation& x) { + return x.Out(o); +} diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp new file mode 100644 index 00000000000..38e386f6ccc --- /dev/null +++ b/ydb/core/tx/columnshard/operations/write_data.cpp @@ -0,0 +1,55 @@ +#include "write_data.h" + +#include <ydb/core/tx/columnshard/defs.h> + + +namespace NKikimr::NColumnShard { + +void TArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const { + Y_FAIL("Not implemented"); + Y_UNUSED(proto); +} + +bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload) { + IncomingData = payload.GetDataFromPayload(proto.GetArrowData().GetPayloadIndex()); + + std::vector<ui32> columns; + for (auto&& columnId : proto.GetColumnIds()) { + columns.emplace_back(columnId); + } + BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns); + return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize(); +} + +std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const { + TString err; + return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema(), err); +} + +void TProtoArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const { + Y_FAIL("Not implemented"); + Y_UNUSED(proto); +} + +bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) { + IncomingData = proto.GetData(); + if (proto.HasMeta()) { + const auto& incomingDataScheme = proto.GetMeta().GetSchema(); + if (incomingDataScheme.empty() || proto.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_data_format"); + return false; + } + ArrowSchema = NArrow::DeserializeSchema(incomingDataScheme); + if (!ArrowSchema) { + return false; + } + } + return !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize(); +} + +std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const { + TString err; + return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema, err); +} + +} diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h new file mode 100644 index 00000000000..b626c7358d6 --- /dev/null +++ b/ydb/core/tx/columnshard/operations/write_data.h @@ -0,0 +1,80 @@ +#pragma once + +#include <ydb/core/tx/ev_write/write_data.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> +#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> +#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> + + +namespace NKikimr::NColumnShard { + +class IPayloadData { +public: + virtual TString GetDataFromPayload(const ui64 index) const = 0; + virtual ui64 AddDataToPayload(TString&& blobData) = 0; + virtual ~IPayloadData() {} +}; + + +template <class TEvent> +class TPayloadHelper : public IPayloadData { + TEvent& Event; +public: + TPayloadHelper(TEvent& ev) + : Event(ev) {} + + TString GetDataFromPayload(const ui64 index) const override { + TRope rope = Event.GetPayload(index); + TString data = TString::Uninitialized(rope.GetSize()); + rope.Begin().ExtractPlainDataAndAdvance(data.Detach(), data.size()); + return data; + } + + ui64 AddDataToPayload(TString&& blobData) override { + TRope rope; + rope.Insert(rope.End(), TRope(blobData)); + return Event.AddPayload(std::move(rope)); + } +}; + +class TArrowData : public NEvWrite::IDataContainer { +public: + TArrowData(const NOlap::ISnapshotSchema::TPtr& schema) + : IndexSchema(schema) + {} + + const TString& GetData() const override { + return IncomingData; + } + + bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload); + std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; + void Serialize(NKikimrDataEvents::TOperationData& proto) const override; + +private: + NOlap::ISnapshotSchema::TPtr IndexSchema; + NOlap::ISnapshotSchema::TPtr BatchSchema; + TString IncomingData; +}; + +class TProtoArrowData : public NEvWrite::IDataContainer { +public: + TProtoArrowData(const NOlap::ISnapshotSchema::TPtr& schema) + : IndexSchema(schema) + {} + + const TString& GetData() const override { + return IncomingData; + } + + bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto); + std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override; + void Serialize(NKikimrDataEvents::TOperationData& proto) const override; + +private: + NOlap::ISnapshotSchema::TPtr IndexSchema; + std::shared_ptr<arrow::Schema> ArrowSchema; + TString IncomingData; +}; + +} diff --git a/ydb/core/tx/columnshard/operations/ya.make b/ydb/core/tx/columnshard/operations/ya.make new file mode 100644 index 00000000000..cc7fd4d19cc --- /dev/null +++ b/ydb/core/tx/columnshard/operations/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + write.cpp + write_data.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/tx/ev_write + ydb/services/metadata +) + +END() diff --git a/ydb/core/tx/columnshard/tx_controller.cpp b/ydb/core/tx/columnshard/tx_controller.cpp index 8e902861caa..9ef34cffaf7 100644 --- a/ydb/core/tx/columnshard/tx_controller.cpp +++ b/ydb/core/tx/columnshard/tx_controller.cpp @@ -176,6 +176,15 @@ const TTxController::TBasicTxInfo* TTxController::GetTxInfo(const ui64 txId) con return BasicTxInfo.FindPtr(txId); } +NEvents::TDataEvents::TCoordinatorInfo TTxController::GetCoordinatorInfo(const ui64 txId) const { + auto txInfo = BasicTxInfo.FindPtr(txId); + Y_VERIFY(txInfo); + if (Owner.ProcessingParams) { + return NEvents::TDataEvents::TCoordinatorInfo(Owner.TabletID(), txInfo->MinStep, txInfo->MaxStep, Owner.ProcessingParams->GetCoordinators()); + } + return NEvents::TDataEvents::TCoordinatorInfo(Owner.TabletID(), txInfo->MinStep, txInfo->MaxStep, {}); +} + size_t TTxController::CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext& txc) { size_t removedCount = 0; if (HaveOutdatedTxs()) { diff --git a/ydb/core/tx/columnshard/tx_controller.h b/ydb/core/tx/columnshard/tx_controller.h index f7c30397ad8..563f1f364e7 100644 --- a/ydb/core/tx/columnshard/tx_controller.h +++ b/ydb/core/tx/columnshard/tx_controller.h @@ -1,7 +1,9 @@ #pragma once #include "columnshard_schema.h" + #include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/tx/ev_write/events.h> namespace NKikimr::NColumnShard { @@ -68,6 +70,7 @@ public: std::optional<TPlanQueueItem> GetPlannedTx() const; TPlanQueueItem GetFrontTx() const; const TBasicTxInfo* GetTxInfo(const ui64 txId) const; + NEvents::TDataEvents::TCoordinatorInfo GetCoordinatorInfo(const ui64 txId) const; size_t CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index fb7f456f368..325d3c015a7 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -8,6 +8,8 @@ #include <ydb/core/tx/columnshard/engines/changes/with_appended.h> #include <ydb/core/tx/columnshard/engines/changes/compaction.h> #include <ydb/core/tx/columnshard/engines/changes/cleanup.h> +#include <ydb/core/tx/columnshard/operations/write_data.h> +#include <library/cpp/actors/protos/unittests.pb.h> namespace NKikimr { @@ -1837,6 +1839,68 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche } +Y_UNIT_TEST_SUITE(EvWrite) { + class TArrowData : public NKikimr::NEvWrite::IDataContainer { + std::vector<std::pair<TString, TTypeInfo>> YdbSchema; + ui64 Index; + + public: + TArrowData(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, const ui64 idx) + : YdbSchema(ydbSchema) + , Index(idx) + { + } + + void Serialize(NKikimrDataEvents::TOperationData& proto) const override { + for (ui32 i = 0; i < YdbSchema.size(); ++i) { + proto.AddColumnIds(i + 1); + } + proto.MutableArrowData()->SetPayloadIndex(Index); + } + + std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override { + return nullptr; + } + + const TString& GetData() const override { + return Default<TString>(); + } + }; + + Y_UNIT_TEST(WriteInTransaction) { + TTestBasicRuntime runtime; + TTester::Setup(runtime); + + TActorId sender = runtime.AllocateEdgeActor(); + CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + const ui64 tableId = 1; + const TestTableDescription table; + SetupSchema(runtime, sender, tableId, table); + + const ui64 txId = 111; + + const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema; + TString blobData = MakeTestBlob({0, 100}, ydbSchema); + + auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId); + auto dataPtr = std::make_shared<TArrowData>(ydbSchema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData))); + evWrite->AddReplaceOp(tableId, dataPtr); + + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); + + TAutoPtr<NActors::IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle); + UNIT_ASSERT(event); + UNIT_ASSERT_EQUAL(event->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::PREPARED); + PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); + } +} + Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Y_UNIT_TEST(Write) { TestTableDescription table; diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 14b1e978ca0..c38c0ed6ea9 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -57,6 +57,7 @@ PEERDIR( ydb/core/tx/columnshard/counters ydb/core/tx/columnshard/common ydb/core/tx/columnshard/splitter + ydb/core/tx/columnshard/operations ydb/core/tx/tiering ydb/core/tx/conveyor/usage ydb/core/tx/long_tx_service/public diff --git a/ydb/core/tx/ev_write/events.h b/ydb/core/tx/ev_write/events.h new file mode 100644 index 00000000000..196fdb6fab3 --- /dev/null +++ b/ydb/core/tx/ev_write/events.h @@ -0,0 +1,91 @@ +#pragma once + +#include "write_data.h" + +#include <ydb/core/protos/ev_write.pb.h> +#include <ydb/core/base/events.h> + +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/log.h> + + +namespace NKikimr::NEvents { + +struct TDataEvents { + + class TCoordinatorInfo { + YDB_READONLY_DEF(ui64, TabletId); + YDB_READONLY(ui64, MinStep, 0); + YDB_READONLY(ui64, MaxStep, 0); + YDB_READONLY_DEF(google::protobuf::RepeatedField<ui64>, DomainCoordinators); + + public: + TCoordinatorInfo(const ui64 tabletId, const ui64 minStep, const ui64 maxStep, const google::protobuf::RepeatedField<ui64>& coordinators) + : TabletId(tabletId) + , MinStep(minStep) + , MaxStep(maxStep) + , DomainCoordinators(coordinators) {} + }; + + enum EEventType { + EvWrite = EventSpaceBegin(TKikimrEvents::ES_DATA_OPERATIONS), + EvWriteResult, + EvEnd + }; + + static_assert(EEventType::EvEnd < EventSpaceEnd(TKikimrEvents::ES_DATA_OPERATIONS), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_DATA_OPERATIONS)"); + + struct TEvWrite : public NActors::TEventPB<TEvWrite, NKikimrDataEvents::TEvWrite, TDataEvents::EvWrite> { + TEvWrite() = default; + + TEvWrite(ui64 txId) { + Record.SetTxId(txId); + } + + void AddReplaceOp(const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data) { + Record.MutableTableId()->SetTableId(tableId); + Y_VERIFY(data); + data->Serialize(*Record.MutableReplace()); + } + + ui64 GetTxId() const { + Y_VERIFY(Record.HasTxId()); + return Record.GetTxId(); + } + }; + + struct TEvWriteResult : public NActors::TEventPB<TEvWriteResult, NKikimrDataEvents::TEvWriteResult, TDataEvents::EvWriteResult> { + + TEvWriteResult() = default; + + static std::unique_ptr<TEvWriteResult> BuildError(const ui64 txId, const NKikimrDataEvents::TEvWriteResult::EOperationStatus& status, const TString& errorMsg) { + auto result = std::make_unique<TEvWriteResult>(); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_error")("status", NKikimrDataEvents::TEvWriteResult::EOperationStatus_Name(status)) + ("details", errorMsg)("tx_id", txId); + result->Record.SetTxId(txId); + result->Record.SetStatus(status); + result->Record.MutableIssueMessage()->set_message(errorMsg); + return result; + } + + static std::unique_ptr<TEvWriteResult> BuildCommited(const ui64 txId) { + auto result = std::make_unique<TEvWriteResult>(); + result->Record.SetTxId(txId); + result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::COMPLETED); + return result; + } + + static std::unique_ptr<TEvWriteResult> BuildPrepared(const ui64 txId, const TCoordinatorInfo& transactionInfo) { + auto result = std::make_unique<TEvWriteResult>(); + result->Record.SetTxId(txId); + result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::PREPARED); + + result->Record.SetMinStep(transactionInfo.GetMinStep()); + result->Record.SetMaxStep(transactionInfo.GetMaxStep()); + result->Record.MutableDomainCoordinators()->CopyFrom(transactionInfo.GetDomainCoordinators()); + return result; + } + }; +}; + +} diff --git a/ydb/core/tx/ev_write/write_data.cpp b/ydb/core/tx/ev_write/write_data.cpp index 21311e70d3d..4150cbd28ad 100644 --- a/ydb/core/tx/ev_write/write_data.cpp +++ b/ydb/core/tx/ev_write/write_data.cpp @@ -10,32 +10,4 @@ TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data) , Data(data) {} -bool TArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) { - IncomingData = proto.GetData(); - if (proto.HasMeta()) { - const auto& incomingDataScheme = proto.GetMeta().GetSchema(); - if (incomingDataScheme.empty() || proto.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_data_format"); - return false; - } - ArrowSchema = NArrow::DeserializeSchema(incomingDataScheme); - if (!ArrowSchema) { - return false; - } - } - return !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize(); -} - -std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const { - auto batch = NArrow::DeserializeBatch(IncomingData, ArrowSchema); - if (!batch) { - return nullptr; - } - - if (batch->num_rows() == 0) { - return nullptr; - } - return batch; -} - } diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h index 5a0e562c278..8f499ad39d5 100644 --- a/ydb/core/tx/ev_write/write_data.h +++ b/ydb/core/tx/ev_write/write_data.h @@ -4,6 +4,7 @@ #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/protos/tx_columnshard.pb.h> +#include <ydb/core/protos/ev_write.pb.h> namespace NKikimr::NEvWrite { @@ -12,30 +13,11 @@ class IDataContainer { public: using TPtr = std::shared_ptr<IDataContainer>; virtual ~IDataContainer() {} - - virtual std::shared_ptr<arrow::Schema> GetArrowSchema() const = 0; + virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0; + virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0; virtual const TString& GetData() const = 0; }; -class TArrowData : public IDataContainer { -public: - std::shared_ptr<arrow::Schema> GetArrowSchema() const override { - return ArrowSchema; - } - - const TString& GetData() const override { - return IncomingData; - } - - bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto); - - std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const; - -private: - TString IncomingData; - std::shared_ptr<arrow::Schema> ArrowSchema; -}; - class TWriteMeta { YDB_ACCESSOR(ui64, WriteId, 0); YDB_READONLY(ui64, TableId, 0); |