aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: nsofya <nsofya@yandex-team.com> 2023-07-27 16:23:24 +0300
committer: nsofya <nsofya@yandex-team.com> 2023-07-27 16:23:24 +0300
commit: 9417fcf6262cf70a06d9739e54a16390e6edea61 (patch)
tree: 338b1e5347efa85b924b5ef6e28a255c863809a1
parent: 2519745782969ed01a5c6dda3fcae5a151bdac3c (diff)
download: ydb-9417fcf6262cf70a06d9739e54a16390e6edea61.tar.gz
EvWrite implementation for columnshard
Это драфт, поэтому тесты здесь базовые, исключительно на happy path. Как финализируем протокол, я добавлю больше. Поведение описала примерно в ev_write.proto (видела, что везде пишете на английском, но так как это драфт, не тратила время на перевод). Хочется получить обратную связь: что я могла упустить или где хотелось бы другого поведения. Возможно, формат данных хочется другой или можно что-то переиспользовать. Принимаю также комментарии по тому, где разместить код) Места, где я прямо ожидаю комментарии, выделю прямо в коде.
-rw-r--r-- ydb/core/base/events.h 3
-rw-r--r-- ydb/core/protos/CMakeLists.darwin-x86_64.txt 13
-rw-r--r-- ydb/core/protos/CMakeLists.linux-aarch64.txt 13
-rw-r--r-- ydb/core/protos/CMakeLists.linux-x86_64.txt 13
-rw-r--r-- ydb/core/protos/CMakeLists.windows-x86_64.txt 13
-rw-r--r-- ydb/core/protos/ev_write.proto 50
-rw-r--r-- ydb/core/protos/tx_columnshard.proto 1
-rw-r--r-- ydb/core/protos/ya.make 1
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard__init.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/columnshard__progress_tx.cpp 23
-rw-r--r-- ydb/core/tx/columnshard/columnshard__propose_transaction.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write.cpp 107
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 19
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 14
-rw-r--r-- ydb/core/tx/columnshard/columnshard_schema.h 31
-rw-r--r-- ydb/core/tx/columnshard/columnshard_ut_common.cpp 16
-rw-r--r-- ydb/core/tx/columnshard/columnshard_ut_common.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h 1
-rw-r--r-- ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt 21
-rw-r--r-- ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt 22
-rw-r--r-- ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt 22
-rw-r--r-- ydb/core/tx/columnshard/operations/CMakeLists.txt 17
-rw-r--r-- ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt 21
-rw-r--r-- ydb/core/tx/columnshard/operations/write.cpp 180
-rw-r--r-- ydb/core/tx/columnshard/operations/write.h 74
-rw-r--r-- ydb/core/tx/columnshard/operations/write_data.cpp 55
-rw-r--r-- ydb/core/tx/columnshard/operations/write_data.h 80
-rw-r--r-- ydb/core/tx/columnshard/operations/ya.make 14
-rw-r--r-- ydb/core/tx/columnshard/tx_controller.cpp 9
-rw-r--r-- ydb/core/tx/columnshard/tx_controller.h 3
-rw-r--r-- ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp 64
-rw-r--r-- ydb/core/tx/columnshard/ya.make 1
-rw-r--r-- ydb/core/tx/ev_write/events.h 91
-rw-r--r-- ydb/core/tx/ev_write/write_data.cpp 28
-rw-r--r-- ydb/core/tx/ev_write/write_data.h 24
44 files changed, 987 insertions, 88 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index 59686ca51a8..f81a444a7f7 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -162,7 +162,8 @@ struct TKikimrEvents : TEvents {
ES_EXT_INDEX,
ES_CONVEYOR,
ES_KQP_SCAN_EXCHANGE,
- ES_IC_NODE_CACHE
+ ES_IC_NODE_CACHE,
+ ES_DATA_OPERATIONS
};
};
diff --git a/ydb/core/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/protos/CMakeLists.darwin-x86_64.txt
index df648322943..b34e66bd7ac 100644
--- a/ydb/core/protos/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.darwin-x86_64.txt
@@ -1490,6 +1490,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1575,6 +1587,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto
diff --git a/ydb/core/protos/CMakeLists.linux-aarch64.txt b/ydb/core/protos/CMakeLists.linux-aarch64.txt
index c7f97d48857..27fe911721d 100644
--- a/ydb/core/protos/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/protos/CMakeLists.linux-aarch64.txt
@@ -1490,6 +1490,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1576,6 +1588,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto
diff --git a/ydb/core/protos/CMakeLists.linux-x86_64.txt b/ydb/core/protos/CMakeLists.linux-x86_64.txt
index c7f97d48857..27fe911721d 100644
--- a/ydb/core/protos/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.linux-x86_64.txt
@@ -1490,6 +1490,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1576,6 +1588,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto
diff --git a/ydb/core/protos/CMakeLists.windows-x86_64.txt b/ydb/core/protos/CMakeLists.windows-x86_64.txt
index df648322943..b34e66bd7ac 100644
--- a/ydb/core/protos/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/protos/CMakeLists.windows-x86_64.txt
@@ -1490,6 +1490,18 @@ get_built_tool_path(
cpp_styleguide
)
get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
tools/enum_parser/enum_parser
@@ -1575,6 +1587,7 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/database_basic_sausage_metainfo.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/datashard_load.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/drivemodel.proto
+ ${CMAKE_SOURCE_DIR}/ydb/core/protos/ev_write.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/export.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/external_sources.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/flat_tx_scheme.proto
diff --git a/ydb/core/protos/ev_write.proto b/ydb/core/protos/ev_write.proto
new file mode 100644
index 00000000000..8953f5f2f25
--- /dev/null
+++ b/ydb/core/protos/ev_write.proto
@@ -0,0 +1,50 @@
+import "ydb/core/scheme/protos/type_info.proto";
+import "ydb/core/protos/tx_datashard.proto";
+import "ydb/public/api/protos/ydb_issue_message.proto";
+
+package NKikimrDataEvents;
+option java_package = "ru.yandex.kikimr.proto";
+
+
+message TOperationData {
+ message TArrowData {
+ optional uint64 PayloadIndex = 1;
+ }
+
+ repeated uint32 ColumnIds = 1 [packed = true];
+ oneof Data {
+ TArrowData ArrowData = 900;
+ }
+}
+
+message TEvWrite {
+ optional NKikimrTxDataShard.TTableId TableId = 1;
+ optional uint64 TxId = 2;
+
+ oneof Operation {
+ TOperationData Replace = 900;
+ }
+}
+
+message TEvWriteResult {
+ enum EOperationStatus {
+ UNSPECIFIED = 0;
+ PREPARED = 1;
+ COMPLETED = 2;
+ ABORTED = 3;
+ ERROR = 4;
+ OVERLOADED = 6;
+ BAD_REQUEST = 7;
+ }
+
+ optional EOperationStatus Status = 1;
+ optional Ydb.Issue.IssueMessage IssueMessage = 2;
+
+ optional uint64 Origin = 4;
+ optional uint64 TxId = 5;
+
+ // For Tx planner
+ optional uint64 MinStep = 6;
+ optional uint64 MaxStep = 7;
+ repeated fixed64 DomainCoordinators = 8;
+}
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 37abe8a8497..5c4a2907a5a 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -126,6 +126,7 @@ enum ETransactionKind {
TX_KIND_COMMIT = 2;
TX_KIND_TTL = 3; // Immediate (not planned)
TX_KIND_DATA = 4;
+ TX_KIND_COMMIT_WRITE = 5;
}
enum ETransactionFlag {
diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make
index b5982bcc5ee..d68c5f5b7ac 100644
--- a/ydb/core/protos/ya.make
+++ b/ydb/core/protos/ya.make
@@ -55,6 +55,7 @@ SRCS(
database_basic_sausage_metainfo.proto
datashard_load.proto
drivemodel.proto
+ ev_write.proto
export.proto
external_sources.proto
flat_tx_scheme.proto
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 07c0eaea85c..9d3ca29f9bd 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(operations)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -50,6 +51,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-counters
tx-columnshard-common
tx-columnshard-splitter
+ tx-columnshard-operations
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 14cdf344f20..41ca866dcff 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(operations)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -51,6 +52,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-counters
tx-columnshard-common
tx-columnshard-splitter
+ tx-columnshard-operations
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 14cdf344f20..41ca866dcff 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(operations)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -51,6 +52,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-counters
tx-columnshard-common
tx-columnshard-splitter
+ tx-columnshard-operations
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 07c0eaea85c..9d3ca29f9bd 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -10,6 +10,7 @@ add_subdirectory(common)
add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
+add_subdirectory(operations)
add_subdirectory(resources)
add_subdirectory(splitter)
add_subdirectory(ut_rw)
@@ -50,6 +51,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-counters
tx-columnshard-common
tx-columnshard-splitter
+ tx-columnshard-operations
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 505720e530a..d50d97d00eb 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -5,6 +5,7 @@
#include "blob_manager_db.h"
#include <ydb/core/tablet/tablet_exception.h>
+#include <ydb/core/tx/columnshard/operations/write.h>
namespace NKikimr::NColumnShard {
@@ -86,6 +87,10 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
return false;
}
+ if (!Self->OperationsManager.Init(txc)) {
+ return false;
+ }
+
Self->TablesManager.InitFromDB(db, Self->TabletID());
Self->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size());
Self->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size());
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 175b89b6b0f..ece932faebe 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -1,5 +1,7 @@
#include "columnshard_impl.h"
#include "columnshard_schema.h"
+
+#include <ydb/core/tx/columnshard/operations/write.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
namespace NKikimr::NColumnShard {
@@ -27,11 +29,16 @@ private:
, Status(status)
{}
- THolder<IEventBase> MakeEvent(ui64 tabletId) const {
- auto result = MakeHolder<TEvColumnShard::TEvProposeTransactionResult>(
+ std::unique_ptr<IEventBase> MakeEvent(ui64 tabletId) const {
+ if (TxInfo.TxKind == NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE) {
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildCommited(TxInfo.TxId);
+ return result;
+ } else {
+ auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
tabletId, TxInfo.TxKind, TxInfo.TxId, Status);
- result->Record.SetStep(TxInfo.PlanStep);
- return result;
+ result->Record.SetStep(TxInfo.PlanStep);
+ return result;
+ }
}
};
@@ -121,6 +128,12 @@ public:
Trigger = ETriggerActivities::POST_INSERT;
break;
}
+ case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: {
+ NOlap::TSnapshot snapshot(step, txId);
+ Y_VERIFY(Self->OperationsManager.CommitTransaction(*Self, txId, txc, snapshot));
+ Trigger = ETriggerActivities::POST_INSERT;
+ break;
+ }
default: {
Y_FAIL("Unexpected TxKind");
}
@@ -151,7 +164,7 @@ public:
Self->ProgressTxController.CompleteRunningTx(TTxController::TPlanQueueItem(res.TxInfo.PlanStep, res.TxInfo.TxId));
auto event = res.MakeEvent(Self->TabletID());
- ctx.Send(res.TxInfo.Source, event.Release(), 0, res.TxInfo.Cookie);
+ ctx.Send(res.TxInfo.Source, event.release(), 0, res.TxInfo.Cookie);
}
Self->ScheduleNextGC(ctx);
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 615c5ea9c16..e713d87e0a1 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -195,7 +195,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
}
}
- const auto& txInfo = Self->ProgressTxController.RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
+ const auto& txInfo = Self->ProgressTxController.RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
minStep = txInfo.MinStep;
maxStep = txInfo.MaxStep;
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index c0156436e9e..c61206b037d 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -4,6 +4,8 @@
#include "blob_cache.h"
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
+#include <ydb/core/tx/columnshard/operations/write.h>
+#include <ydb/core/tx/columnshard/operations/write_data.h>
namespace NKikimr::NColumnShard {
@@ -51,14 +53,21 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
NKikimrTxColumnShard::TLogicalMetadata meta;
Y_VERIFY(meta.ParseFromString(blobData.GetLogicalMeta()));
- ui32 status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
const auto& logoBlobId = blobData.GetBlobId();
Y_VERIFY(logoBlobId.IsValid());
Y_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
+
+ TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
Y_VERIFY(writeMeta.GetMetaShard() == 0);
writeId = (ui64)Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
+ } else {
+ operation = Self->OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
+ if (operation) {
+ Y_VERIFY(operation->GetStatus() == EOperationStatus::Started);
+ writeId = (ui64)Self->BuildNextWriteId(txc);
+ }
}
ui64 writeUnixTime = meta.GetDirtyWriteTimeSeconds();
@@ -102,18 +111,23 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Self->IncCounter(COUNTER_WRITE_SUCCESS);
Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb);
+
+ if (operation) {
+ operation->OnWriteFinish(txc, writeId);
+ auto txInfo = Self->ProgressTxController.RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
+ Y_UNUSED(txInfo);
+ }
} else {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << writeId << TxSuffix());
+ Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
- // Return EResultStatus::SUCCESS for dups
- Self->IncCounter(COUNTER_WRITE_DUPLICATE);
-
- if (status != NKikimrTxColumnShard::EResultStatus::SUCCESS) {
- Self->IncCounter(COUNTER_WRITE_FAIL);
+ if (operation) {
+ NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController.GetCoordinatorInfo(operation->GetTxId());
+ Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(operation->GetTxId(), tInfo);
+ } else {
+ Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
}
-
- Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, writeId, status);
return true;
}
@@ -123,7 +137,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release());
}
-void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, const TActorContext& ctx) {
+void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
IncCounter(COUNTER_WRITE_FAIL);
switch (overloadReason) {
case EOverloadStatus::Disk:
@@ -149,8 +163,7 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus& overloadReason, cons
<< " overload reason: [" << overloadReason << "]"
<< " at tablet " << TabletID());
- auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
- ctx.Send(writeData.GetWriteMeta().GetSource(), result.release());
+ ctx.Send(writeData.GetWriteMeta().GetSource(), event.release());
}
TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) const {
@@ -191,8 +204,14 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
}
- auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
- ctx.Send(writeMeta.GetSource(), result.release());
+ auto operation = OperationsManager.GetOperation((TWriteId)writeMeta.GetWriteId());
+ if (operation) {
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::ERROR, "put data fails");
+ ctx.Send(writeMeta.GetSource(), result.release());
+ } else {
+ auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
+ ctx.Send(writeMeta.GetSource(), result.release());
+ }
return;
}
@@ -223,27 +242,32 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
writeMeta.SetWritePartId(record.GetWritePartId());
}
- auto arrowData = std::make_shared<NEvWrite::TArrowData>();
- if (!arrowData->ParseFromProto(record)) {
- LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId()
+ if (!TablesManager.IsReadyForWrite(tableId)) {
+ LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index")
<< " at tablet " << TabletID());
IncCounter(COUNTER_WRITE_FAIL);
+
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
ctx.Send(ev->Get()->GetSource(), result.release());
return;
}
- NEvWrite::TWriteData writeData(writeMeta, arrowData);
- auto overloadStatus = CheckOverloaded(tableId);
- if (!TablesManager.IsReadyForWrite(tableId)) {
- LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index")
+ const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
+ auto arrowData = std::make_shared<TProtoArrowData>(snapshotSchema);
+ if (!arrowData->ParseFromProto(record)) {
+ LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId()
<< " at tablet " << TabletID());
IncCounter(COUNTER_WRITE_FAIL);
-
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
ctx.Send(ev->Get()->GetSource(), result.release());
- } else if (overloadStatus != EOverloadStatus::None) {
- OverloadWriteFail(overloadStatus, writeData, ctx);
+ return;
+ }
+
+ NEvWrite::TWriteData writeData(writeMeta, arrowData);
+ auto overloadStatus = CheckOverloaded(tableId);
+ if (overloadStatus != EOverloadStatus::None) {
+ std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+ OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
} else {
if (writeMeta.HasLongTxId()) {
// TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId)
@@ -261,17 +285,50 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
}
}
- auto wg = WritesMonitor.RegisterWrite(writeData.GetSize());
+ WritesMonitor.RegisterWrite(writeData.GetSize());
LOG_S_DEBUG("Write (blob) " << writeData.GetSize() << " bytes into pathId " << writeMeta.GetTableId()
<< (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ")
<< WritesMonitor.DebugString()
<< " at tablet " << TabletID());
- const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData, snapshotSchema);
ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize));
}
}
+void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+ const auto& tableId = record.GetTableId().GetTableId();
+ const ui64 txId = ev->Get()->GetTxId();
+ const auto source = ev->Sender;
+
+ if (!TablesManager.IsReadyForWrite(tableId)) {
+ IncCounter(COUNTER_WRITE_FAIL);
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::ERROR, "table not writable");
+ ctx.Send(source, result.release());
+ return;
+ }
+
+ auto arrowData = std::make_shared<TArrowData>(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema());
+ if (!arrowData->Parse(record.GetReplace(), TPayloadHelper<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
+ IncCounter(COUNTER_WRITE_FAIL);
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::BAD_REQUEST, "parsing data error");
+ ctx.Send(source, result.release());
+ }
+
+ auto overloadStatus = CheckOverloaded(tableId);
+ if (overloadStatus != EOverloadStatus::None) {
+ NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source), arrowData);
+ std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(txId, NKikimrDataEvents::TEvWriteResult::OVERLOADED, "overload data error");
+ OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
+ return;
+ }
+
+ auto wg = WritesMonitor.RegisterWrite(arrowData->GetData().size());
+ auto operation = OperationsManager.RegisterOperation(txId);
+ Y_VERIFY(operation);
+ operation->Start(*this, tableId, arrowData, source, ctx);
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index e33fd497c91..ff0be97c5eb 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -294,16 +294,25 @@ TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService:
it = LongTxWritesByUniqueId.emplace(longTxId.UniqueId, TPartsForLTXShard()).first;
}
- TWriteId writeId = ++LastWriteId;
+ TWriteId writeId = BuildNextWriteId(db);
auto& lw = LongTxWrites[writeId];
lw.WriteId = (ui64)writeId;
lw.WritePartId = partId;
lw.LongTxId = longTxId;
it->second[partId] = &lw;
- Schema::SaveSpecialValue(db, Schema::EValueIds::LastWriteId, (ui64)writeId);
Schema::SaveLongTxWrite(db, writeId, partId, longTxId);
+ return writeId;
+}
+
+TWriteId TColumnShard::BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc) {
+ NIceDb::TNiceDb db(txc.DB);
+ return BuildNextWriteId(db);
+}
+TWriteId TColumnShard::BuildNextWriteId(NIceDb::TNiceDb& db) {
+ TWriteId writeId = ++LastWriteId;
+ Schema::SaveSpecialValue(db, Schema::EValueIds::LastWriteId, (ui64)writeId);
return writeId;
}
@@ -362,6 +371,12 @@ bool TColumnShard::AbortTx(const ui64 txId, const NKikimrTxColumnShard::ETransac
}
break;
}
+ case NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE: {
+ if (!OperationsManager.AbortTransaction(*this, txId, txc)) {
+ return false;
+ }
+ break;
+ }
default: {
Y_FAIL("Unsupported TxKind");
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index b5ffb7b7d8b..9c2b425dcf9 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -15,6 +15,7 @@
#include <ydb/core/tablet/tablet_pipe_client_cache.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tx/ev_write/events.h>
#include <ydb/core/tx/tiering/common.h>
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/tx/time_cast/time_cast.h>
@@ -33,6 +34,8 @@ class TInsertColumnEngineChanges;
namespace NKikimr::NColumnShard {
+class TOperationsManager;
+
extern bool gAllowLogBatchingDefaultValue;
IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent, const TIndexationCounters& counters);
@@ -146,6 +149,9 @@ class TColumnShard
friend class TTxController;
+ friend class TOperationsManager;
+ friend class TWriteOperation;
+
class TTxProgressTx;
class TTxProposeCancel;
@@ -174,6 +180,7 @@ class TColumnShard
void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx);
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
+ void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
ITransaction* CreateTxInitSchema();
ITransaction* CreateTxRunGc();
@@ -235,7 +242,7 @@ public:
};
private:
- void OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, const TActorContext& ctx);
+ void OverloadWriteFail(const EOverloadStatus& overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
protected:
@@ -285,6 +292,7 @@ protected:
HFunc(TEvPrivate::TEvScanStats, Handle);
HFunc(TEvPrivate::TEvReadFinished, Handle);
HFunc(TEvPrivate::TEvPeriodicWakeup, Handle);
+ HFunc(NEvents::TDataEvents::TEvWrite, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
LOG_S_WARN("TColumnShard.StateWork at " << TabletID()
@@ -297,6 +305,7 @@ protected:
private:
TTxController ProgressTxController;
+ TOperationsManager OperationsManager;
struct TAlterMeta {
NKikimrTxColumnShard::TSchemaTxBody Body;
@@ -543,6 +552,9 @@ private:
bool LoadTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody);
void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort);
+ TWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc);
+ TWriteId BuildNextWriteId(NIceDb::TNiceDb& db);
+
void EnqueueProgressTx(const TActorContext& ctx);
void EnqueueBackgroundActivities(bool periodic = false, TBackgroundActivity activity = TBackgroundActivity::All());
void CleanForgottenBlobs(const TActorContext& ctx, const THashSet<TUnifiedBlobId>& allowList = {});
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 8ba55007c15..d119e4a3bf6 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -8,6 +8,7 @@
#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
#include <ydb/core/tx/columnshard/engines/granules_table.h>
#include <ydb/core/tx/columnshard/engines/columns_table.h>
+#include <ydb/core/tx/columnshard/operations/write.h>
#include <type_traits>
@@ -34,6 +35,7 @@ struct Schema : NIceDb::Schema {
GranulesTableId,
ColumnsTableId,
CountersTableId,
+ OperationsTableId,
};
enum class EValueIds : ui32 {
@@ -246,6 +248,17 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<Index, Counter, ValueUI64>;
};
+ struct Operations : NIceDb::Schema::Table<OperationsTableId> {
+ struct WriteId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct TxId : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct Status : Column<3, NScheme::NTypeIds::Uint32> {};
+ struct CreatedAt : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct GlobalWriteId : Column<5, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<WriteId>;
+ using TColumns = TableColumns<TxId, WriteId, Status, CreatedAt, GlobalWriteId>;
+ };
+
using TTables = SchemaTables<
Value,
TxInfo,
@@ -263,7 +276,8 @@ struct Schema : NIceDb::Schema {
IndexColumns,
IndexCounters,
SmallBlobs,
- OneToOneEvictedBlobs
+ OneToOneEvictedBlobs,
+ Operations
>;
//
@@ -648,6 +662,21 @@ struct Schema : NIceDb::Schema {
}
return true;
}
+
+ // Operations
+ // Writes (inserts or updates) the Operations row that describes `operation`.
+ // The row key is the operation's write id; CreatedAt is stored as whole seconds.
+ static void Operations_Write(NIceDb::TNiceDb& db, const TWriteOperation& operation) {
+     const ui64 rowKey = static_cast<ui64>(operation.GetWriteId());
+     const ui32 statusValue = static_cast<ui32>(operation.GetStatus());
+     const auto createdAtSeconds = operation.GetCreatedAt().Seconds();
+
+     auto row = db.Table<Operations>().Key(rowKey);
+     row.Update(
+         NIceDb::TUpdate<Operations::Status>(statusValue),
+         NIceDb::TUpdate<Operations::CreatedAt>(createdAtSeconds),
+         NIceDb::TUpdate<Operations::GlobalWriteId>(operation.GetGlobalWriteId()),
+         NIceDb::TUpdate<Operations::TxId>(operation.GetTxId())
+     );
+ }
+
+ // Deletes the Operations row keyed by `writeId`.
+ static void Operations_Erase(NIceDb::TNiceDb& db, const TWriteId writeId) {
+     const ui64 rowKey = static_cast<ui64>(writeId);
+     db.Table<Operations>().Key(rowKey).Delete();
+ }
+
};
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index 345dbe751a3..02f8f5df731 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -75,6 +75,22 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS);
}
+// Test helper: sends a plan step containing the transaction from `snap` to the test tablet,
+// waits for the plan-step acknowledgement and, when `waitResult` is set, additionally
+// expects a TEvWriteResult with the same tx id and COMPLETED status.
+void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
+    auto planStep = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
+    auto plannedTx = planStep->Record.AddTransactions();
+    plannedTx->SetTxId(snap.GetTxId());
+    ActorIdToProto(sender, plannedTx->MutableAckTo());
+
+    ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, planStep.release());
+    UNIT_ASSERT(runtime.GrabEdgeEvent<TEvTxProcessing::TEvPlanStepAck>(sender));
+
+    if (!waitResult) {
+        return;
+    }
+
+    auto resultEvent = runtime.GrabEdgeEvent<NEvents::TDataEvents::TEvWriteResult>(sender);
+    const auto& res = resultEvent->Get()->Record;
+    UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId());
+    UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::COMPLETED);
+}
+
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
const TString& data, std::shared_ptr<arrow::Schema> schema, bool waitResult) {
const TString dedupId = ToString(writeId);
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 6f994b27dc2..264fc89dee9 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -411,6 +411,7 @@ struct TTestSchema {
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
+void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
const TString& data, std::shared_ptr<arrow::Schema> schema = {}, bool waitResult = true);
std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
index 7f39d3c376c..a485cca6978 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h
@@ -39,6 +39,7 @@ public:
virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
virtual const TIndexInfo& GetIndexInfo() const = 0;
virtual const TSnapshot& GetSnapshot() const = 0;
+ virtual ui32 GetColumnsCount() const = 0;
std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const;
diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
index c635623fb09..31b81351385 100644
--- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.cpp
@@ -59,4 +59,8 @@ const TSnapshot& TFilteredSnapshotSchema::GetSnapshot() const {
return OriginalSnapshot->GetSnapshot();
}
+// Returns the number of fields reported by `Schema`.
+ui32 TFilteredSnapshotSchema::GetColumnsCount() const {
+    const auto fieldsCount = Schema->num_fields();
+    return fieldsCount;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h
index b1187db6d1b..00e754e587f 100644
--- a/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h
@@ -22,6 +22,7 @@ public:
const std::shared_ptr<arrow::Schema>& GetSchema() const override;
const TIndexInfo& GetIndexInfo() const override;
const TSnapshot& GetSnapshot() const override;
+ ui32 GetColumnsCount() const override;
};
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
index 69f2d4e9a12..52d4b7d3281 100644
--- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.cpp
@@ -42,4 +42,8 @@ const TSnapshot& TSnapshotSchema::GetSnapshot() const {
return Snapshot;
}
+// Returns the number of fields reported by `Schema`.
+ui32 TSnapshotSchema::GetColumnsCount() const {
+    const auto fieldsCount = Schema->num_fields();
+    return fieldsCount;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h
index 20603eb983b..b72819a3119 100644
--- a/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h
+++ b/ydb/core/tx/columnshard/engines/scheme/snapshot_scheme.h
@@ -22,6 +22,7 @@ public:
const std::shared_ptr<arrow::Schema>& GetSchema() const override;
const TIndexInfo& GetIndexInfo() const override;
const TSnapshot& GetSnapshot() const override;
+ ui32 GetColumnsCount() const override;
};
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index e91d06c1db2..20170d772e1 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -25,14 +25,13 @@ IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext()
const ui64 writeId = writeMeta.GetWriteId();
// Heavy operations inside. We cannot run them in tablet event handler.
- TString strError;
{
NColumnShard::TCpuGuard guard(Owner.ResourceUsage);
- Batch = SnapshotSchema->PrepareForInsert(Owner.WriteData.GetData().GetData(), Owner.WriteData.GetData().GetArrowSchema(), strError);
+ Batch = Owner.WriteData.GetData().GetArrowBatch();
}
if (!Batch) {
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId)("error", strError);
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId);
return EStatus::Error;
}
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..48f4f0d3050
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-operations)
+target_link_libraries(tx-columnshard-operations PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ core-tx-ev_write
+ ydb-services-metadata
+)
+target_sources(tx-columnshard-operations PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+)
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..4dba1ab769b
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-operations)
+target_link_libraries(tx-columnshard-operations PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ core-tx-ev_write
+ ydb-services-metadata
+)
+target_sources(tx-columnshard-operations PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+)
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..4dba1ab769b
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-operations)
+target_link_libraries(tx-columnshard-operations PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ core-tx-ev_write
+ ydb-services-metadata
+)
+target_sources(tx-columnshard-operations PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+)
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.txt b/ydb/core/tx/columnshard/operations/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..48f4f0d3050
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-operations)
+target_link_libraries(tx-columnshard-operations PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ core-tx-ev_write
+ ydb-services-metadata
+)
+target_sources(tx-columnshard-operations PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+)
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
new file mode 100644
index 00000000000..b7d01e4b55a
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -0,0 +1,180 @@
+#include "write.h"
+
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
+
+#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+
+
+namespace NKikimr::NColumnShard {
+
+ // Builds an operation object from already-known field values
+ // (used both for new operations and for rows loaded from the local database).
+ TWriteOperation::TWriteOperation(
+     const TWriteId writeId,
+     const ui64 txId,
+     const EOperationStatus& status,
+     const TInstant createdAt,
+     const ui64 globalWriteId)
+     : Status{status}
+     , CreatedAt{createdAt}
+     , WriteId{writeId}
+     , TxId{txId}
+     , GlobalWriteId{globalWriteId}
+ {}
+
+ // Launches the write: wraps `data` into a write controller bound to the latest schema
+ // version, registers a write actor for it and moves the operation from Draft to Started.
+ void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, const TActorContext& ctx) {
+     Y_VERIFY(Status == EOperationStatus::Draft);
+
+     NEvWrite::TWriteMeta meta(static_cast<ui64>(WriteId), tableId, source);
+     const auto& lastSchema = owner.TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
+     auto controller = std::make_shared<NOlap::TIndexedWriteController>(
+         ctx.SelfID,
+         NEvWrite::TWriteData(meta, data),
+         lastSchema);
+
+     ctx.Register(CreateWriteActor(
+         owner.TabletID(),
+         controller,
+         owner.BlobManager->StartBlobBatch(),
+         TInstant::Max(),
+         owner.Settings.MaxSmallBlobSize));
+
+     Status = EOperationStatus::Started;
+ }
+
+ // Commits the data of a Prepared operation at `snapshot`: marks the cached batch as
+ // committed, commits the write id in the insert table and updates commit counters.
+ void TWriteOperation::Commit(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const {
+     Y_VERIFY(Status == EOperationStatus::Prepared);
+
+     TBlobGroupSelector groupSelector(owner.Info());
+     NOlap::TDbWrapper insertDb(txc.DB, &groupSelector);
+
+     owner.BatchCache.Commit(WriteId);
+
+     // Predicate handed to the insert table: does the shard still know this path?
+     auto isKnownPath = [&owner](ui64 pathId) {
+         return owner.TablesManager.HasTable(pathId);
+     };
+
+     auto commitStats = owner.InsertTable->Commit(
+         insertDb,
+         snapshot.GetPlanStep(),
+         snapshot.GetTxId(),
+         0,
+         { WriteId },
+         isKnownPath);
+
+     owner.IncCounter(COUNTER_BLOBS_COMMITTED, commitStats.Rows);
+     owner.IncCounter(COUNTER_BYTES_COMMITTED, commitStats.Bytes);
+     owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, commitStats.RawBytes);
+     owner.UpdateInsertTableCounters();
+ }
+
+ // Called once the write has finished: remembers the global write id, switches the
+ // operation from Started to Prepared and persists it in the Operations table.
+ void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId) {
+     Y_VERIFY(Status == EOperationStatus::Started);
+
+     GlobalWriteId = globalWriteId;
+     Status = EOperationStatus::Prepared;
+
+     NIceDb::TNiceDb localDb(txc.DB);
+     Schema::Operations_Write(localDb, *this);
+ }
+
+ // Aborts a Prepared operation: drops its cached batch, aborts the write id in the insert
+ // table, then erases every entry currently reported as aborted and deletes its blob.
+ void TWriteOperation::Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const {
+     Y_VERIFY(Status == EOperationStatus::Prepared);
+     owner.BatchCache.EraseInserted(WriteId);
+
+     TBlobGroupSelector groupSelector(owner.Info());
+     NOlap::TDbWrapper insertDb(txc.DB, &groupSelector);
+
+     owner.InsertTable->Abort(insertDb, 0, {WriteId});
+
+     TBlobManagerDb blobsDb(txc.DB);
+     // Iterate over a copy: entries are erased from the insert table inside the loop.
+     auto abortedCopy = owner.InsertTable->GetAborted();
+     for (auto& abortedEntry : abortedCopy) {
+         auto& abortedData = abortedEntry.second;
+         owner.InsertTable->EraseAborted(insertDb, abortedData);
+         owner.BlobManager->DeleteBlob(abortedData.BlobId, blobsDb);
+     }
+ }
+
+ // Restores in-memory state from the Operations table.
+ // Returns false when the rowset is not ready, when advancing it fails, or when a
+ // duplicate write id is met; returns true after all rows have been loaded.
+ bool TOperationsManager::Init(NTabletFlatExecutor::TTransactionContext& txc) {
+     NIceDb::TNiceDb localDb(txc.DB);
+     auto rows = localDb.Table<Schema::Operations>().Select();
+     if (!rows.IsReady()) {
+         return false;
+     }
+
+     while (!rows.EndOfSet()) {
+         const auto writeId = static_cast<TWriteId>(rows.GetValue<Schema::Operations::WriteId>());
+         const ui64 createdAtSec = rows.GetValue<Schema::Operations::CreatedAt>();
+         const ui64 txId = rows.GetValue<Schema::Operations::TxId>();
+         const ui64 globalWriteId = rows.GetValue<Schema::Operations::GlobalWriteId>();
+         const auto status = static_cast<EOperationStatus>(rows.GetValue<Schema::Operations::Status>());
+
+         auto operation = std::make_shared<TWriteOperation>(writeId, txId, status, TInstant::Seconds(createdAtSec), globalWriteId);
+
+         // Draft operations are never written to the table, so none may be loaded back.
+         Y_VERIFY(operation->GetStatus() != EOperationStatus::Draft);
+
+         const bool inserted = Operations.emplace(operation->GetWriteId(), operation).second;
+         if (!inserted) {
+             AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "duplicated_operation")("operation", *operation);
+             return false;
+         }
+
+         Transactions[txId].push_back(operation->GetWriteId());
+         // Keep the id generator ahead of every id that is already persisted.
+         LastWriteId = std::max(LastWriteId, operation->GetWriteId());
+
+         if (!rows.Next()) {
+             return false;
+         }
+     }
+     return true;
+ }
+
+ // Commits every write operation registered for `txId` at `snapshot` and then removes
+ // those operations both from memory and from the Operations table.
+ // An unknown transaction is logged and treated as success. Always returns true.
+ bool TOperationsManager::CommitTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) {
+     // NOTE(review): the log context is built for TX_COLUMNSHARD_SCAN while messages below use
+     // TX_COLUMNSHARD — confirm which component is intended.
+     TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN)("tx_id", txId)("event", "transaction_commit_fails"));
+     auto tIt = Transactions.find(txId);
+     if (tIt == Transactions.end()) {
+         AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("details", "skip_unknown_transaction");
+         return true;
+     }
+
+     TVector<TWriteOperation::TPtr> committed;
+     committed.reserve(tIt->second.size());
+     for (auto&& opId : tIt->second) {
+         auto opPtr = Operations.FindPtr(opId);
+         // Every write id listed for a transaction must have a registered operation;
+         // fail loudly instead of dereferencing a null pointer if the maps diverge.
+         Y_VERIFY(opPtr && *opPtr);
+         (*opPtr)->Commit(owner, txc, snapshot);
+         committed.emplace_back(*opPtr);
+     }
+
+     // Erase through the iterator already at hand; no second lookup by key is needed.
+     Transactions.erase(tIt);
+     for (auto&& op : committed) {
+         RemoveOperation(op, txc);
+     }
+     return true;
+ }
+
+ // Aborts every write operation registered for `txId` and then removes those operations
+ // both from memory and from the Operations table.
+ // An unknown transaction is logged as an error but still reported as success. Always returns true.
+ bool TOperationsManager::AbortTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
+     // NOTE(review): the log context is built for TX_COLUMNSHARD_SCAN while messages below use
+     // TX_COLUMNSHARD — confirm which component is intended.
+     TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN)("tx_id", txId)("event", "transaction_abort_fails"));
+     auto tIt = Transactions.find(txId);
+     if (tIt == Transactions.end()) {
+         AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("details", "unknown_transaction");
+         return true;
+     }
+
+     TVector<TWriteOperation::TPtr> aborted;
+     aborted.reserve(tIt->second.size());
+     for (auto&& opId : tIt->second) {
+         auto opPtr = Operations.FindPtr(opId);
+         // Every write id listed for a transaction must have a registered operation;
+         // fail loudly instead of dereferencing a null pointer if the maps diverge.
+         Y_VERIFY(opPtr && *opPtr);
+         (*opPtr)->Abort(owner, txc);
+         aborted.emplace_back(*opPtr);
+     }
+
+     // Erase through the iterator already at hand; no second lookup by key is needed.
+     Transactions.erase(tIt);
+     for (auto&& op : aborted) {
+         RemoveOperation(op, txc);
+     }
+     return true;
+ }
+
+ // Looks up the operation registered under `writeId`; returns nullptr when there is none.
+ TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) const {
+     const auto found = Operations.find(writeId);
+     if (found != Operations.end()) {
+         return found->second;
+     }
+     return nullptr;
+ }
+
+ // Forgets `op`: removes it from the in-memory map and deletes its Operations row.
+ void TOperationsManager::RemoveOperation(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc) {
+     NIceDb::TNiceDb localDb(txc.DB);
+     const TWriteId writeId = op->GetWriteId();
+     Operations.erase(writeId);
+     Schema::Operations_Erase(localDb, writeId);
+ }
+
+ // Advances the write id counter and returns the new value.
+ TWriteId TOperationsManager::BuildNextWriteId() {
+     ++LastWriteId;
+     return LastWriteId;
+ }
+
+ // Creates a new Draft operation for `txId` with a fresh write id and the current time,
+ // registers it in memory and returns it. Nothing is written to the database here.
+ TWriteOperation::TPtr TOperationsManager::RegisterOperation(const ui64 txId) {
+     const auto newWriteId = BuildNextWriteId();
+     const auto createdAt = AppData()->TimeProvider->Now();
+     auto operation = std::make_shared<TWriteOperation>(newWriteId, txId, EOperationStatus::Draft, createdAt);
+
+     // A freshly generated write id must not be registered already.
+     Y_VERIFY(Operations.emplace(operation->GetWriteId(), operation).second);
+
+     auto& txWrites = Transactions[operation->GetTxId()];
+     txWrites.push_back(operation->GetWriteId());
+     return operation;
+ }
+}
diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h
new file mode 100644
index 00000000000..d50b4cfcce6
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/write.h
@@ -0,0 +1,74 @@
+#pragma once
+
+#include <ydb/core/tx/ev_write/write_data.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/core/tx/columnshard/engines/defs.h>
+
+#include <ydb/core/tablet_flat/flat_cxx_database.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <util/generic/map.h>
+#include <tuple>
+
+
+namespace NKikimr::NTabletFlatExecutor {
+ class TTransactionContext;
+}
+
+namespace NKikimr::NColumnShard {
+
+ class TColumnShard;
+
+ using TWriteId = NOlap::TWriteId;
+
+ // Lifecycle state of a write operation.
+ // The numeric value is what gets stored in the Operations table's Status column.
+ enum class EOperationStatus : ui32 {
+ // Registered in memory only; rows with this status are never expected on load.
+ Draft = 1,
+ // Set by TWriteOperation::Start after the write actor has been registered.
+ Started = 2,
+ // Set by TWriteOperation::OnWriteFinish; the state required by Commit and Abort.
+ Prepared = 3
+ };
+
+ // A single write inside a transaction, tracked from registration until commit or abort.
+ class TWriteOperation {
+ YDB_READONLY(EOperationStatus, Status, EOperationStatus::Draft);
+ YDB_READONLY_DEF(TInstant, CreatedAt);
+ YDB_READONLY_DEF(TWriteId, WriteId);
+ YDB_READONLY(ui64, TxId, 0);
+ YDB_READONLY(ui64, GlobalWriteId, 0);
+
+ public:
+ using TPtr = std::shared_ptr<TWriteOperation>;
+
+ TWriteOperation(const TWriteId writeId, const ui64 txId, const EOperationStatus& status, const TInstant createdAt, const ui64 globalWriteId = 0);
+
+ // Draft -> Started: registers the write actor for `data`.
+ void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source, const TActorContext& ctx);
+ // Started -> Prepared: stores `globalWriteId` and persists the operation.
+ void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const ui64 globalWriteId);
+ // Both require the Prepared status; neither changes the operation object itself.
+ void Commit(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const;
+ void Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const;
+
+ // Short textual form for logs: write id and transaction id only.
+ void Out(IOutputStream& out) const {
+ out << "write_id=" << (ui64) WriteId << ";tx_id=" << TxId;
+ }
+ };
+
+ // Registry of write operations, indexed by write id and grouped by transaction id.
+ class TOperationsManager {
+ // Transaction id -> write ids of the operations registered for it.
+ TMap<ui64, TVector<TWriteId>> Transactions;
+ // Write id -> operation object.
+ TMap<TWriteId, TWriteOperation::TPtr> Operations;
+ // Last write id handed out; raised during Init to the largest id loaded from the table.
+ TWriteId LastWriteId = TWriteId(0);
+
+ public:
+ // Loads persisted operations; returns false when loading could not be completed.
+ bool Init(NTabletFlatExecutor::TTransactionContext& txc);
+
+ // Returns nullptr when no operation is registered under `writeId`.
+ TWriteOperation::TPtr GetOperation(const TWriteId writeId) const;
+ bool CommitTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot);
+ bool AbortTransaction(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
+
+ // Creates and registers a new Draft operation for `txId`.
+ TWriteOperation::TPtr RegisterOperation(const ui64 txId);
+ private:
+ TWriteId BuildNextWriteId();
+ void RemoveOperation(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc);
+ };
+}
+
+// Stream output for TWriteOperation: delegates to TWriteOperation::Out.
+template <>
+inline void Out<NKikimr::NColumnShard::TWriteOperation>(IOutputStream& o, const NKikimr::NColumnShard::TWriteOperation& x) {
+    x.Out(o);
+}
diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp
new file mode 100644
index 00000000000..38e386f6ccc
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/write_data.cpp
@@ -0,0 +1,55 @@
+#include "write_data.h"
+
+#include <ydb/core/tx/columnshard/defs.h>
+
+
+namespace NKikimr::NColumnShard {
+
+// Serialization is not supported for this container: calling it fails unconditionally.
+void TArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const {
+    Y_UNUSED(proto);
+    Y_FAIL("Not implemented");
+}
+
+// Reads the data blob referenced by `proto` from `payload` and builds the batch schema
+// from the column ids listed in `proto`.
+// Returns true only if the schema has exactly as many columns as ids were given and the
+// data is non-empty and not larger than the maximum blob size.
+bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload) {
+    const auto payloadIndex = proto.GetArrowData().GetPayloadIndex();
+    IncomingData = payload.GetDataFromPayload(payloadIndex);
+
+    const auto& columnIds = proto.GetColumnIds();
+    std::vector<ui32> columns(columnIds.begin(), columnIds.end());
+    BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
+
+    if (BatchSchema->GetColumnsCount() != columns.size()) {
+        return false;
+    }
+    if (IncomingData.empty()) {
+        return false;
+    }
+    return IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize();
+}
+
+// Converts the stored data into a record batch prepared for insertion, using the batch
+// schema built by Parse. Returns a null pointer when preparation fails; the error text
+// reported by PrepareForInsert is logged instead of being silently dropped.
+std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const {
+    TString err;
+    auto batch = IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema(), err);
+    if (!batch) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "prepare_for_insert_failed")("error", err);
+    }
+    return batch;
+}
+
+// Serialization is not supported for this container: calling it fails unconditionally.
+void TProtoArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const {
+    Y_UNUSED(proto);
+    Y_FAIL("Not implemented");
+}
+
+// Takes the data blob from `proto` and, when meta information is present, the arrow schema.
+// Returns false if the meta has an empty schema or a non-arrow format, if the schema cannot
+// be deserialized, or if the data is empty or larger than the maximum blob size.
+bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) {
+    IncomingData = proto.GetData();
+
+    if (proto.HasMeta()) {
+        const auto& serializedSchema = proto.GetMeta().GetSchema();
+        const bool schemaMissing = serializedSchema.empty();
+        if (schemaMissing || proto.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) {
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_data_format");
+            return false;
+        }
+
+        ArrowSchema = NArrow::DeserializeSchema(serializedSchema);
+        if (!ArrowSchema) {
+            return false;
+        }
+    }
+
+    if (IncomingData.empty()) {
+        return false;
+    }
+    return IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize();
+}
+
+// Converts the stored data into a record batch prepared for insertion, using the arrow
+// schema taken from the request (if any). Returns a null pointer when preparation fails;
+// the error text reported by PrepareForInsert is logged instead of being silently dropped.
+std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const {
+    TString err;
+    auto batch = IndexSchema->PrepareForInsert(IncomingData, ArrowSchema, err);
+    if (!batch) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "prepare_for_insert_failed")("error", err);
+    }
+    return batch;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h
new file mode 100644
index 00000000000..b626c7358d6
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/write_data.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include <ydb/core/tx/ev_write/write_data.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
+
+
+namespace NKikimr::NColumnShard {
+
+// Interface for reading data blobs from, and appending them to, an indexed payload store.
+class IPayloadData {
+public:
+    // Returns the blob stored under `index`.
+    virtual TString GetDataFromPayload(const ui64 index) const = 0;
+    // Stores `blobData` and returns the index assigned to it.
+    virtual ui64 AddDataToPayload(TString&& blobData) = 0;
+    virtual ~IPayloadData() = default;
+};
+
+
+// IPayloadData implementation on top of an event's payload storage.
+// Holds a reference to the event, so the event must outlive the helper.
+template <class TEvent>
+class TPayloadHelper : public IPayloadData {
+    TEvent& Event;
+public:
+    TPayloadHelper(TEvent& ev)
+        : Event(ev) {}
+
+    // Copies the payload stored under `index` into a contiguous string.
+    TString GetDataFromPayload(const ui64 index) const override {
+        TRope rope = Event.GetPayload(index);
+        TString data = TString::Uninitialized(rope.GetSize());
+        rope.Begin().ExtractPlainDataAndAdvance(data.Detach(), data.size());
+        return data;
+    }
+
+    // Appends `blobData` to the event payload and returns the index assigned to it.
+    ui64 AddDataToPayload(TString&& blobData) override {
+        TRope rope;
+        // The parameter is an rvalue reference: hand the string over instead of copying it.
+        rope.Insert(rope.End(), TRope(std::move(blobData)));
+        return Event.AddPayload(std::move(rope));
+    }
+};
+
+// Data container whose blob is read from an event payload (see Parse) and whose batch
+// schema is derived from the column ids listed in the request.
+class TArrowData : public NEvWrite::IDataContainer {
+public:
+ TArrowData(const NOlap::ISnapshotSchema::TPtr& schema)
+ : IndexSchema(schema)
+ {}
+
+ // Raw data as received; empty until Parse has been called.
+ const TString& GetData() const override {
+ return IncomingData;
+ }
+
+ // Must succeed before GetArrowBatch is used: it fills IncomingData and BatchSchema.
+ bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload);
+ std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
+ void Serialize(NKikimrDataEvents::TOperationData& proto) const override;
+
+private:
+ // Schema passed to the constructor; used to prepare batches for insertion.
+ NOlap::ISnapshotSchema::TPtr IndexSchema;
+ // Schema restricted to the columns named in the request; set by Parse.
+ NOlap::ISnapshotSchema::TPtr BatchSchema;
+ TString IncomingData;
+};
+
+// Data container filled from a NKikimrTxColumnShard::TEvWrite record: the blob comes from
+// the record's data field and the arrow schema, if present, from its meta section.
+class TProtoArrowData : public NEvWrite::IDataContainer {
+public:
+ TProtoArrowData(const NOlap::ISnapshotSchema::TPtr& schema)
+ : IndexSchema(schema)
+ {}
+
+ // Raw data as received; empty until ParseFromProto has been called.
+ const TString& GetData() const override {
+ return IncomingData;
+ }
+
+ bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto);
+ std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
+ void Serialize(NKikimrDataEvents::TOperationData& proto) const override;
+
+private:
+ // Schema passed to the constructor; used to prepare batches for insertion.
+ NOlap::ISnapshotSchema::TPtr IndexSchema;
+ // Schema deserialized from the request meta; stays null when the request has no meta.
+ std::shared_ptr<arrow::Schema> ArrowSchema;
+ TString IncomingData;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/operations/ya.make b/ydb/core/tx/columnshard/operations/ya.make
new file mode 100644
index 00000000000..cc7fd4d19cc
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ write.cpp
+ write_data.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ ydb/core/tx/ev_write
+ ydb/services/metadata
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/tx_controller.cpp b/ydb/core/tx/columnshard/tx_controller.cpp
index 8e902861caa..9ef34cffaf7 100644
--- a/ydb/core/tx/columnshard/tx_controller.cpp
+++ b/ydb/core/tx/columnshard/tx_controller.cpp
@@ -176,6 +176,15 @@ const TTxController::TBasicTxInfo* TTxController::GetTxInfo(const ui64 txId) con
return BasicTxInfo.FindPtr(txId);
}
+// Builds coordinator info for a known transaction: the owner's tablet id, the transaction's
+// step bounds and, when processing params are available, their coordinator list
+// (an empty list otherwise). The transaction must exist.
+NEvents::TDataEvents::TCoordinatorInfo TTxController::GetCoordinatorInfo(const ui64 txId) const {
+    auto txInfo = BasicTxInfo.FindPtr(txId);
+    Y_VERIFY(txInfo);
+
+    const auto tabletId = Owner.TabletID();
+    if (!Owner.ProcessingParams) {
+        return NEvents::TDataEvents::TCoordinatorInfo(tabletId, txInfo->MinStep, txInfo->MaxStep, {});
+    }
+    return NEvents::TDataEvents::TCoordinatorInfo(tabletId, txInfo->MinStep, txInfo->MaxStep, Owner.ProcessingParams->GetCoordinators());
+}
+
size_t TTxController::CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext& txc) {
size_t removedCount = 0;
if (HaveOutdatedTxs()) {
diff --git a/ydb/core/tx/columnshard/tx_controller.h b/ydb/core/tx/columnshard/tx_controller.h
index f7c30397ad8..563f1f364e7 100644
--- a/ydb/core/tx/columnshard/tx_controller.h
+++ b/ydb/core/tx/columnshard/tx_controller.h
@@ -1,7 +1,9 @@
#pragma once
#include "columnshard_schema.h"
+
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/tx/ev_write/events.h>
namespace NKikimr::NColumnShard {
@@ -68,6 +70,7 @@ public:
std::optional<TPlanQueueItem> GetPlannedTx() const;
TPlanQueueItem GetFrontTx() const;
const TBasicTxInfo* GetTxInfo(const ui64 txId) const;
+ NEvents::TDataEvents::TCoordinatorInfo GetCoordinatorInfo(const ui64 txId) const;
size_t CleanExpiredTxs(NTabletFlatExecutor::TTransactionContext& txc);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index fb7f456f368..325d3c015a7 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -8,6 +8,8 @@
#include <ydb/core/tx/columnshard/engines/changes/with_appended.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
#include <ydb/core/tx/columnshard/engines/changes/cleanup.h>
+#include <ydb/core/tx/columnshard/operations/write_data.h>
+#include <library/cpp/actors/protos/unittests.pb.h>
namespace NKikimr {
@@ -1837,6 +1839,68 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
}
+// Happy-path test of the TEvWrite protocol against a column shard tablet.
+Y_UNIT_TEST_SUITE(EvWrite) {
+ // Sender-side data container used only to fill the request: it serializes the column ids
+ // and the payload index, and deliberately carries no data or batch itself.
+ class TArrowData : public NKikimr::NEvWrite::IDataContainer {
+ std::vector<std::pair<TString, TTypeInfo>> YdbSchema;
+ ui64 Index;
+
+ public:
+ TArrowData(const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, const ui64 idx)
+ : YdbSchema(ydbSchema)
+ , Index(idx)
+ {
+ }
+
+ void Serialize(NKikimrDataEvents::TOperationData& proto) const override {
+ // Column ids are written as 1..N in schema order.
+ for (ui32 i = 0; i < YdbSchema.size(); ++i) {
+ proto.AddColumnIds(i + 1);
+ }
+ proto.MutableArrowData()->SetPayloadIndex(Index);
+ }
+
+ std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override {
+ return nullptr;
+ }
+
+ const TString& GetData() const override {
+ return Default<TString>();
+ }
+ };
+
+ // Sends one replace operation in a transaction, expects PREPARED, then plans the
+ // transaction and expects the write to complete.
+ Y_UNIT_TEST(WriteInTransaction) {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+ // Wait until the tablet has booted.
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ const ui64 tableId = 1;
+ const TestTableDescription table;
+ SetupSchema(runtime, sender, tableId, table);
+
+ const ui64 txId = 111;
+
+ const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema;
+ TString blobData = MakeTestBlob({0, 100}, ydbSchema);
+
+ // The blob travels in the event payload; the container only records its payload index.
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId);
+ auto dataPtr = std::make_shared<TArrowData>(ydbSchema, TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData)));
+ evWrite->AddReplaceOp(tableId, dataPtr);
+
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release());
+
+ TAutoPtr<NActors::IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NKikimr::NEvents::TDataEvents::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EQUAL(event->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::PREPARED);
+ // Planning the transaction waits for a COMPLETED write result (see PlanWriteTx).
+ PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId));
+ }
+}
+
Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Y_UNIT_TEST(Write) {
TestTableDescription table;
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index 14b1e978ca0..c38c0ed6ea9 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -57,6 +57,7 @@ PEERDIR(
ydb/core/tx/columnshard/counters
ydb/core/tx/columnshard/common
ydb/core/tx/columnshard/splitter
+ ydb/core/tx/columnshard/operations
ydb/core/tx/tiering
ydb/core/tx/conveyor/usage
ydb/core/tx/long_tx_service/public
diff --git a/ydb/core/tx/ev_write/events.h b/ydb/core/tx/ev_write/events.h
new file mode 100644
index 00000000000..196fdb6fab3
--- /dev/null
+++ b/ydb/core/tx/ev_write/events.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include "write_data.h"
+
+#include <ydb/core/protos/ev_write.pb.h>
+#include <ydb/core/base/events.h>
+
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/log.h>
+
+
+namespace NKikimr::NEvents {
+
+struct TDataEvents {
+
+ // Immutable bundle of values describing a prepared transaction: a tablet id, the min/max
+ // step values and the list of domain coordinators. Each YDB_READONLY* line declares the
+ // field together with its Get<Name>() accessor.
+ class TCoordinatorInfo {
+     YDB_READONLY_DEF(ui64, TabletId);
+     YDB_READONLY(ui64, MinStep, 0);
+     YDB_READONLY(ui64, MaxStep, 0);
+     YDB_READONLY_DEF(google::protobuf::RepeatedField<ui64>, DomainCoordinators);
+
+ public:
+     // Stores all four values; `coordinators` is copied into the object.
+     TCoordinatorInfo(
+         const ui64 tabletId,
+         const ui64 minStep,
+         const ui64 maxStep,
+         const google::protobuf::RepeatedField<ui64>& coordinators)
+         : TabletId(tabletId)
+         , MinStep(minStep)
+         , MaxStep(maxStep)
+         , DomainCoordinators(coordinators)
+     {
+     }
+ };
+
+ // Event ids of this family; numbering starts at the beginning of the ES_DATA_OPERATIONS space.
+ enum EEventType {
+     EvWrite = EventSpaceBegin(TKikimrEvents::ES_DATA_OPERATIONS),
+     EvWriteResult,
+     EvEnd, // sentinel, checked against the end of the event space below
+ };
+
+ // Compile-time guard: the ids declared above must stay inside the ES_DATA_OPERATIONS space.
+ static_assert(
+     EvEnd < EventSpaceEnd(TKikimrEvents::ES_DATA_OPERATIONS),
+     "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_DATA_OPERATIONS)");
+
+ // Write request event; a thin wrapper around the NKikimrDataEvents::TEvWrite protobuf record.
+ struct TEvWrite : public NActors::TEventPB<TEvWrite, NKikimrDataEvents::TEvWrite, TDataEvents::EvWrite> {
+     TEvWrite() = default;
+
+     // Creates a request bound to transaction `txId`.
+     // explicit: a bare integer must never convert silently into a write event.
+     explicit TEvWrite(ui64 txId) {
+         Record.SetTxId(txId);
+     }
+
+     // Registers a replace operation for table `tableId`: stores the table id in the record and
+     // lets `data` serialize itself into the record's Replace section.
+     // `data` must be non-null; this is verified up front, before the record is modified.
+     void AddReplaceOp(const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data) {
+         Y_VERIFY(data);
+         Record.MutableTableId()->SetTableId(tableId);
+         data->Serialize(*Record.MutableReplace());
+     }
+
+     // Returns the transaction id stored in the record; Y_VERIFY fails if it was never set.
+     ui64 GetTxId() const {
+         Y_VERIFY(Record.HasTxId());
+         return Record.GetTxId();
+     }
+ };
+
+ // Reply to TEvWrite; a thin wrapper around the NKikimrDataEvents::TEvWriteResult protobuf record.
+ // Instances are normally produced by one of the static Build* factories below.
+ struct TEvWriteResult : public NActors::TEventPB<TEvWriteResult, NKikimrDataEvents::TEvWriteResult, TDataEvents::EvWriteResult> {
+     TEvWriteResult() = default;
+
+     // Builds a reply with the given `status` and `errorMsg` as issue message.
+     // The error is also reported to the TX_COLUMNSHARD log.
+     static std::unique_ptr<TEvWriteResult> BuildError(const ui64 txId, const NKikimrDataEvents::TEvWriteResult::EOperationStatus& status, const TString& errorMsg) {
+         auto reply = std::make_unique<TEvWriteResult>();
+         AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)
+             ("event", "ev_write_error")
+             ("status", NKikimrDataEvents::TEvWriteResult::EOperationStatus_Name(status))
+             ("details", errorMsg)
+             ("tx_id", txId);
+
+         auto& record = reply->Record;
+         record.SetTxId(txId);
+         record.SetStatus(status);
+         record.MutableIssueMessage()->set_message(errorMsg);
+         return reply;
+     }
+
+     // Builds a reply for transaction `txId` with status COMPLETED.
+     static std::unique_ptr<TEvWriteResult> BuildCommited(const ui64 txId) {
+         auto reply = std::make_unique<TEvWriteResult>();
+         auto& record = reply->Record;
+         record.SetTxId(txId);
+         record.SetStatus(NKikimrDataEvents::TEvWriteResult::COMPLETED);
+         return reply;
+     }
+
+     // Builds a reply with status PREPARED and copies the min/max step and the domain
+     // coordinators out of `transactionInfo`.
+     static std::unique_ptr<TEvWriteResult> BuildPrepared(const ui64 txId, const TCoordinatorInfo& transactionInfo) {
+         auto reply = std::make_unique<TEvWriteResult>();
+         auto& record = reply->Record;
+         record.SetTxId(txId);
+         record.SetStatus(NKikimrDataEvents::TEvWriteResult::PREPARED);
+
+         record.SetMinStep(transactionInfo.GetMinStep());
+         record.SetMaxStep(transactionInfo.GetMaxStep());
+         record.MutableDomainCoordinators()->CopyFrom(transactionInfo.GetDomainCoordinators());
+         return reply;
+     }
+ };
+};
+
+}
diff --git a/ydb/core/tx/ev_write/write_data.cpp b/ydb/core/tx/ev_write/write_data.cpp
index 21311e70d3d..4150cbd28ad 100644
--- a/ydb/core/tx/ev_write/write_data.cpp
+++ b/ydb/core/tx/ev_write/write_data.cpp
@@ -10,32 +10,4 @@ TWriteData::TWriteData(const TWriteMeta& writeMeta, IDataContainer::TPtr data)
, Data(data)
{}
-bool TArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) {
- IncomingData = proto.GetData();
- if (proto.HasMeta()) {
- const auto& incomingDataScheme = proto.GetMeta().GetSchema();
- if (incomingDataScheme.empty() || proto.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_data_format");
- return false;
- }
- ArrowSchema = NArrow::DeserializeSchema(incomingDataScheme);
- if (!ArrowSchema) {
- return false;
- }
- }
- return !IncomingData.empty() && IncomingData.size() <= NColumnShard::TLimits::GetMaxBlobSize();
-}
-
-std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const {
- auto batch = NArrow::DeserializeBatch(IncomingData, ArrowSchema);
- if (!batch) {
- return nullptr;
- }
-
- if (batch->num_rows() == 0) {
- return nullptr;
- }
- return batch;
-}
-
}
diff --git a/ydb/core/tx/ev_write/write_data.h b/ydb/core/tx/ev_write/write_data.h
index 5a0e562c278..8f499ad39d5 100644
--- a/ydb/core/tx/ev_write/write_data.h
+++ b/ydb/core/tx/ev_write/write_data.h
@@ -4,6 +4,7 @@
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
+#include <ydb/core/protos/ev_write.pb.h>
namespace NKikimr::NEvWrite {
@@ -12,30 +13,11 @@ class IDataContainer {
public:
using TPtr = std::shared_ptr<IDataContainer>;
virtual ~IDataContainer() {}
-
- virtual std::shared_ptr<arrow::Schema> GetArrowSchema() const = 0;
+ virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0;
+ virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0;
virtual const TString& GetData() const = 0;
};
-class TArrowData : public IDataContainer {
-public:
- std::shared_ptr<arrow::Schema> GetArrowSchema() const override {
- return ArrowSchema;
- }
-
- const TString& GetData() const override {
- return IncomingData;
- }
-
- bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto);
-
- std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const;
-
-private:
- TString IncomingData;
- std::shared_ptr<arrow::Schema> ArrowSchema;
-};
-
class TWriteMeta {
YDB_ACCESSOR(ui64, WriteId, 0);
YDB_READONLY(ui64, TableId, 0);