aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-09-11 11:06:51 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-09-11 11:26:48 +0300
commit7e312058f4354a5d71b978f48e15e784021e125c (patch)
tree1a01829614e7b530266f66586f4c7aeaf75c0773
parent4f1de009d5b8057a7b1e010724626a0364c368fc (diff)
downloadydb-7e312058f4354a5d71b978f48e15e784021e125c.tar.gz
KIKIMR-19215: start separation blobs operations and metadata operations
-rw-r--r--ydb/core/protos/counters_columnshard.proto1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/blob_manager.cpp24
-rw-r--r--ydb/core/tx/columnshard/blob_manager.h20
-rw-r--r--ydb/core/tx/columnshard/blob_manager_txs.cpp3
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract.cpp5
-rw-r--r--ydb/core/tx/columnshard/blobs_action/abstract.h68
-rw-r--r--ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp (renamed from ydb/core/tx/columnshard/blob_manager_db.cpp)3
-rw-r--r--ydb/core/tx/columnshard/blobs_action/blob_manager_db.h (renamed from ydb/core/tx/columnshard/blob_manager_db.h)5
-rw-r--r--ydb/core/tx/columnshard/blobs_action/bs.cpp16
-rw-r--r--ydb/core/tx/columnshard/blobs_action/bs.h57
-rw-r--r--ydb/core/tx/columnshard/blobs_action/ya.make16
-rw-r--r--ydb/core/tx/columnshard/columnshard__export.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__forget.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp60
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp26
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp9
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h10
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h27
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/cleanup.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h2
-rw-r--r--ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/writer/blob_constructor.h32
-rw-r--r--ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp35
-rw-r--r--ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h31
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp71
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h35
-rw-r--r--ydb/core/tx/columnshard/engines/writer/put_status.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/writer/write_controller.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/writer/write_controller.h16
-rw-r--r--ydb/core/tx/columnshard/engines/writer/ya.make5
-rw-r--r--ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/operations/slice_builder.cpp53
-rw-r--r--ydb/core/tx/columnshard/operations/slice_builder.h33
-rw-r--r--ydb/core/tx/columnshard/operations/write.cpp10
-rw-r--r--ydb/core/tx/columnshard/operations/ya.make1
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp2
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp55
-rw-r--r--ydb/core/tx/columnshard/ya.make2
59 files changed, 629 insertions, 282 deletions
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto
index 804ac16744..4bb74f05b8 100644
--- a/ydb/core/protos/counters_columnshard.proto
+++ b/ydb/core/protos/counters_columnshard.proto
@@ -174,4 +174,5 @@ enum ETxTypes {
TXTYPE_READ_BLOB_RANGES = 12 [(TxTypeOpts) = {Name: "TxReadBlobRanges"}];
TXTYPE_EXPORT = 13 [(TxTypeOpts) = {Name: "TxExport"}];
TXTYPE_FORGET = 14 [(TxTypeOpts) = {Name: "TxForget"}];
+ TXTYPE_WRITE_DRAFT = 15 [(TxTypeOpts) = {Name: "TxWriteDraft"}];
}
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index c0614e205c..46da987a57 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(blobs_action)
add_subdirectory(blobs_reader)
add_subdirectory(common)
add_subdirectory(counters)
@@ -53,6 +54,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-splitter
tx-columnshard-operations
tx-columnshard-blobs_reader
+ tx-columnshard-blobs_action
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -66,7 +68,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 67c260955f..4cf6358090 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(blobs_action)
add_subdirectory(blobs_reader)
add_subdirectory(common)
add_subdirectory(counters)
@@ -54,6 +55,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-splitter
tx-columnshard-operations
tx-columnshard-blobs_reader
+ tx-columnshard-blobs_action
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -67,7 +69,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 67c260955f..4cf6358090 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(blobs_action)
add_subdirectory(blobs_reader)
add_subdirectory(common)
add_subdirectory(counters)
@@ -54,6 +55,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-splitter
tx-columnshard-operations
tx-columnshard-blobs_reader
+ tx-columnshard-blobs_action
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -67,7 +69,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index c0614e205c..46da987a57 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(blobs_action)
add_subdirectory(blobs_reader)
add_subdirectory(common)
add_subdirectory(counters)
@@ -53,6 +54,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
tx-columnshard-splitter
tx-columnshard-operations
tx-columnshard-blobs_reader
+ tx-columnshard-blobs_action
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
@@ -66,7 +68,6 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index 056fb528d7..1bda48bfe1 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -1,8 +1,8 @@
#include "defs.h"
#include "columnshard_impl.h"
#include "blob_manager.h"
-#include "blob_manager_db.h"
#include "blob_cache.h"
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/base/blobstorage.h>
@@ -91,22 +91,16 @@ void TBlobBatch::SendWriteRequest(const TActorContext& ctx, ui32 groupId, const
SendPutToGroup(ctx, groupId, BatchInfo->TabletInfo.Get(), std::move(put), cookie);
}
-TUnifiedBlobId TBlobBatch::SendWriteBlobRequest(const TString& blobData, TInstant deadline, const TActorContext& ctx) {
- Y_VERIFY(blobData.size() <= TLimits::GetBlobSizeLimit(), "Blob %" PRISZT" size exceeds the limit %" PRIu64,
- blobData.size(), TLimits::GetBlobSizeLimit());
-
- TUnifiedBlobId blobId = BatchInfo->NextBlobId(blobData.size());
- ui32 groupId = blobId.GetDsGroup();
+void TBlobBatch::SendWriteBlobRequest(const TString& blobData, const TUnifiedBlobId& blobId, TInstant deadline, const TActorContext& ctx) {
+ Y_VERIFY(blobData.size() <= TLimits::GetBlobSizeLimit(), "Blob %" PRISZT" size exceeds the limit %" PRIu64, blobData.size(), TLimits::GetBlobSizeLimit());
+ const ui32 groupId = blobId.GetDsGroup();
SendWriteRequest(ctx, groupId, blobId.GetLogoBlobId(), blobData, 0, deadline);
-
- return blobId;
}
-void TBlobBatch::OnBlobWriteResult(TEvBlobStorage::TEvPutResult::TPtr& ev) {
- TLogoBlobID blobId = ev->Get()->Id;
+void TBlobBatch::OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) {
BatchInfo->Counters.OnPutResult(blobId.BlobSize());
- Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "The caller must handle unsuccessful status");
+ Y_VERIFY(status == NKikimrProto::OK, "The caller must handle unsuccessful status");
Y_VERIFY(BatchInfo);
Y_VERIFY(BatchInfo->InFlight[blobId.Cookie()], "Blob %s is already acked!", blobId.ToString().c_str());
@@ -140,6 +134,10 @@ TUnifiedBlobId TBlobBatch::AddSmallBlob(const TString& data) {
return BatchInfo->AddSmallBlob(data);
}
+TUnifiedBlobId TBlobBatch::AllocateNextBlobId(const TString& blobData) {
+ return BatchInfo->NextBlobId(blobData.size());
+}
+
TBlobManager::TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen)
: TabletInfo(tabletInfo)
, CurrentGen(gen)
@@ -467,7 +465,7 @@ TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) {
return TBlobBatch(std::move(batchInfo));
}
-void TBlobManager::SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
+void TBlobManager::DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
Y_VERIFY(blobBatch.BatchInfo);
++CountersUpdate.BatchesCommitted;
CountersUpdate.BlobsWritten += blobBatch.GetBlobCount();
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index 9d4852565c..24777f3a2b 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -40,10 +40,12 @@ public:
~TBlobBatch();
// Write new blob as a part of this batch
- TUnifiedBlobId SendWriteBlobRequest(const TString& blobData, TInstant deadline, const TActorContext& ctx);
+ void SendWriteBlobRequest(const TString& blobData, const TUnifiedBlobId& blobId, TInstant deadline, const TActorContext& ctx);
+
+ TUnifiedBlobId AllocateNextBlobId(const TString& blobData);
// Called with the result of WriteBlob request
- void OnBlobWriteResult(TEvBlobStorage::TEvPutResult::TPtr& ev);
+ void OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status);
// Tells if all WriteBlob requests got corresponding results
bool AllBlobWritesCompleted() const;
@@ -65,7 +67,7 @@ class IBlobManagerDb;
class IBlobManager {
protected:
static constexpr ui32 BLOB_CHANNEL = 2;
-
+ virtual void DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) = 0;
public:
virtual ~IBlobManager() = default;
@@ -76,8 +78,13 @@ public:
// This method is called in the same transaction in which the user saves references to blobs
// in some LocalDB table. It tells the BlobManager that the blobs are becoming permanently saved.
- // NOTE: At this point all blob writes must be already acknowleged.
- virtual void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) = 0;
+ // NOTE: At this point all blob writes must be already acknowledged.
+ void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
+ if (blobBatch.GetBlobCount() == 0) {
+ return;
+ }
+ return DoSaveBlobBatch(std::move(blobBatch), db);
+ }
// Deletes the blob that was previously permanently saved
virtual void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0;
@@ -207,6 +214,8 @@ private:
std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> EvictedBlobs;
std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> DroppedEvictedBlobs;
+ virtual void DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) override;
+
public:
TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen);
@@ -239,7 +248,6 @@ public:
// Implementation of IBlobManager interface
TBlobBatch StartBlobBatch(ui32 channel = BLOB_CHANNEL) override;
- void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) override;
void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) override;
// Implementation of IBlobExporter
diff --git a/ydb/core/tx/columnshard/blob_manager_txs.cpp b/ydb/core/tx/columnshard/blob_manager_txs.cpp
index 8f1206d09c..f3548cff1c 100644
--- a/ydb/core/tx/columnshard/blob_manager_txs.cpp
+++ b/ydb/core/tx/columnshard/blob_manager_txs.cpp
@@ -1,7 +1,8 @@
#include "defs.h"
#include "columnshard_impl.h"
#include "blob_manager.h"
-#include "blob_manager_db.h"
+
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/base/blobstorage.h>
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..1558e932a7
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-blobs_action)
+target_link_libraries(tx-columnshard-blobs_action PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ ydb-core-tablet_flat
+ core-tx-tiering
+)
+target_sources(tx-columnshard-blobs_action PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..bed25b4c44
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-blobs_action)
+target_link_libraries(tx-columnshard-blobs_action PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ ydb-core-tablet_flat
+ core-tx-tiering
+)
+target_sources(tx-columnshard-blobs_action PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..bed25b4c44
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-blobs_action)
+target_link_libraries(tx-columnshard-blobs_action PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ ydb-core-tablet_flat
+ core-tx-tiering
+)
+target_sources(tx-columnshard-blobs_action PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..1558e932a7
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-blobs_action)
+target_link_libraries(tx-columnshard-blobs_action PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-protos
+ libs-apache-arrow
+ ydb-core-tablet_flat
+ core-tx-tiering
+)
+target_sources(tx-columnshard-blobs_action PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp
+)
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract.cpp b/ydb/core/tx/columnshard/blobs_action/abstract.cpp
new file mode 100644
index 0000000000..8b6b1715fc
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/abstract.cpp
@@ -0,0 +1,5 @@
+#include "abstract.h"
+
+namespace NKikimr::NOlap {
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/abstract.h b/ydb/core/tx/columnshard/blobs_action/abstract.h
new file mode 100644
index 0000000000..49f7409b84
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/abstract.h
@@ -0,0 +1,68 @@
+#pragma once
+#include "blob_manager_db.h"
+
+#include <ydb/core/protos/base.pb.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/counters/blobs_manager.h>
+
+#include <ydb/core/tablet_flat/flat_executor.h>
+#include <ydb/core/util/backoff.h>
+#include <ydb/core/protos/tx_columnshard.pb.h>
+
+#include <util/generic/string.h>
+
+namespace NKikimr::NOlap {
+
+using NOlap::TUnifiedBlobId;
+using NOlap::TBlobRange;
+using NOlap::TEvictedBlob;
+using NOlap::EEvictState;
+using NKikimrTxColumnShard::TEvictMetadata;
+
+class IBlobsAction {
+protected:
+ virtual void DoOnExecuteTxBeforeWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) = 0;
+ virtual void DoOnCompleteTxBeforeWrite(NColumnShard::TColumnShard& self) = 0;
+
+ virtual void DoSendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) = 0;
+ virtual void DoOnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) = 0;
+
+ virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) = 0;
+ virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self) = 0;
+public:
+ virtual ~IBlobsAction() = default;
+ virtual bool IsReady() const = 0;
+
+ virtual TUnifiedBlobId AllocateNextBlobId(const TString& data) = 0;
+
+ void OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) {
+ return DoOnBlobWriteResult(blobId, status);
+ }
+
+ void OnExecuteTxBeforeWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) {
+ return DoOnExecuteTxBeforeWrite(self, dbBlobs);
+ }
+
+ virtual ui32 GetBlobsCount() const = 0;
+ virtual ui32 GetTotalSize() const = 0;
+
+ virtual bool NeedDraftTransaction() const = 0;
+
+ void OnCompleteTxBeforeWrite(NColumnShard::TColumnShard& self) {
+ return DoOnCompleteTxBeforeWrite(self);
+ }
+
+ void OnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) {
+ return DoOnExecuteTxAfterWrite(self, dbBlobs);
+ }
+
+ void OnCompleteTxAfterWrite(NColumnShard::TColumnShard& self) {
+ return DoOnCompleteTxAfterWrite(self);
+ }
+
+ void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) {
+ return DoSendWriteBlobRequest(data, blobId);
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/blob_manager_db.cpp b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp
index c7a8b90bd5..2920654f51 100644
--- a/ydb/core/tx/columnshard/blob_manager_db.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp
@@ -1,6 +1,5 @@
#include "blob_manager_db.h"
-#include "blob_manager.h"
-#include "columnshard_schema.h"
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/blob_manager_db.h b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h
index e840c0d336..2c807387c1 100644
--- a/ydb/core/tx/columnshard/blob_manager_db.h
+++ b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h
@@ -1,7 +1,6 @@
#pragma once
-#include "defs.h"
-
-#include "blob_manager.h"
+#include <ydb/core/tx/columnshard/defs.h>
+#include <ydb/core/tx/columnshard/blob_manager.h>
namespace NKikimr::NTable {
class TDatabase;
diff --git a/ydb/core/tx/columnshard/blobs_action/bs.cpp b/ydb/core/tx/columnshard/blobs_action/bs.cpp
new file mode 100644
index 0000000000..ac99515955
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/bs.cpp
@@ -0,0 +1,16 @@
+#include "bs.h"
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+
+namespace NKikimr::NOlap {
+
+void TBSWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) {
+ ui64 blobsWritten = BlobBatch.GetBlobCount();
+ ui64 bytesWritten = BlobBatch.GetTotalSize();
+ self.IncCounter(NColumnShard::COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten);
+ self.IncCounter(NColumnShard::COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten);
+// self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_UPSERTED, insertedBytes);
+ self.IncCounter(NColumnShard::COUNTER_WRITE_SUCCESS);
+ self.BlobManager->SaveBlobBatch(std::move(BlobBatch), dbBlobs);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/bs.h b/ydb/core/tx/columnshard/blobs_action/bs.h
new file mode 100644
index 0000000000..6a53117a57
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/bs.h
@@ -0,0 +1,57 @@
+#pragma once
+
+#include "abstract.h"
+#include <ydb/core/tx/columnshard/blob_manager.h>
+
+namespace NKikimr::NOlap {
+
+class TBSWriteAction: public IBlobsAction {
+private:
+ NColumnShard::TBlobBatch BlobBatch;
+protected:
+ virtual void DoSendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) override {
+ return BlobBatch.SendWriteBlobRequest(data, blobId, TInstant::Max(), TActorContext::AsActorContext());
+ }
+
+ virtual void DoOnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) override {
+ return BlobBatch.OnBlobWriteResult(blobId, status);
+ }
+
+ virtual void DoOnExecuteTxBeforeWrite(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& /*dbBlobs*/) override {
+ return;
+ }
+
+ virtual void DoOnCompleteTxBeforeWrite(NColumnShard::TColumnShard& /*self*/) override {
+ return;
+ }
+
+ virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) override;
+ virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/) override {
+
+ }
+public:
+ virtual ui32 GetBlobsCount() const override {
+ return BlobBatch.GetBlobCount();
+ }
+ virtual ui32 GetTotalSize() const override {
+ return BlobBatch.GetTotalSize();
+ }
+ virtual bool NeedDraftTransaction() const override {
+ return false;
+ }
+
+ virtual TUnifiedBlobId AllocateNextBlobId(const TString& data) override {
+ return BlobBatch.AllocateNextBlobId(data);
+ }
+ virtual bool IsReady() const override {
+ return BlobBatch.AllBlobWritesCompleted();
+ }
+
+ TBSWriteAction(NColumnShard::IBlobManager& blobManager)
+ : BlobBatch(blobManager.StartBlobBatch())
+ {
+
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_action/ya.make b/ydb/core/tx/columnshard/blobs_action/ya.make
new file mode 100644
index 0000000000..0993365a18
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_action/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+ bs.cpp
+ blob_manager_db.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ contrib/libs/apache/arrow
+ ydb/core/tablet_flat
+ ydb/core/tx/tiering
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp
index b74cafade3..d5a1d740b1 100644
--- a/ydb/core/tx/columnshard/columnshard__export.cpp
+++ b/ydb/core/tx/columnshard/columnshard__export.cpp
@@ -1,6 +1,6 @@
#include "columnshard_impl.h"
-#include "blob_manager_db.h"
#include "columnshard_schema.h"
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/columnshard__forget.cpp b/ydb/core/tx/columnshard/columnshard__forget.cpp
index 4fe9466a24..9843849c7a 100644
--- a/ydb/core/tx/columnshard/columnshard__forget.cpp
+++ b/ydb/core/tx/columnshard/columnshard__forget.cpp
@@ -1,5 +1,5 @@
#include "columnshard_impl.h"
-#include "blob_manager_db.h"
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 392d294809..60f48f2216 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -2,7 +2,7 @@
#include "columnshard_ttl.h"
#include "columnshard_private_events.h"
#include "columnshard_schema.h"
-#include "blob_manager_db.h"
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/core/tx/columnshard/operations/write.h>
diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
index 08f291b55b..e5a04b48d2 100644
--- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
@@ -1,7 +1,7 @@
#include "columnshard_impl.h"
#include "columnshard_private_events.h"
#include "columnshard_schema.h"
-#include "blob_manager_db.h"
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 30d214fb4d..0e7b6e3b0b 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -1,8 +1,11 @@
#include "columnshard_impl.h"
#include "columnshard_schema.h"
-#include "blob_manager_db.h"
#include "blob_cache.h"
+#include "blobs_action/bs.h"
+#include "operations/slice_builder.h"
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
#include <ydb/core/tx/columnshard/operations/write.h>
#include <ydb/core/tx/columnshard/operations/write_data.h>
@@ -11,6 +14,31 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
+class TTxWriteDraft: public TTransactionBase<TColumnShard> {
+private:
+ const IWriteController::TPtr WriteController;
+public:
+ TTxWriteDraft(TColumnShard* self, const IWriteController::TPtr writeController)
+ : TBase(self)
+ , WriteController(writeController) {
+ }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override {
+ TBlobManagerDb blobManagerDb(txc.DB);
+ for (auto&& action : WriteController->GetBlobActions()) {
+ action->OnExecuteTxBeforeWrite(*Self, blobManagerDb);
+ }
+ return true;
+ }
+ void Complete(const TActorContext& ctx) override {
+ for (auto&& action : WriteController->GetBlobActions()) {
+ action->OnCompleteTxBeforeWrite(*Self);
+ }
+ ctx.Register(NColumnShard::CreateWriteActor(Self->TabletID(), WriteController, TInstant::Max()));
+ }
+ TTxType GetTxType() const override { return TXTYPE_WRITE_DRAFT; }
+};
+
class TTxWrite : public TTransactionBase<TColumnShard> {
public:
TTxWrite(TColumnShard* self, const TEvPrivate::TEvWriteBlobsResult::TPtr& putBlobResult)
@@ -40,8 +68,6 @@ private:
};
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId) {
- const TString data = blobData.GetBlobData();
-
const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta();
const auto& blobRange = blobData.GetBlobRange();
@@ -77,7 +103,6 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
// Put new data into blob cache
Y_VERIFY(blobRange.IsFullBlob());
- NBlobCache::AddRangeToCache(blobRange, blobData.GetBlobData());
Self->UpdateInsertTableCounters();
return true;
@@ -103,7 +128,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
}
TVector<TWriteId> writeIds;
- ui64 insertedBytes = 0;
for (auto blobData : PutBlobResult->Get()->GetBlobData()) {
auto writeId = TWriteId(writeMeta.GetWriteId());
if (operation) {
@@ -113,26 +137,16 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
}
- if (InsertOneBlob(txc, blobData, writeId)) {
- insertedBytes += blobData.GetLogicalMeta().GetRawBytes();
- } else {
+ if (!InsertOneBlob(txc, blobData, writeId)) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
writeIds.push_back(writeId);
}
- if (insertedBytes > 0) {
- TBlobManagerDb blobManagerDb(txc.DB);
- const auto& blobBatch(PutBlobResult->Get()->GetPutResult().GetBlobBatch());
- ui64 blobsWritten = blobBatch.GetBlobCount();
- ui64 bytesWritten = blobBatch.GetTotalSize();
- Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten);
- Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten);
- Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, insertedBytes);
- Self->IncCounter(COUNTER_WRITE_SUCCESS);
-
- Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb);
+ TBlobManagerDb blobManagerDb(txc.DB);
+ for (auto&& i : PutBlobResult->Get()->GetActions()) {
+ i->OnExecuteTxAfterWrite(*Self, blobManagerDb);
}
if (operation) {
@@ -234,7 +248,10 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
Execute(new TTxWrite(this, ev), ctx);
}
+}
+void TColumnShard::Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorContext& ctx) {
+ Execute(new TTxWriteDraft(this, ev->Get()->WriteController), ctx);
}
void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
@@ -304,9 +321,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< WritesMonitor.DebugString()
<< " at tablet " << TabletID());
- auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData);
- CSCounters.OnWritePutBlobsStart();
- ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize));
+ std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(), std::make_shared<NOlap::TBSWriteAction>(*BlobManager), writeData);
+ NConveyor::TCompServiceOperator::SendTaskToExecute(task);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 69360af9ce..bcc4b10592 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -1,10 +1,12 @@
#include "columnshard_impl.h"
#include "columnshard_private_events.h"
#include "columnshard_schema.h"
-#include "blob_manager_db.h"
#include "blob_cache.h"
+#include "blobs_action/bs.h"
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h>
+#include <library/cpp/actors/core/log.h>
namespace NKikimr::NColumnShard {
@@ -73,16 +75,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
NOlap::TWriteIndexContext context(txc, dbWrap);
changes->WriteIndex(*Self, context);
- if (Ev->Get()->PutResult->GetBlobBatch().GetBlobCount()) {
- Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->PutResult->ReleaseBlobBatch()), *context.BlobManagerDb);
- }
+ Ev->Get()->BlobsAction->OnExecuteTxAfterWrite(*Self, *context.BlobManagerDb);
Self->UpdateIndexCounters();
} else {
for (ui32 i = 0; i < changes->GetWritePortionsCount(); ++i) {
for (auto&& i : changes->GetWritePortionInfo(i)->GetPortionInfo().Records) {
- LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << i.BlobRange << ") blob cannot apply changes: "
- << TxSuffix());
+ LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << i.BlobRange << ") blob cannot apply changes: " << TxSuffix());
}
}
NOlap::TChangesFinishContext context("cannot write index blobs");
@@ -99,8 +98,8 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
CompleteReady = true;
LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix());
- const ui64 blobsWritten = Ev->Get()->PutResult->GetBlobBatch().GetBlobCount();
- const ui64 bytesWritten = Ev->Get()->PutResult->GetBlobBatch().GetTotalSize();
+ const ui64 blobsWritten = Ev->Get()->BlobsAction->GetBlobsCount();
+ const ui64 bytesWritten = Ev->Get()->BlobsAction->GetTotalSize();
if (!Ev->Get()->IndexChanges->IsAborted()) {
NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity);
@@ -114,6 +113,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
}
Self->UpdateResourceMetrics(ctx, Ev->Get()->PutResult->GetResourceUsage());
+ Ev->Get()->BlobsAction->OnCompleteTxAfterWrite(*Self);
}
void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
@@ -121,7 +121,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
if (putStatus == NKikimrProto::UNKNOWN) {
if (IsAnyChannelYellowStop()) {
- LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID());
+ ACFL_ERROR("event", "TEvWriteIndex failed")("reason", "channel yellow stop");
IncCounter(COUNTER_OUT_OF_SPACE);
ev->Get()->SetPutStatus(NKikimrProto::TRYLATER);
@@ -129,11 +129,11 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
ev->Get()->IndexChanges->Abort(*this, context);
ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
} else {
- LOG_S_DEBUG("WriteIndex (" << ev->Get()->IndexChanges->GetWritePortionsCount() << " portions) at tablet " << TabletID());
+ ACFL_DEBUG("event", "TEvWriteIndex")("count", ev->Get()->IndexChanges->GetWritePortionsCount());
+ AFL_VERIFY(ev->Get()->IndexChanges->GetWritePortionsCount());
- Y_VERIFY(ev->Get()->IndexChanges->GetWritePortionsCount());
- auto writeController = std::make_shared<NOlap::TCompactedWriteController>(ctx.SelfID, ev->Release(), Settings.BlobWriteGrouppingEnabled);
- ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize));
+ auto writeController = std::make_shared<NOlap::TCompactedWriteController>(ctx.SelfID, ev->Release(), Settings.BlobWriteGrouppingEnabled);
+ ctx.Register(CreateWriteActor(TabletID(), writeController, TInstant::Max()));
}
} else {
if (putStatus == NKikimrProto::OK) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index d06645841c..823992ef03 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -2,6 +2,7 @@
#include "columnshard_schema.h"
#include "blobs_reader/task.h"
#include "blobs_reader/events.h"
+#include "blobs_action/bs.h"
#include "engines/changes/ttl.h"
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tablet/tablet_counters_protobuf.h>
@@ -771,7 +772,7 @@ void TColumnShard::SetupIndexation() {
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
indexChanges->Start(*this);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
- Settings.CacheDataAfterIndexing);
+ Settings.CacheDataAfterIndexing, std::make_shared<NOlap::TBSWriteAction>(*BlobManager));
ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters)));
}
@@ -793,7 +794,7 @@ void TColumnShard::SetupCompaction() {
indexChanges->Start(*this);
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction);
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction, std::make_shared<NOlap::TBSWriteAction>(*BlobManager));
ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters)));
}
@@ -831,7 +832,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con
LOG_S_INFO("TTL" << (needWrites ? " with writes" : "" ) << " prepared at tablet " << TabletID());
indexChanges->Start(*this);
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false);
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false, std::make_shared<NOlap::TBSWriteAction>(*BlobManager));
if (needWrites) {
ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters)));
@@ -882,7 +883,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
}
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false);
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false, std::make_shared<NOlap::TBSWriteAction>(*BlobManager));
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
changes->Start(*this);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index d880f84b75..39e3847eb1 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -29,9 +29,8 @@ class TCleanupColumnEngineChanges;
class TTTLColumnEngineChanges;
class TChangesWithAppend;
class TCompactColumnEngineChanges;
-class TInGranuleCompactColumnEngineChanges;
-class TSplitCompactColumnEngineChanges;
class TInsertColumnEngineChanges;
+class TBSWriteAction;
namespace NCompaction {
class TGeneralCompactColumnEngineChanges;
}
@@ -43,7 +42,7 @@ class TOperationsManager;
extern bool gAllowLogBatchingDefaultValue;
-IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, TBlobBatch&& blobBatch, const TInstant& deadline, const ui64 maxSmallBlobSize);
+IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline);
IActor* CreateReadActor(ui64 tabletId,
const TActorId& dstActor,
std::unique_ptr<TEvColumnShard::TEvReadResult>&& event,
@@ -111,11 +110,10 @@ class TColumnShard
friend class NOlap::TTTLColumnEngineChanges;
friend class NOlap::TChangesWithAppend;
friend class NOlap::TCompactColumnEngineChanges;
- friend class NOlap::TInGranuleCompactColumnEngineChanges;
- friend class NOlap::TSplitCompactColumnEngineChanges;
friend class NOlap::TInsertColumnEngineChanges;
friend class NOlap::TColumnEngineChanges;
friend class NOlap::NCompaction::TGeneralCompactColumnEngineChanges;
+ friend class NOlap::TBSWriteAction;
friend class TTxController;
@@ -151,6 +149,7 @@ class TColumnShard
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorContext& ctx);
ITransaction* CreateTxInitSchema();
ITransaction* CreateTxRunGc();
@@ -262,6 +261,7 @@ protected:
HFunc(TEvPrivate::TEvReadFinished, Handle);
HFunc(TEvPrivate::TEvPeriodicWakeup, Handle);
HFunc(NEvents::TDataEvents::TEvWrite, Handle);
+ HFunc(TEvPrivate::TEvWriteDraft, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
LOG_S_WARN("TColumnShard.StateWork at " << TabletID()
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 8400bfc436..b95bca26b2 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -24,28 +24,39 @@ struct TEvPrivate {
EvGetExported,
EvWriteBlobsResult,
EvStartReadTask,
+ EvWriteDraft,
EvEnd
};
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+ struct TEvWriteDraft: public TEventLocal<TEvWriteDraft, EvWriteDraft> {
+ const std::shared_ptr<IWriteController> WriteController;
+ TEvWriteDraft(std::shared_ptr<IWriteController> controller)
+ : WriteController(controller)
+ {
+
+ }
+ };
+
/// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction.
struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
NOlap::TVersionedIndex IndexInfo;
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
bool GranuleCompaction{false};
- TBlobBatch BlobBatch;
TUsage ResourceUsage;
bool CacheData{false};
TDuration Duration;
TBlobPutResult::TPtr PutResult;
+ std::shared_ptr<NOlap::IBlobsAction> BlobsAction;
TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo,
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges,
- bool cacheData)
+ bool cacheData, std::shared_ptr<NOlap::IBlobsAction> action)
: IndexInfo(std::move(indexInfo))
, IndexChanges(indexChanges)
, CacheData(cacheData)
+ , BlobsAction(action)
{
PutResult = std::make_shared<TBlobPutResult>(NKikimrProto::UNKNOWN);
}
@@ -184,16 +195,14 @@ struct TEvPrivate {
public:
class TPutBlobData {
YDB_READONLY_DEF(TBlobRange, BlobRange);
- YDB_READONLY_DEF(TString, BlobData);
YDB_READONLY_DEF(NKikimrTxColumnShard::TLogicalMetadata, LogicalMeta);
YDB_ACCESSOR(ui64, RowsCount, 0);
YDB_ACCESSOR(ui64, RawBytes, 0);
public:
TPutBlobData() = default;
- TPutBlobData(const TBlobRange& blobRange, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime)
+ TPutBlobData(const TBlobRange& blobRange, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime)
: BlobRange(blobRange)
- , BlobData(data)
, RowsCount(rowsCount)
, RawBytes(rawBytes)
{
@@ -211,13 +220,18 @@ struct TEvPrivate {
Y_VERIFY(PutResult);
}
- TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const ui64 schemaVersion)
+ TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const std::vector<std::shared_ptr<NOlap::IBlobsAction>>& actions, const NEvWrite::TWriteMeta& writeMeta, const ui64 schemaVersion)
: TEvWriteBlobsResult(putResult, writeMeta)
{
+ Actions = actions;
BlobData = std::move(blobData);
SchemaVersion = schemaVersion;
}
+ const std::vector<std::shared_ptr<NOlap::IBlobsAction>>& GetActions() const {
+ return Actions;
+ }
+
const TVector<TPutBlobData>& GetBlobData() const {
return BlobData;
}
@@ -241,6 +255,7 @@ struct TEvPrivate {
private:
NColumnShard::TBlobPutResult::TPtr PutResult;
TVector<TPutBlobData> BlobData;
+ std::vector<std::shared_ptr<NOlap::IBlobsAction>> Actions;
NEvWrite::TWriteMeta WriteMeta;
ui64 SchemaVersion = 0;
};
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
index 66a6149034..27a459e465 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
@@ -1,6 +1,6 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <library/cpp/actors/core/actor.h>
diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
index d54eda0d93..d129a79f8f 100644
--- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
@@ -1,7 +1,7 @@
#include "cleanup.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 79e5c2906d..b7e366d1b4 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -3,7 +3,7 @@
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
-#include <ydb/core/tx/columnshard/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
namespace NKikimr::NOlap {
@@ -26,10 +26,10 @@ bool TInsertColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApp
void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
TBase::DoWriteIndex(self, context);
- for (const auto& indsertedData : DataToIndex) {
- self.InsertTable->EraseCommitted(context.DBWrapper, indsertedData);
- Y_VERIFY(indsertedData.GetBlobRange().IsFullBlob());
- self.BlobManager->DeleteBlob(indsertedData.GetBlobRange().GetBlobId(), *context.BlobManagerDb);
+ for (const auto& insertedData : DataToIndex) {
+ self.InsertTable->EraseCommitted(context.DBWrapper, insertedData);
+ Y_VERIFY(insertedData.GetBlobRange().IsFullBlob());
+ self.BlobManager->DeleteBlob(insertedData.GetBlobRange().GetBlobId(), *context.BlobManagerDb);
}
if (!DataToIndex.empty()) {
self.UpdateInsertTableCounters();
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index 9c30f0f4f4..ed8ff71be6 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -3,7 +3,7 @@
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
-#include <ydb/core/tx/columnshard/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 55a103b0cd..085a819d16 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -183,7 +183,7 @@ public:
const TGranuleMeta& GetGranuleVerified(const ui64 granuleId) const {
auto it = Granules.find(granuleId);
- Y_VERIFY(it != Granules.end());
+ AFL_VERIFY(it != Granules.end())("granule_id", granuleId)("count", Granules.size());
return *it->second;
}
diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt
index 778729ab5b..b16798a455 100644
--- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt
@@ -24,4 +24,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC
target_sources(columnshard-engines-writer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt
index fd95dc3520..d0bcc33578 100644
--- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt
@@ -25,4 +25,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC
target_sources(columnshard-engines-writer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt
index fd95dc3520..d0bcc33578 100644
--- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt
@@ -25,4 +25,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC
target_sources(columnshard-engines-writer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt
index 778729ab5b..b16798a455 100644
--- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt
@@ -24,4 +24,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC
target_sources(columnshard-engines-writer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp
new file mode 100644
index 0000000000..2fae90b9e7
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp
@@ -0,0 +1,5 @@
+#include "blob_constructor.h"
+
+namespace NKikimr::NOlap {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h
index de6f9c7b1f..cadaf2ca2d 100644
--- a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h
@@ -1,8 +1,10 @@
#pragma once
-#include <library/cpp/actors/core/event.h>
#include <ydb/core/protos/base.pb.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract.h>
+#include <ydb/library/accessor/accessor.h>
+#include <library/cpp/actors/core/event.h>
namespace NKikimr {
@@ -19,20 +21,22 @@ namespace NOlap {
class TUnifiedBlobId;
-class IBlobConstructor {
+class TBlobWriteInfo {
+private:
+ YDB_READONLY_DEF(TUnifiedBlobId, BlobId);
+ YDB_READONLY_DEF(TString, Data);
+ YDB_READONLY_DEF(std::shared_ptr<IBlobsAction>, WriteOperator);
+
+ TBlobWriteInfo(const TString& data, const std::shared_ptr<IBlobsAction>& writeOperator)
+ : Data(data)
+ , WriteOperator(writeOperator) {
+ Y_VERIFY(WriteOperator);
+ BlobId = WriteOperator->AllocateNextBlobId(data);
+ }
public:
- using TPtr = std::shared_ptr<IBlobConstructor>;
-
- enum class EStatus {
- Ok,
- Finished,
- Error
- };
-
- virtual ~IBlobConstructor() {}
- virtual const TString& GetBlob() const = 0;
- virtual bool RegisterBlobId(const TUnifiedBlobId& blobId) = 0;
- virtual EStatus BuildNext() = 0;
+ static TBlobWriteInfo BuildWriteTask(const TString& data, const std::shared_ptr<IBlobsAction>& writeOperator) {
+ return TBlobWriteInfo(data, writeOperator);
+ }
};
}
diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
index d3feb5cec3..974909dd4c 100644
--- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp
@@ -6,40 +6,29 @@
namespace NKikimr::NOlap {
-TCompactedWriteController::TBlobsConstructor::TBlobsConstructor(TCompactedWriteController& owner)
- : Owner(owner)
- , IndexChanges(*Owner.WriteIndexEv->IndexChanges)
-{
-}
-
-const TString& TCompactedWriteController::TBlobsConstructor::GetBlob() const {
- return CurrentBlobInfo->GetBlob();
-}
-
-bool TCompactedWriteController::TBlobsConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
- Y_VERIFY(CurrentBlobInfo);
- CurrentBlobInfo->RegisterBlobId(*IndexChanges.GetWritePortionInfo(CurrentPortion), blobId);
- return true;
-}
-
-IBlobConstructor::EStatus TCompactedWriteController::TBlobsConstructor::BuildNext() {
- while (CurrentPortion < IndexChanges.GetWritePortionsCount()) {
- TPortionInfoWithBlobs& portionWithBlobs = *IndexChanges.GetWritePortionInfo(CurrentPortion);
- if (CurrentBlobIndex < portionWithBlobs.GetBlobs().size() && IndexChanges.NeedWritePortion(CurrentPortion)) {
+std::optional<TBlobWriteInfo> TCompactedWriteController::Next() {
+ auto& changes = *WriteIndexEv->IndexChanges;
+ while (CurrentPortion < changes.GetWritePortionsCount()) {
+ auto* pInfo = changes.GetWritePortionInfo(CurrentPortion);
+ Y_VERIFY(pInfo);
+ TPortionInfoWithBlobs& portionWithBlobs = *pInfo;
+ if (CurrentBlobIndex < portionWithBlobs.GetBlobs().size() && changes.NeedWritePortion(CurrentPortion)) {
CurrentBlobInfo = &portionWithBlobs.GetBlobs()[CurrentBlobIndex];
++CurrentBlobIndex;
- return EStatus::Ok;
+ auto result = TBlobWriteInfo::BuildWriteTask(CurrentBlobInfo->GetBlob(), WriteIndexEv->BlobsAction);
+ CurrentBlobInfo->RegisterBlobId(portionWithBlobs, result.GetBlobId());
+ return result;
} else {
++CurrentPortion;
CurrentBlobIndex = 0;
}
}
- return EStatus::Finished;
+ return {};
}
TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeEv, bool /*blobGrouppingEnabled*/)
: WriteIndexEv(writeEv)
- , BlobConstructor(std::make_shared<TBlobsConstructor>(*this))
+ , Action(WriteIndexEv->BlobsAction)
, DstActor(dstActor)
{}
diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
index 2cb404d1bb..bfe1d986a3 100644
--- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h
@@ -10,25 +10,18 @@ namespace NKikimr::NOlap {
class TCompactedWriteController : public NColumnShard::IWriteController {
private:
- class TBlobsConstructor : public IBlobConstructor {
- TCompactedWriteController& Owner;
- NOlap::TColumnEngineChanges& IndexChanges;
-
- ui64 CurrentPortion = 0;
- ui64 CurrentBlobIndex = 0;
- TPortionInfoWithBlobs::TBlobInfo* CurrentBlobInfo = nullptr;
- public:
- TBlobsConstructor(TCompactedWriteController& owner);
- const TString& GetBlob() const override;
- bool RegisterBlobId(const TUnifiedBlobId& blobId) override;
- EStatus BuildNext() override;
- };
+ ui64 CurrentPortion = 0;
+ ui64 CurrentBlobIndex = 0;
+ TPortionInfoWithBlobs::TBlobInfo* CurrentBlobInfo = nullptr;
TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> WriteIndexEv;
- std::shared_ptr<TBlobsConstructor> BlobConstructor;
+ std::shared_ptr<IBlobsAction> Action;
TActorId DstActor;
protected:
void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
+ virtual bool IsBlobActionsReady() const override {
+ return Action->IsReady();
+ }
public:
TCompactedWriteController(const TActorId& dstActor, TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeEv, bool blobGrouppingEnabled);
~TCompactedWriteController() {
@@ -37,8 +30,14 @@ public:
}
}
- NOlap::IBlobConstructor::TPtr GetBlobConstructor() override {
- return BlobConstructor;
+ virtual std::vector<std::shared_ptr<IBlobsAction>> GetBlobActions() const override {
+ return {Action};
+ }
+
+ virtual std::optional<TBlobWriteInfo> Next() override;
+
+ virtual void OnBlobWriteResult(const TEvBlobStorage::TEvPutResult& result) override {
+ Action->OnBlobWriteResult(result.Id, result.Status);
}
};
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index acb8596d8e..c27df8fd4e 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -6,70 +6,30 @@
namespace NKikimr::NOlap {
-TIndexedWriteController::TBlobConstructor::TBlobConstructor(TIndexedWriteController& owner)
- : Owner(owner)
-{}
-
-const TString& TIndexedWriteController::TBlobConstructor::GetBlob() const {
- Y_VERIFY(CurrentIndex > 0);
- return BlobsSplitted[CurrentIndex - 1].GetData();
-}
-
-IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext() {
+std::optional<TBlobWriteInfo> TIndexedWriteController::Next() {
if (CurrentIndex == BlobsSplitted.size()) {
- return EStatus::Finished;
+ return {};
}
CurrentIndex++;
- return EStatus::Ok;
-}
-
-bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) {
- const auto& blobInfo = BlobsSplitted[CurrentIndex - 1];
- Owner.BlobData.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize()), blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now());
- return true;
+ auto& bInfo = BlobsSplitted[CurrentIndex - 1];
+ auto result = TBlobWriteInfo::BuildWriteTask(bInfo.GetData(), Action);
+ BlobData.emplace_back(TBlobRange(result.GetBlobId(), 0, result.GetBlobId().BlobSize()), bInfo.GetSpecialKeys(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now());
+ return result;
}
-bool TIndexedWriteController::TBlobConstructor::Init() {
- const auto& writeMeta = Owner.WriteData.GetWriteMeta();
- const ui64 tableId = writeMeta.GetTableId();
- const ui64 writeId = writeMeta.GetWriteId();
-
- std::shared_ptr<arrow::RecordBatch> batch;
- {
- NColumnShard::TCpuGuard guard(Owner.ResourceUsage);
- batch = Owner.WriteData.GetData().GetArrowBatch();
- }
-
- if (!batch) {
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId);
- return false;
- }
-
- auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize());
- if (!splitResult) {
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage());
- return false;
- }
- BlobsSplitted = splitResult.ReleaseResult();
- if (BlobsSplitted.size() > 1) {
- for (auto&& i : BlobsSplitted) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "strange_blobs_splitting")("blob", i.DebugString())("original_size", Owner.WriteData.GetSize());
- }
- }
- return true;
-}
-
-TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData)
- : WriteData(writeData)
- , BlobConstructor(std::make_shared<TBlobConstructor>(*this))
+TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr<IBlobsAction>& action, std::vector<NArrow::TSerializedBatch>&& blobsSplitted)
+ : BlobsSplitted(std::move(blobsSplitted))
+ , WriteData(writeData)
, DstActor(dstActor)
+ , Action(action)
{
ResourceUsage.SourceMemorySize = WriteData.GetSize();
}
void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) {
if (putResult->GetPutStatus() == NKikimrProto::OK) {
- auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion());
+ std::vector<std::shared_ptr<IBlobsAction>> actions = {Action};
+ auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), actions, WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion());
ctx.Send(DstActor, result.release());
} else {
auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta());
@@ -77,11 +37,4 @@ void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx,
}
}
-NOlap::IBlobConstructor::TPtr TIndexedWriteController::GetBlobConstructor() {
- if (!BlobConstructor->Init()) {
- return nullptr;
- }
- return BlobConstructor;
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
index 2800c58fc0..e2be02ece5 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
@@ -4,6 +4,7 @@
#include "write_controller.h"
#include <ydb/core/tx/ev_write/write_data.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract.h>
#include <ydb/core/tx/columnshard/engines/portion_info.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
@@ -14,31 +15,29 @@ namespace NKikimr::NOlap {
class TIndexedWriteController : public NColumnShard::IWriteController {
private:
- class TBlobConstructor : public IBlobConstructor {
- TIndexedWriteController& Owner;
- std::vector<NArrow::TSerializedBatch> BlobsSplitted;
-
- ui64 CurrentIndex = 0;
- public:
- TBlobConstructor(TIndexedWriteController& owner);
-
- const TString& GetBlob() const override;
- EStatus BuildNext() override;
- bool RegisterBlobId(const TUnifiedBlobId& blobId) override;
- bool Init();
- };
+ virtual bool IsBlobActionsReady() const override {
+ return Action->IsReady();
+ }
+ ui64 CurrentIndex = 0;
+ std::vector<NArrow::TSerializedBatch> BlobsSplitted;
NEvWrite::TWriteData WriteData;
- std::shared_ptr<TBlobConstructor> BlobConstructor;
TVector<NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData> BlobData;
TActorId DstActor;
-
+ std::shared_ptr<IBlobsAction> Action;
+ void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
public:
- TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData);
+ virtual std::vector<std::shared_ptr<IBlobsAction>> GetBlobActions() const override {
+ return {Action};
+ }
- void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override;
+ virtual void OnBlobWriteResult(const TEvBlobStorage::TEvPutResult& result) override {
+ Action->OnBlobWriteResult(result.Id, result.Status);
+ }
+
+ TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr<IBlobsAction>& action, std::vector<NArrow::TSerializedBatch>&& blobsSplitted);
- NOlap::IBlobConstructor::TPtr GetBlobConstructor() override;
+ virtual std::optional<TBlobWriteInfo> Next() override;
};
}
diff --git a/ydb/core/tx/columnshard/engines/writer/put_status.cpp b/ydb/core/tx/columnshard/engines/writer/put_status.cpp
new file mode 100644
index 0000000000..f5fb5de167
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/writer/put_status.cpp
@@ -0,0 +1,5 @@
+#include "put_status.h"
+
+namespace NKikimr::NColumnShard {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.cpp b/ydb/core/tx/columnshard/engines/writer/write_controller.cpp
new file mode 100644
index 0000000000..abae873b3c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/writer/write_controller.cpp
@@ -0,0 +1,5 @@
+#include "write_controller.h"
+
+namespace NKikimr::NColumnShard {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h
index 9636b2ebde..ff7730600a 100644
--- a/ydb/core/tx/columnshard/engines/writer/write_controller.h
+++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h
@@ -6,6 +6,7 @@
#include <library/cpp/actors/core/actor.h>
#include <ydb/core/tx/columnshard/blob_manager.h>
#include <ydb/core/tx/columnshard/defs.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract.h>
namespace NKikimr::NColumnShard {
@@ -15,12 +16,10 @@ public:
using TPtr = std::shared_ptr<TBlobPutResult>;
TBlobPutResult(NKikimrProto::EReplyStatus status,
- NColumnShard::TBlobBatch&& blobBatch,
THashSet<ui32>&& yellowMoveChannels,
THashSet<ui32>&& yellowStopChannels,
const NColumnShard::TUsage& resourceUsage)
- : BlobBatch(std::move(blobBatch))
- , ResourceUsage(resourceUsage)
+ : ResourceUsage(resourceUsage)
{
SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels));
}
@@ -29,10 +28,6 @@ public:
SetPutStatus(status);
}
- NColumnShard::TBlobBatch&& ReleaseBlobBatch() {
- return std::move(BlobBatch);
- }
-
void AddResources(const NColumnShard::TUsage& usage) {
ResourceUsage.Add(usage);
}
@@ -42,7 +37,6 @@ public:
}
private:
- YDB_READONLY_DEF(NColumnShard::TBlobBatch, BlobBatch);
YDB_READONLY_DEF(NColumnShard::TUsage, ResourceUsage);
};
@@ -56,8 +50,10 @@ public:
DoOnReadyResult(ctx, putResult);
}
- virtual NOlap::IBlobConstructor::TPtr GetBlobConstructor() = 0;
-
+ virtual void OnBlobWriteResult(const TEvBlobStorage::TEvPutResult& result) = 0;
+ virtual std::optional<NOlap::TBlobWriteInfo> Next() = 0;
+ virtual bool IsBlobActionsReady() const = 0;
+ virtual std::vector<std::shared_ptr<NOlap::IBlobsAction>> GetBlobActions() const = 0;
private:
virtual void DoOnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) = 0;
protected:
diff --git a/ydb/core/tx/columnshard/engines/writer/ya.make b/ydb/core/tx/columnshard/engines/writer/ya.make
index feba1368c7..0a611c956d 100644
--- a/ydb/core/tx/columnshard/engines/writer/ya.make
+++ b/ydb/core/tx/columnshard/engines/writer/ya.make
@@ -3,6 +3,9 @@ LIBRARY()
SRCS(
compacted_blob_constructor.cpp
indexed_blob_constructor.cpp
+ blob_constructor.cpp
+ put_status.cpp
+ write_controller.cpp
)
PEERDIR(
@@ -11,7 +14,7 @@ PEERDIR(
ydb/core/blobstorage/vdisk/protos
ydb/core/tablet_flat
ydb/core/formats/arrow
-
+
library/cpp/actors/core
)
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt
index 48f4f0d305..7972cccf59 100644
--- a/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt
@@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC
target_sources(tx-columnshard-operations PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp
)
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt
index 4dba1ab769..a97ebc42a1 100644
--- a/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt
@@ -19,4 +19,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC
target_sources(tx-columnshard-operations PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp
)
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt
index 4dba1ab769..a97ebc42a1 100644
--- a/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt
@@ -19,4 +19,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC
target_sources(tx-columnshard-operations PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp
)
diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt
index 48f4f0d305..7972cccf59 100644
--- a/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt
@@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC
target_sources(tx-columnshard-operations PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp
)
diff --git a/ydb/core/tx/columnshard/operations/slice_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder.cpp
new file mode 100644
index 0000000000..6988c621f1
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/slice_builder.cpp
@@ -0,0 +1,53 @@
+#include "slice_builder.h"
+#include <library/cpp/actors/core/log.h>
+#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+
+namespace NKikimr::NOlap {
+
+std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::BuildSlices() {
+ NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId));
+ const auto& writeMeta = WriteData.GetWriteMeta();
+ const ui64 tableId = writeMeta.GetTableId();
+ const ui64 writeId = writeMeta.GetWriteId();
+
+ std::shared_ptr<arrow::RecordBatch> batch = WriteData.GetData().GetArrowBatch();
+
+ if (!batch) {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId);
+ return {};
+ }
+
+ auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize());
+ if (!splitResult) {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage());
+ return {};
+ }
+ auto result = splitResult.ReleaseResult();
+ if (result.size() > 1) {
+ for (auto&& i : result) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "strange_blobs_splitting")("blob", i.DebugString())("original_size", WriteData.GetSize());
+ }
+ }
+ return result;
+}
+
+bool TBuildSlicesTask::DoExecute() {
+ auto batches = BuildSlices();
+ if (batches) {
+ auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ParentActorId, WriteData, Action, std::move(*batches));
+ if (batches && Action->NeedDraftTransaction()) {
+ TActorContext::AsActorContext().Send(ParentActorId, std::make_unique<NColumnShard::TEvPrivate::TEvWriteDraft>(writeController));
+ } else {
+ TActorContext::AsActorContext().Register(NColumnShard::CreateWriteActor(TabletId, writeController, TInstant::Max()));
+ }
+ } else {
+ auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(std::make_shared<NColumnShard::TBlobPutResult>(NKikimrProto::EReplyStatus::CORRUPTED), WriteData.GetWriteMeta());
+ TActorContext::AsActorContext().Send(ParentActorId, result.release());
+ }
+
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/operations/slice_builder.h b/ydb/core/tx/columnshard/operations/slice_builder.h
new file mode 100644
index 0000000000..fc2eea4ea3
--- /dev/null
+++ b/ydb/core/tx/columnshard/operations/slice_builder.h
@@ -0,0 +1,33 @@
+#pragma once
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract.h>
+#include <ydb/core/tx/ev_write/write_data.h>
+
+namespace NKikimr::NOlap {
+
+class TBuildSlicesTask: public NConveyor::ITask {
+private:
+ std::shared_ptr<IBlobsAction> Action;
+ NEvWrite::TWriteData WriteData;
+ const ui64 TabletId;
+ const NActors::TActorId ParentActorId;
+ std::optional<std::vector<NArrow::TSerializedBatch>> BuildSlices();
+
+protected:
+ virtual bool DoExecute() override;
+public:
+ virtual TString GetTaskClassIdentifier() const override {
+ return "Write::ConstructBlobs::Slices";
+ }
+
+ TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, const std::shared_ptr<IBlobsAction>& action, const NEvWrite::TWriteData& writeData)
+ : Action(action)
+ , WriteData(writeData)
+ , TabletId(tabletId)
+ , ParentActorId(parentActorId)
+ {
+ Y_VERIFY(Action);
+ }
+};
+}
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index 8a7850081a..57898279c9 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -1,9 +1,12 @@
#include "write.h"
+#include "slice_builder.h"
#include <ydb/core/tx/columnshard/columnshard_schema.h>
-#include <ydb/core/tx/columnshard/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
+#include <ydb/core/tx/columnshard/blobs_action/bs.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
+#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
@@ -22,8 +25,9 @@ namespace NKikimr::NColumnShard {
Y_VERIFY(Status == EOperationStatus::Draft);
NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source);
- auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, NEvWrite::TWriteData(writeMeta, data));
- ctx.Register(CreateWriteActor(owner.TabletID(), writeController, owner.BlobManager->StartBlobBatch(), TInstant::Max(), owner.Settings.MaxSmallBlobSize));
+ std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(owner.TabletID(), ctx.SelfID, std::make_shared<NOlap::TBSWriteAction>(*owner.BlobManager), NEvWrite::TWriteData(writeMeta, data));
+ NConveyor::TCompServiceOperator::SendTaskToExecute(task);
+
Status = EOperationStatus::Started;
}
diff --git a/ydb/core/tx/columnshard/operations/ya.make b/ydb/core/tx/columnshard/operations/ya.make
index cc7fd4d19c..ebc4e740c1 100644
--- a/ydb/core/tx/columnshard/operations/ya.make
+++ b/ydb/core/tx/columnshard/operations/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
write.cpp
write_data.cpp
+ slice_builder.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index 88096dc024..b9a97fb323 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -1,7 +1,7 @@
#include "tables_manager.h"
#include "columnshard_schema.h"
-#include "blob_manager_db.h"
#include "engines/column_engine_logs.h"
+#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/tiering/manager.h>
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index 636db1b338..c9a7c005bc 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -8,29 +8,21 @@ namespace NKikimr::NColumnShard {
namespace {
-class TWriteActor : public TActorBootstrapped<TWriteActor> {
+class TWriteActor : public TActorBootstrapped<TWriteActor>, public TMonitoringObjectsCounter<TWriteActor> {
ui64 TabletId;
TUsage ResourceUsage;
- TBlobBatch BlobBatch;
IWriteController::TPtr WriteController;
THashSet<ui32> YellowMoveChannels;
THashSet<ui32> YellowStopChannels;
TInstant Deadline;
- std::optional<ui64> MaxSmallBlobSize;
public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::TX_COLUMNSHARD_WRITE_ACTOR;
- }
-
- TWriteActor(ui64 tabletId, TBlobBatch&& blobBatch, IWriteController::TPtr writeController, const TInstant& deadline, std::optional<ui64> maxSmallBlobSize = {})
+ TWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline)
: TabletId(tabletId)
- , BlobBatch(std::move(blobBatch))
, WriteController(writeController)
, Deadline(deadline)
- , MaxSmallBlobSize(maxSmallBlobSize)
{}
void Handle(TEvBlobStorage::TEvPutResult::TPtr& ev, const TActorContext& ctx) {
@@ -50,9 +42,8 @@ public:
}
LOG_S_TRACE("TEvPutResult for blob " << msg->Id.ToString());
-
- BlobBatch.OnBlobWriteResult(ev);
- if (BlobBatch.AllBlobWritesCompleted()) {
+ WriteController->OnBlobWriteResult(*msg);
+ if (WriteController->IsBlobActionsReady()) {
return SendResultAndDie(ctx, NKikimrProto::OK);
}
}
@@ -72,7 +63,7 @@ public:
}
}
- auto putResult = std::make_shared<TBlobPutResult>(putStatus, std::move(BlobBatch),
+ auto putResult = std::make_shared<TBlobPutResult>(putStatus,
std::move(YellowMoveChannels),
std::move(YellowStopChannels),
ResourceUsage);
@@ -92,25 +83,12 @@ public:
ctx.Schedule(timeout, new TEvents::TEvWakeup());
}
- auto blobsConstructor = WriteController->GetBlobConstructor();
- if (!blobsConstructor) {
- return SendResultAndDie(ctx, NKikimrProto::CORRUPTED);
+ while (auto writeInfo = WriteController->Next()) {
+ ResourceUsage.Network += writeInfo->GetData().size();
+ writeInfo->GetWriteOperator()->SendWriteBlobRequest(writeInfo->GetData(), writeInfo->GetBlobId());
}
- auto status = NOlap::IBlobConstructor::EStatus::Finished;
- while (true) {
- status = blobsConstructor->BuildNext();
- if (status != NOlap::IBlobConstructor::EStatus::Ok) {
- break;
- }
- auto blobId = SendWriteBlobRequest(blobsConstructor->GetBlob(), ctx);
- blobsConstructor->RegisterBlobId(blobId);
- }
- if (status != NOlap::IBlobConstructor::EStatus::Finished) {
- return SendResultAndDie(ctx, NKikimrProto::CORRUPTED);
- }
-
- if (BlobBatch.AllBlobWritesCompleted()) {
+ if (WriteController->IsBlobActionsReady()) {
return SendResultAndDie(ctx, NKikimrProto::OK);
}
Become(&TThis::StateWait);
@@ -124,23 +102,12 @@ public:
break;
}
}
-
-private:
- TUnifiedBlobId SendWriteBlobRequest(const TString& data, const TActorContext& ctx) {
- ResourceUsage.Network += data.size();
- if (MaxSmallBlobSize && data.size() <= *MaxSmallBlobSize) {
- TUnifiedBlobId smallBlobId = BlobBatch.AddSmallBlob(data);
- Y_VERIFY(smallBlobId.IsSmallBlob());
- return smallBlobId;
- }
- return BlobBatch.SendWriteBlobRequest(data, Deadline, ctx);
- }
};
} // namespace
-IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, TBlobBatch&& blobBatch, const TInstant& deadline, const ui64 maxSmallBlobSize) {
- return new TWriteActor(tabletId, std::move(blobBatch), writeController, deadline, maxSmallBlobSize);
+IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline) {
+ return new TWriteActor(tabletId, writeController, deadline);
}
}
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index cbb9ca0a0d..6988f4b61c 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -5,7 +5,6 @@ SRCS(
blob.cpp
blob_cache.cpp
blob_manager.cpp
- blob_manager_db.cpp
blob_manager_txs.cpp
columnshard__export.cpp
columnshard__forget.cpp
@@ -57,6 +56,7 @@ PEERDIR(
ydb/core/tx/columnshard/splitter
ydb/core/tx/columnshard/operations
ydb/core/tx/columnshard/blobs_reader
+ ydb/core/tx/columnshard/blobs_action
ydb/core/tx/tiering
ydb/core/tx/conveyor/usage
ydb/core/tx/long_tx_service/public