diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-11 11:06:51 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-09-11 11:26:48 +0300 |
commit | 7e312058f4354a5d71b978f48e15e784021e125c (patch) | |
tree | 1a01829614e7b530266f66586f4c7aeaf75c0773 | |
parent | 4f1de009d5b8057a7b1e010724626a0364c368fc (diff) | |
download | ydb-7e312058f4354a5d71b978f48e15e784021e125c.tar.gz |
KIKIMR-19215: start separation blobs operations and metadata operations
59 files changed, 629 insertions, 282 deletions
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index 804ac16744..4bb74f05b8 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -174,4 +174,5 @@ enum ETxTypes { TXTYPE_READ_BLOB_RANGES = 12 [(TxTypeOpts) = {Name: "TxReadBlobRanges"}]; TXTYPE_EXPORT = 13 [(TxTypeOpts) = {Name: "TxExport"}]; TXTYPE_FORGET = 14 [(TxTypeOpts) = {Name: "TxForget"}]; + TXTYPE_WRITE_DRAFT = 15 [(TxTypeOpts) = {Name: "TxWriteDraft"}]; } diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index c0614e205c..46da987a57 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(blobs_action) add_subdirectory(blobs_reader) add_subdirectory(common) add_subdirectory(counters) @@ -53,6 +54,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-splitter tx-columnshard-operations tx-columnshard-blobs_reader + tx-columnshard-blobs_action core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -66,7 +68,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 67c260955f..4cf6358090 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(blobs_action) add_subdirectory(blobs_reader) add_subdirectory(common) add_subdirectory(counters) @@ -54,6 +55,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-splitter tx-columnshard-operations tx-columnshard-blobs_reader + tx-columnshard-blobs_action core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -67,7 +69,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 67c260955f..4cf6358090 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(blobs_action) add_subdirectory(blobs_reader) add_subdirectory(common) add_subdirectory(counters) @@ -54,6 +55,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-splitter tx-columnshard-operations tx-columnshard-blobs_reader + tx-columnshard-blobs_action core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -67,7 +69,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index c0614e205c..46da987a57 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(blobs_action) add_subdirectory(blobs_reader) add_subdirectory(common) add_subdirectory(counters) @@ -53,6 +54,7 @@ target_link_libraries(core-tx-columnshard PUBLIC tx-columnshard-splitter tx-columnshard-operations tx-columnshard-blobs_reader + tx-columnshard-blobs_action core-tx-tiering tx-conveyor-usage tx-long_tx_service-public @@ -66,7 +68,6 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index 056fb528d7..1bda48bfe1 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -1,8 +1,8 @@ #include "defs.h" #include "columnshard_impl.h" #include "blob_manager.h" -#include "blob_manager_db.h" #include "blob_cache.h" +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/base/blobstorage.h> @@ -91,22 +91,16 @@ void TBlobBatch::SendWriteRequest(const TActorContext& ctx, ui32 groupId, const SendPutToGroup(ctx, groupId, BatchInfo->TabletInfo.Get(), std::move(put), cookie); } -TUnifiedBlobId TBlobBatch::SendWriteBlobRequest(const TString& blobData, TInstant deadline, const TActorContext& ctx) { - Y_VERIFY(blobData.size() <= TLimits::GetBlobSizeLimit(), "Blob %" PRISZT" size exceeds the limit %" PRIu64, - blobData.size(), TLimits::GetBlobSizeLimit()); - - TUnifiedBlobId blobId = BatchInfo->NextBlobId(blobData.size()); - ui32 groupId = blobId.GetDsGroup(); +void TBlobBatch::SendWriteBlobRequest(const TString& blobData, const TUnifiedBlobId& blobId, TInstant deadline, const TActorContext& ctx) { + Y_VERIFY(blobData.size() <= TLimits::GetBlobSizeLimit(), "Blob %" PRISZT" size exceeds the limit %" PRIu64, blobData.size(), TLimits::GetBlobSizeLimit()); + const ui32 groupId = blobId.GetDsGroup(); SendWriteRequest(ctx, groupId, blobId.GetLogoBlobId(), blobData, 0, deadline); - - return blobId; } -void TBlobBatch::OnBlobWriteResult(TEvBlobStorage::TEvPutResult::TPtr& ev) { - TLogoBlobID blobId = ev->Get()->Id; +void TBlobBatch::OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) { BatchInfo->Counters.OnPutResult(blobId.BlobSize()); - Y_VERIFY(ev->Get()->Status == NKikimrProto::OK, "The caller must handle unsuccessful status"); + Y_VERIFY(status == NKikimrProto::OK, "The caller must handle unsuccessful status"); Y_VERIFY(BatchInfo); Y_VERIFY(BatchInfo->InFlight[blobId.Cookie()], "Blob %s is already acked!", blobId.ToString().c_str()); @@ -140,6 +134,10 @@ TUnifiedBlobId TBlobBatch::AddSmallBlob(const TString& data) { return BatchInfo->AddSmallBlob(data); } +TUnifiedBlobId TBlobBatch::AllocateNextBlobId(const TString& blobData) { + return BatchInfo->NextBlobId(blobData.size()); +} + TBlobManager::TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen) : TabletInfo(tabletInfo) , CurrentGen(gen) @@ -467,7 +465,7 @@ TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) { return TBlobBatch(std::move(batchInfo)); } -void TBlobManager::SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) { +void TBlobManager::DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) { Y_VERIFY(blobBatch.BatchInfo); ++CountersUpdate.BatchesCommitted; CountersUpdate.BlobsWritten += blobBatch.GetBlobCount(); diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index 9d4852565c..24777f3a2b 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -40,10 +40,12 @@ public: ~TBlobBatch(); // Write new blob as a part of this batch - TUnifiedBlobId SendWriteBlobRequest(const TString& blobData, TInstant deadline, const TActorContext& ctx); + void SendWriteBlobRequest(const TString& blobData, const TUnifiedBlobId& blobId, TInstant deadline, const TActorContext& ctx); + + TUnifiedBlobId AllocateNextBlobId(const TString& blobData); // Called with the result of WriteBlob request - void OnBlobWriteResult(TEvBlobStorage::TEvPutResult::TPtr& ev); + void OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status); // Tells if all WriteBlob requests got corresponding results bool AllBlobWritesCompleted() const; @@ -65,7 +67,7 @@ class IBlobManagerDb; class IBlobManager { protected: static constexpr ui32 BLOB_CHANNEL = 2; - + virtual void DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) = 0; public: virtual ~IBlobManager() = default; @@ -76,8 +78,13 @@ public: // This method is called in the same transaction in which the user saves references to blobs // in some LocalDB table. It tells the BlobManager that the blobs are becoming permanently saved. - // NOTE: At this point all blob writes must be already acknowleged. - virtual void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) = 0; + // NOTE: At this point all blob writes must be already acknowledged. + void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) { + if (blobBatch.GetBlobCount() == 0) { + return; + } + return DoSaveBlobBatch(std::move(blobBatch), db); + } // Deletes the blob that was previously permanently saved virtual void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0; @@ -207,6 +214,8 @@ private: std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> EvictedBlobs; std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> DroppedEvictedBlobs; + virtual void DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) override; + public: TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen); @@ -239,7 +248,6 @@ public: // Implementation of IBlobManager interface TBlobBatch StartBlobBatch(ui32 channel = BLOB_CHANNEL) override; - void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) override; void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) override; // Implementation of IBlobExporter diff --git a/ydb/core/tx/columnshard/blob_manager_txs.cpp b/ydb/core/tx/columnshard/blob_manager_txs.cpp index 8f1206d09c..f3548cff1c 100644 --- a/ydb/core/tx/columnshard/blob_manager_txs.cpp +++ b/ydb/core/tx/columnshard/blob_manager_txs.cpp @@ -1,7 +1,8 @@ #include "defs.h" #include "columnshard_impl.h" #include "blob_manager.h" -#include "blob_manager_db.h" + +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/base/blobstorage.h> diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..1558e932a7 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-blobs_action) +target_link_libraries(tx-columnshard-blobs_action PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + ydb-core-tablet_flat + core-tx-tiering +) +target_sources(tx-columnshard-blobs_action PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..bed25b4c44 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-blobs_action) +target_link_libraries(tx-columnshard-blobs_action PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + ydb-core-tablet_flat + core-tx-tiering +) +target_sources(tx-columnshard-blobs_action PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..bed25b4c44 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-blobs_action) +target_link_libraries(tx-columnshard-blobs_action PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + ydb-core-tablet_flat + core-tx-tiering +) +target_sources(tx-columnshard-blobs_action PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..1558e932a7 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-blobs_action) +target_link_libraries(tx-columnshard-blobs_action PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-protos + libs-apache-arrow + ydb-core-tablet_flat + core-tx-tiering +) +target_sources(tx-columnshard-blobs_action PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/bs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp +) diff --git a/ydb/core/tx/columnshard/blobs_action/abstract.cpp b/ydb/core/tx/columnshard/blobs_action/abstract.cpp new file mode 100644 index 0000000000..8b6b1715fc --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/abstract.cpp @@ -0,0 +1,5 @@ +#include "abstract.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/blobs_action/abstract.h b/ydb/core/tx/columnshard/blobs_action/abstract.h new file mode 100644 index 0000000000..49f7409b84 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/abstract.h @@ -0,0 +1,68 @@ +#pragma once +#include "blob_manager_db.h" + +#include <ydb/core/protos/base.pb.h> +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/counters/blobs_manager.h> + +#include <ydb/core/tablet_flat/flat_executor.h> +#include <ydb/core/util/backoff.h> +#include <ydb/core/protos/tx_columnshard.pb.h> + +#include <util/generic/string.h> + +namespace NKikimr::NOlap { + +using NOlap::TUnifiedBlobId; +using NOlap::TBlobRange; +using NOlap::TEvictedBlob; +using NOlap::EEvictState; +using NKikimrTxColumnShard::TEvictMetadata; + +class IBlobsAction { +protected: + virtual void DoOnExecuteTxBeforeWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) = 0; + virtual void DoOnCompleteTxBeforeWrite(NColumnShard::TColumnShard& self) = 0; + + virtual void DoSendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) = 0; + virtual void DoOnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) = 0; + + virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) = 0; + virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self) = 0; +public: + virtual ~IBlobsAction() = default; + virtual bool IsReady() const = 0; + + virtual TUnifiedBlobId AllocateNextBlobId(const TString& data) = 0; + + void OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) { + return DoOnBlobWriteResult(blobId, status); + } + + void OnExecuteTxBeforeWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) { + return DoOnExecuteTxBeforeWrite(self, dbBlobs); + } + + virtual ui32 GetBlobsCount() const = 0; + virtual ui32 GetTotalSize() const = 0; + + virtual bool NeedDraftTransaction() const = 0; + + void OnCompleteTxBeforeWrite(NColumnShard::TColumnShard& self) { + return DoOnCompleteTxBeforeWrite(self); + } + + void OnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) { + return DoOnExecuteTxAfterWrite(self, dbBlobs); + } + + void OnCompleteTxAfterWrite(NColumnShard::TColumnShard& self) { + return DoOnCompleteTxAfterWrite(self); + } + + void SendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) { + return DoSendWriteBlobRequest(data, blobId); + } +}; + +} diff --git a/ydb/core/tx/columnshard/blob_manager_db.cpp b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp index c7a8b90bd5..2920654f51 100644 --- a/ydb/core/tx/columnshard/blob_manager_db.cpp +++ b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp @@ -1,6 +1,5 @@ #include "blob_manager_db.h" -#include "blob_manager.h" -#include "columnshard_schema.h" +#include <ydb/core/tx/columnshard/columnshard_schema.h> namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/blob_manager_db.h b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h index e840c0d336..2c807387c1 100644 --- a/ydb/core/tx/columnshard/blob_manager_db.h +++ b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h @@ -1,7 +1,6 @@ #pragma once -#include "defs.h" - -#include "blob_manager.h" +#include <ydb/core/tx/columnshard/defs.h> +#include <ydb/core/tx/columnshard/blob_manager.h> namespace NKikimr::NTable { class TDatabase; diff --git a/ydb/core/tx/columnshard/blobs_action/bs.cpp b/ydb/core/tx/columnshard/blobs_action/bs.cpp new file mode 100644 index 0000000000..ac99515955 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/bs.cpp @@ -0,0 +1,16 @@ +#include "bs.h" +#include <ydb/core/tx/columnshard/columnshard_impl.h> + +namespace NKikimr::NOlap { + +void TBSWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) { + ui64 blobsWritten = BlobBatch.GetBlobCount(); + ui64 bytesWritten = BlobBatch.GetTotalSize(); + self.IncCounter(NColumnShard::COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); + self.IncCounter(NColumnShard::COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); +// self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_UPSERTED, insertedBytes); + self.IncCounter(NColumnShard::COUNTER_WRITE_SUCCESS); + self.BlobManager->SaveBlobBatch(std::move(BlobBatch), dbBlobs); +} + +} diff --git a/ydb/core/tx/columnshard/blobs_action/bs.h b/ydb/core/tx/columnshard/blobs_action/bs.h new file mode 100644 index 0000000000..6a53117a57 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/bs.h @@ -0,0 +1,57 @@ +#pragma once + +#include "abstract.h" +#include <ydb/core/tx/columnshard/blob_manager.h> + +namespace NKikimr::NOlap { + +class TBSWriteAction: public IBlobsAction { +private: + NColumnShard::TBlobBatch BlobBatch; +protected: + virtual void DoSendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) override { + return BlobBatch.SendWriteBlobRequest(data, blobId, TInstant::Max(), TActorContext::AsActorContext()); + } + + virtual void DoOnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto::EReplyStatus status) override { + return BlobBatch.OnBlobWriteResult(blobId, status); + } + + virtual void DoOnExecuteTxBeforeWrite(NColumnShard::TColumnShard& /*self*/, NColumnShard::TBlobManagerDb& /*dbBlobs*/) override { + return; + } + + virtual void DoOnCompleteTxBeforeWrite(NColumnShard::TColumnShard& /*self*/) override { + return; + } + + virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs) override; + virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/) override { + + } +public: + virtual ui32 GetBlobsCount() const override { + return BlobBatch.GetBlobCount(); + } + virtual ui32 GetTotalSize() const override { + return BlobBatch.GetTotalSize(); + } + virtual bool NeedDraftTransaction() const override { + return false; + } + + virtual TUnifiedBlobId AllocateNextBlobId(const TString& data) override { + return BlobBatch.AllocateNextBlobId(data); + } + virtual bool IsReady() const override { + return BlobBatch.AllBlobWritesCompleted(); + } + + TBSWriteAction(NColumnShard::IBlobManager& blobManager) + : BlobBatch(blobManager.StartBlobBatch()) + { + + } +}; + +} diff --git a/ydb/core/tx/columnshard/blobs_action/ya.make b/ydb/core/tx/columnshard/blobs_action/ya.make new file mode 100644 index 0000000000..0993365a18 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_action/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + abstract.cpp + bs.cpp + blob_manager_db.cpp +) + +PEERDIR( + ydb/core/protos + contrib/libs/apache/arrow + ydb/core/tablet_flat + ydb/core/tx/tiering +) + +END() diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp index b74cafade3..d5a1d740b1 100644 --- a/ydb/core/tx/columnshard/columnshard__export.cpp +++ b/ydb/core/tx/columnshard/columnshard__export.cpp @@ -1,6 +1,6 @@ #include "columnshard_impl.h" -#include "blob_manager_db.h" #include "columnshard_schema.h" +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/columnshard__forget.cpp b/ydb/core/tx/columnshard/columnshard__forget.cpp index 4fe9466a24..9843849c7a 100644 --- a/ydb/core/tx/columnshard/columnshard__forget.cpp +++ b/ydb/core/tx/columnshard/columnshard__forget.cpp @@ -1,5 +1,5 @@ #include "columnshard_impl.h" -#include "blob_manager_db.h" +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 392d294809..60f48f2216 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -2,7 +2,7 @@ #include "columnshard_ttl.h" #include "columnshard_private_events.h" #include "columnshard_schema.h" -#include "blob_manager_db.h" +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/tablet/tablet_exception.h> #include <ydb/core/tx/columnshard/operations/write.h> diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp index 08f291b55b..e5a04b48d2 100644 --- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp @@ -1,7 +1,7 @@ #include "columnshard_impl.h" #include "columnshard_private_events.h" #include "columnshard_schema.h" -#include "blob_manager_db.h" +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> namespace NKikimr::NColumnShard { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 30d214fb4d..0e7b6e3b0b 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -1,8 +1,11 @@ #include "columnshard_impl.h" #include "columnshard_schema.h" -#include "blob_manager_db.h" #include "blob_cache.h" +#include "blobs_action/bs.h" +#include "operations/slice_builder.h" +#include <ydb/core/tx/conveyor/usage/service.h> +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h> #include <ydb/core/tx/columnshard/operations/write.h> #include <ydb/core/tx/columnshard/operations/write_data.h> @@ -11,6 +14,31 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; +class TTxWriteDraft: public TTransactionBase<TColumnShard> { +private: + const IWriteController::TPtr WriteController; +public: + TTxWriteDraft(TColumnShard* self, const IWriteController::TPtr writeController) + : TBase(self) + , WriteController(writeController) { + } + + bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override { + TBlobManagerDb blobManagerDb(txc.DB); + for (auto&& action : WriteController->GetBlobActions()) { + action->OnExecuteTxBeforeWrite(*Self, blobManagerDb); + } + return true; + } + void Complete(const TActorContext& ctx) override { + for (auto&& action : WriteController->GetBlobActions()) { + action->OnCompleteTxBeforeWrite(*Self); + } + ctx.Register(NColumnShard::CreateWriteActor(Self->TabletID(), WriteController, TInstant::Max())); + } + TTxType GetTxType() const override { return TXTYPE_WRITE_DRAFT; } +}; + class TTxWrite : public TTransactionBase<TColumnShard> { public: TTxWrite(TColumnShard* self, const TEvPrivate::TEvWriteBlobsResult::TPtr& putBlobResult) @@ -40,8 +68,6 @@ private: }; bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId) { - const TString data = blobData.GetBlobData(); - const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta(); const auto& blobRange = blobData.GetBlobRange(); @@ -77,7 +103,6 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit // Put new data into blob cache Y_VERIFY(blobRange.IsFullBlob()); - NBlobCache::AddRangeToCache(blobRange, blobData.GetBlobData()); Self->UpdateInsertTableCounters(); return true; @@ -103,7 +128,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { } TVector<TWriteId> writeIds; - ui64 insertedBytes = 0; for (auto blobData : PutBlobResult->Get()->GetBlobData()) { auto writeId = TWriteId(writeMeta.GetWriteId()); if (operation) { @@ -113,26 +137,16 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId()); } - if (InsertOneBlob(txc, blobData, writeId)) { - insertedBytes += blobData.GetLogicalMeta().GetRawBytes(); - } else { + if (!InsertOneBlob(txc, blobData, writeId)) { LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); Self->IncCounter(COUNTER_WRITE_DUPLICATE); } writeIds.push_back(writeId); } - if (insertedBytes > 0) { - TBlobManagerDb blobManagerDb(txc.DB); - const auto& blobBatch(PutBlobResult->Get()->GetPutResult().GetBlobBatch()); - ui64 blobsWritten = blobBatch.GetBlobCount(); - ui64 bytesWritten = blobBatch.GetTotalSize(); - Self->IncCounter(COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); - Self->IncCounter(COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); - Self->IncCounter(COUNTER_RAW_BYTES_UPSERTED, insertedBytes); - Self->IncCounter(COUNTER_WRITE_SUCCESS); - - Self->BlobManager->SaveBlobBatch((std::move(PutBlobResult->Get()->GetPutResultPtr()->ReleaseBlobBatch())), blobManagerDb); + TBlobManagerDb blobManagerDb(txc.DB); + for (auto&& i : PutBlobResult->Get()->GetActions()) { + i->OnExecuteTxAfterWrite(*Self, blobManagerDb); } if (operation) { @@ -234,7 +248,10 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo Execute(new TTxWrite(this, ev), ctx); } +} +void TColumnShard::Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxWriteDraft(this, ev->Get()->WriteController), ctx); } void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { @@ -304,9 +321,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex << WritesMonitor.DebugString() << " at tablet " << TabletID()); - auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData); - CSCounters.OnWritePutBlobsStart(); - ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize)); + std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(), std::make_shared<NOlap::TBSWriteAction>(*BlobManager), writeData); + NConveyor::TCompServiceOperator::SendTaskToExecute(task); } } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 69360af9ce..bcc4b10592 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -1,10 +1,12 @@ #include "columnshard_impl.h" #include "columnshard_private_events.h" #include "columnshard_schema.h" -#include "blob_manager_db.h" #include "blob_cache.h" +#include "blobs_action/bs.h" +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h> +#include <library/cpp/actors/core/log.h> namespace NKikimr::NColumnShard { @@ -73,16 +75,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) NOlap::TWriteIndexContext context(txc, dbWrap); changes->WriteIndex(*Self, context); - if (Ev->Get()->PutResult->GetBlobBatch().GetBlobCount()) { - Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->PutResult->ReleaseBlobBatch()), *context.BlobManagerDb); - } + Ev->Get()->BlobsAction->OnExecuteTxAfterWrite(*Self, *context.BlobManagerDb); Self->UpdateIndexCounters(); } else { for (ui32 i = 0; i < changes->GetWritePortionsCount(); ++i) { for (auto&& i : changes->GetWritePortionInfo(i)->GetPortionInfo().Records) { - LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << i.BlobRange << ") blob cannot apply changes: " - << TxSuffix()); + LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << i.BlobRange << ") blob cannot apply changes: " << TxSuffix()); } } NOlap::TChangesFinishContext context("cannot write index blobs"); @@ -99,8 +98,8 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { CompleteReady = true; LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix()); - const ui64 blobsWritten = Ev->Get()->PutResult->GetBlobBatch().GetBlobCount(); - const ui64 bytesWritten = Ev->Get()->PutResult->GetBlobBatch().GetTotalSize(); + const ui64 blobsWritten = Ev->Get()->BlobsAction->GetBlobsCount(); + const ui64 bytesWritten = Ev->Get()->BlobsAction->GetTotalSize(); if (!Ev->Get()->IndexChanges->IsAborted()) { NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity); @@ -114,6 +113,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { } Self->UpdateResourceMetrics(ctx, Ev->Get()->PutResult->GetResourceUsage()); + Ev->Get()->BlobsAction->OnCompleteTxAfterWrite(*Self); } void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) { @@ -121,7 +121,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte if (putStatus == NKikimrProto::UNKNOWN) { if (IsAnyChannelYellowStop()) { - LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID()); + ACFL_ERROR("event", "TEvWriteIndex failed")("reason", "channel yellow stop"); IncCounter(COUNTER_OUT_OF_SPACE); ev->Get()->SetPutStatus(NKikimrProto::TRYLATER); @@ -129,11 +129,11 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte ev->Get()->IndexChanges->Abort(*this, context); ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true)); } else { - LOG_S_DEBUG("WriteIndex (" << ev->Get()->IndexChanges->GetWritePortionsCount() << " portions) at tablet " << TabletID()); + ACFL_DEBUG("event", "TEvWriteIndex")("count", ev->Get()->IndexChanges->GetWritePortionsCount()); + AFL_VERIFY(ev->Get()->IndexChanges->GetWritePortionsCount()); - Y_VERIFY(ev->Get()->IndexChanges->GetWritePortionsCount()); - auto writeController = std::make_shared<NOlap::TCompactedWriteController>(ctx.SelfID, ev->Release(), Settings.BlobWriteGrouppingEnabled); - ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize)); + auto writeController = std::make_shared<NOlap::TCompactedWriteController>(ctx.SelfID, ev->Release(), Settings.BlobWriteGrouppingEnabled); + ctx.Register(CreateWriteActor(TabletID(), writeController, TInstant::Max())); } } else { if (putStatus == NKikimrProto::OK) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index d06645841c..823992ef03 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -2,6 +2,7 @@ #include "columnshard_schema.h" #include "blobs_reader/task.h" #include "blobs_reader/events.h" +#include "blobs_action/bs.h" #include "engines/changes/ttl.h" #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tablet/tablet_counters_protobuf.h> @@ -771,7 +772,7 @@ void TColumnShard::SetupIndexation() { auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); indexChanges->Start(*this); auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, - Settings.CacheDataAfterIndexing); + Settings.CacheDataAfterIndexing, std::make_shared<NOlap::TBSWriteAction>(*BlobManager)); ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), IndexationCounters))); } @@ -793,7 +794,7 @@ void TColumnShard::SetupCompaction() { indexChanges->Start(*this); auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction); + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction, std::make_shared<NOlap::TBSWriteAction>(*BlobManager)); ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters))); } @@ -831,7 +832,7 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con LOG_S_INFO("TTL" << (needWrites ? " with writes" : "" ) << " prepared at tablet " << TabletID()); indexChanges->Start(*this); - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false); + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, false, std::make_shared<NOlap::TBSWriteAction>(*BlobManager)); if (needWrites) { ActorContext().Send(BlobsReadActor, std::make_unique<NOlap::NBlobOperations::NRead::TEvStartReadTask>(std::make_unique<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters))); @@ -882,7 +883,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { } auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); - auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false); + auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), changes, false, std::make_shared<NOlap::TBSWriteAction>(*BlobManager)); ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write changes->Start(*this); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index d880f84b75..39e3847eb1 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -29,9 +29,8 @@ class TCleanupColumnEngineChanges; class TTTLColumnEngineChanges; class TChangesWithAppend; class TCompactColumnEngineChanges; -class TInGranuleCompactColumnEngineChanges; -class TSplitCompactColumnEngineChanges; class TInsertColumnEngineChanges; +class TBSWriteAction; namespace NCompaction { class TGeneralCompactColumnEngineChanges; } @@ -43,7 +42,7 @@ class TOperationsManager; extern bool gAllowLogBatchingDefaultValue; -IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, TBlobBatch&& blobBatch, const TInstant& deadline, const ui64 maxSmallBlobSize); +IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline); IActor* CreateReadActor(ui64 tabletId, const TActorId& dstActor, std::unique_ptr<TEvColumnShard::TEvReadResult>&& event, @@ -111,11 +110,10 @@ class TColumnShard friend class NOlap::TTTLColumnEngineChanges; friend class NOlap::TChangesWithAppend; friend class NOlap::TCompactColumnEngineChanges; - friend class NOlap::TInGranuleCompactColumnEngineChanges; - friend class NOlap::TSplitCompactColumnEngineChanges; friend class NOlap::TInsertColumnEngineChanges; friend class NOlap::TColumnEngineChanges; friend class NOlap::NCompaction::TGeneralCompactColumnEngineChanges; + friend class NOlap::TBSWriteAction; friend class TTxController; @@ -151,6 +149,7 @@ class TColumnShard void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev); void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorContext& ctx); ITransaction* CreateTxInitSchema(); ITransaction* CreateTxRunGc(); @@ -262,6 +261,7 @@ protected: HFunc(TEvPrivate::TEvReadFinished, Handle); HFunc(TEvPrivate::TEvPeriodicWakeup, Handle); HFunc(NEvents::TDataEvents::TEvWrite, Handle); + HFunc(TEvPrivate::TEvWriteDraft, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { LOG_S_WARN("TColumnShard.StateWork at " << TabletID() diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 8400bfc436..b95bca26b2 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -24,28 +24,39 @@ struct TEvPrivate { EvGetExported, EvWriteBlobsResult, EvStartReadTask, + EvWriteDraft, EvEnd }; static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + struct TEvWriteDraft: public TEventLocal<TEvWriteDraft, EvWriteDraft> { + const std::shared_ptr<IWriteController> WriteController; + TEvWriteDraft(std::shared_ptr<IWriteController> controller) + : WriteController(controller) + { + + } + }; + /// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction. struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> { NOlap::TVersionedIndex IndexInfo; std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges; bool GranuleCompaction{false}; - TBlobBatch BlobBatch; TUsage ResourceUsage; bool CacheData{false}; TDuration Duration; TBlobPutResult::TPtr PutResult; + std::shared_ptr<NOlap::IBlobsAction> BlobsAction; TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo, std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges, - bool cacheData) + bool cacheData, std::shared_ptr<NOlap::IBlobsAction> action) : IndexInfo(std::move(indexInfo)) , IndexChanges(indexChanges) , CacheData(cacheData) + , BlobsAction(action) { PutResult = std::make_shared<TBlobPutResult>(NKikimrProto::UNKNOWN); } @@ -184,16 +195,14 @@ struct TEvPrivate { public: class TPutBlobData { YDB_READONLY_DEF(TBlobRange, BlobRange); - YDB_READONLY_DEF(TString, BlobData); YDB_READONLY_DEF(NKikimrTxColumnShard::TLogicalMetadata, LogicalMeta); YDB_ACCESSOR(ui64, RowsCount, 0); YDB_ACCESSOR(ui64, RawBytes, 0); public: TPutBlobData() = default; - TPutBlobData(const TBlobRange& blobRange, const TString& data, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) + TPutBlobData(const TBlobRange& blobRange, const NArrow::TFirstLastSpecialKeys& specialKeys, ui64 rowsCount, ui64 rawBytes, const TInstant dirtyTime) : BlobRange(blobRange) - , BlobData(data) , RowsCount(rowsCount) , RawBytes(rawBytes) { @@ -211,13 +220,18 @@ struct TEvPrivate { Y_VERIFY(PutResult); } - TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const NEvWrite::TWriteMeta& writeMeta, const ui64 schemaVersion) + TEvWriteBlobsResult(const NColumnShard::TBlobPutResult::TPtr& putResult, TVector<TPutBlobData>&& blobData, const std::vector<std::shared_ptr<NOlap::IBlobsAction>>& actions, const NEvWrite::TWriteMeta& writeMeta, const ui64 schemaVersion) : TEvWriteBlobsResult(putResult, writeMeta) { + Actions = actions; BlobData = std::move(blobData); SchemaVersion = schemaVersion; } + const std::vector<std::shared_ptr<NOlap::IBlobsAction>>& GetActions() const { + return Actions; + } + const TVector<TPutBlobData>& GetBlobData() const { return BlobData; } @@ -241,6 +255,7 @@ struct TEvPrivate { private: NColumnShard::TBlobPutResult::TPtr PutResult; TVector<TPutBlobData> BlobData; + std::vector<std::shared_ptr<NOlap::IBlobsAction>> Actions; NEvWrite::TWriteMeta WriteMeta; ui64 SchemaVersion = 0; }; diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp index 66a6149034..27a459e465 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp @@ -1,6 +1,6 @@ #include "abstract.h" #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/blob_manager_db.h> +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <library/cpp/actors/core/actor.h> diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp index d54eda0d93..d129a79f8f 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.cpp +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.cpp @@ -1,7 +1,7 @@ #include "cleanup.h" #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/blob_manager_db.h> +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 79e5c2906d..b7e366d1b4 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -3,7 +3,7 @@ #include <ydb/core/tx/columnshard/blob_cache.h> #include <ydb/core/protos/counters_columnshard.pb.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> -#include <ydb/core/tx/columnshard/blob_manager_db.h> +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> namespace NKikimr::NOlap { @@ -26,10 +26,10 @@ bool TInsertColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApp void TInsertColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { TBase::DoWriteIndex(self, context); - for (const auto& indsertedData : DataToIndex) { - self.InsertTable->EraseCommitted(context.DBWrapper, indsertedData); - Y_VERIFY(indsertedData.GetBlobRange().IsFullBlob()); - self.BlobManager->DeleteBlob(indsertedData.GetBlobRange().GetBlobId(), *context.BlobManagerDb); + for (const auto& insertedData : DataToIndex) { + self.InsertTable->EraseCommitted(context.DBWrapper, insertedData); + Y_VERIFY(insertedData.GetBlobRange().IsFullBlob()); + self.BlobManager->DeleteBlob(insertedData.GetBlobRange().GetBlobId(), *context.BlobManagerDb); } if (!DataToIndex.empty()) { self.UpdateInsertTableCounters(); diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index 9c30f0f4f4..ed8ff71be6 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -3,7 +3,7 @@ #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/core/tx/columnshard/columnshard_schema.h> #include <ydb/core/tx/columnshard/columnshard_private_events.h> -#include <ydb/core/tx/columnshard/blob_manager_db.h> +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 55a103b0cd..085a819d16 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -183,7 +183,7 @@ public: const TGranuleMeta& GetGranuleVerified(const ui64 granuleId) const { auto it = Granules.find(granuleId); - Y_VERIFY(it != Granules.end()); + AFL_VERIFY(it != Granules.end())("granule_id", granuleId)("count", Granules.size()); return *it->second; } diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt index 778729ab5b..b16798a455 100644 --- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.darwin-x86_64.txt @@ -24,4 +24,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC target_sources(columnshard-engines-writer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp ) diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt index fd95dc3520..d0bcc33578 100644 --- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-aarch64.txt @@ -25,4 +25,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC target_sources(columnshard-engines-writer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp ) diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt index fd95dc3520..d0bcc33578 100644 --- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.linux-x86_64.txt @@ -25,4 +25,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC target_sources(columnshard-engines-writer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp ) diff --git a/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt index 778729ab5b..b16798a455 100644 --- a/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/writer/CMakeLists.windows-x86_64.txt @@ -24,4 +24,7 @@ target_link_libraries(columnshard-engines-writer PUBLIC target_sources(columnshard-engines-writer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/put_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/writer/write_controller.cpp ) diff --git a/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp new file mode 100644 index 0000000000..2fae90b9e7 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/blob_constructor.cpp @@ -0,0 +1,5 @@ +#include "blob_constructor.h" + +namespace NKikimr::NOlap { + +} diff --git a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h index de6f9c7b1f..cadaf2ca2d 100644 --- a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h @@ -1,8 +1,10 @@ #pragma once -#include <library/cpp/actors/core/event.h> #include <ydb/core/protos/base.pb.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract.h> +#include <ydb/library/accessor/accessor.h> +#include <library/cpp/actors/core/event.h> namespace NKikimr { @@ -19,20 +21,22 @@ namespace NOlap { class TUnifiedBlobId; -class IBlobConstructor { +class TBlobWriteInfo { +private: + YDB_READONLY_DEF(TUnifiedBlobId, BlobId); + YDB_READONLY_DEF(TString, Data); + YDB_READONLY_DEF(std::shared_ptr<IBlobsAction>, WriteOperator); + + TBlobWriteInfo(const TString& data, const std::shared_ptr<IBlobsAction>& writeOperator) + : Data(data) + , WriteOperator(writeOperator) { + Y_VERIFY(WriteOperator); + BlobId = WriteOperator->AllocateNextBlobId(data); + } public: - using TPtr = std::shared_ptr<IBlobConstructor>; - - enum class EStatus { - Ok, - Finished, - Error - }; - - virtual ~IBlobConstructor() {} - virtual const TString& GetBlob() const = 0; - virtual bool RegisterBlobId(const TUnifiedBlobId& blobId) = 0; - virtual EStatus BuildNext() = 0; + static TBlobWriteInfo BuildWriteTask(const TString& data, const std::shared_ptr<IBlobsAction>& writeOperator) { + return TBlobWriteInfo(data, writeOperator); + } }; } diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp index d3feb5cec3..974909dd4c 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp @@ -6,40 +6,29 @@ namespace NKikimr::NOlap { -TCompactedWriteController::TBlobsConstructor::TBlobsConstructor(TCompactedWriteController& owner) - : Owner(owner) - , IndexChanges(*Owner.WriteIndexEv->IndexChanges) -{ -} - -const TString& TCompactedWriteController::TBlobsConstructor::GetBlob() const { - return CurrentBlobInfo->GetBlob(); -} - -bool TCompactedWriteController::TBlobsConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { - Y_VERIFY(CurrentBlobInfo); - CurrentBlobInfo->RegisterBlobId(*IndexChanges.GetWritePortionInfo(CurrentPortion), blobId); - return true; -} - -IBlobConstructor::EStatus TCompactedWriteController::TBlobsConstructor::BuildNext() { - while (CurrentPortion < IndexChanges.GetWritePortionsCount()) { - TPortionInfoWithBlobs& portionWithBlobs = *IndexChanges.GetWritePortionInfo(CurrentPortion); - if (CurrentBlobIndex < portionWithBlobs.GetBlobs().size() && IndexChanges.NeedWritePortion(CurrentPortion)) { +std::optional<TBlobWriteInfo> TCompactedWriteController::Next() { + auto& changes = *WriteIndexEv->IndexChanges; + while (CurrentPortion < changes.GetWritePortionsCount()) { + auto* pInfo = changes.GetWritePortionInfo(CurrentPortion); + Y_VERIFY(pInfo); + TPortionInfoWithBlobs& portionWithBlobs = *pInfo; + if (CurrentBlobIndex < portionWithBlobs.GetBlobs().size() && changes.NeedWritePortion(CurrentPortion)) { CurrentBlobInfo = &portionWithBlobs.GetBlobs()[CurrentBlobIndex]; ++CurrentBlobIndex; - return EStatus::Ok; + auto result = TBlobWriteInfo::BuildWriteTask(CurrentBlobInfo->GetBlob(), WriteIndexEv->BlobsAction); + CurrentBlobInfo->RegisterBlobId(portionWithBlobs, result.GetBlobId()); + return result; } else { ++CurrentPortion; CurrentBlobIndex = 0; } } - return EStatus::Finished; + return {}; } TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeEv, bool /*blobGrouppingEnabled*/) : WriteIndexEv(writeEv) - , BlobConstructor(std::make_shared<TBlobsConstructor>(*this)) + , Action(WriteIndexEv->BlobsAction) , DstActor(dstActor) {} diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h index 2cb404d1bb..bfe1d986a3 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h @@ -10,25 +10,18 @@ namespace NKikimr::NOlap { class TCompactedWriteController : public NColumnShard::IWriteController { private: - class TBlobsConstructor : public IBlobConstructor { - TCompactedWriteController& Owner; - NOlap::TColumnEngineChanges& IndexChanges; - - ui64 CurrentPortion = 0; - ui64 CurrentBlobIndex = 0; - TPortionInfoWithBlobs::TBlobInfo* CurrentBlobInfo = nullptr; - public: - TBlobsConstructor(TCompactedWriteController& owner); - const TString& GetBlob() const override; - bool RegisterBlobId(const TUnifiedBlobId& blobId) override; - EStatus BuildNext() override; - }; + ui64 CurrentPortion = 0; + ui64 CurrentBlobIndex = 0; + TPortionInfoWithBlobs::TBlobInfo* CurrentBlobInfo = nullptr; TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> WriteIndexEv; - std::shared_ptr<TBlobsConstructor> BlobConstructor; + std::shared_ptr<IBlobsAction> Action; TActorId DstActor; protected: void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; + virtual bool IsBlobActionsReady() const override { + return Action->IsReady(); + } public: TCompactedWriteController(const TActorId& dstActor, TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeEv, bool blobGrouppingEnabled); ~TCompactedWriteController() { @@ -37,8 +30,14 @@ public: } } - NOlap::IBlobConstructor::TPtr GetBlobConstructor() override { - return BlobConstructor; + virtual std::vector<std::shared_ptr<IBlobsAction>> GetBlobActions() const override { + return {Action}; + } + + virtual std::optional<TBlobWriteInfo> Next() override; + + virtual void OnBlobWriteResult(const TEvBlobStorage::TEvPutResult& result) override { + Action->OnBlobWriteResult(result.Id, result.Status); } }; diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index acb8596d8e..c27df8fd4e 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -6,70 +6,30 @@ namespace NKikimr::NOlap { -TIndexedWriteController::TBlobConstructor::TBlobConstructor(TIndexedWriteController& owner) - : Owner(owner) -{} - -const TString& TIndexedWriteController::TBlobConstructor::GetBlob() const { - Y_VERIFY(CurrentIndex > 0); - return BlobsSplitted[CurrentIndex - 1].GetData(); -} - -IBlobConstructor::EStatus TIndexedWriteController::TBlobConstructor::BuildNext() { +std::optional<TBlobWriteInfo> TIndexedWriteController::Next() { if (CurrentIndex == BlobsSplitted.size()) { - return EStatus::Finished; + return {}; } CurrentIndex++; - return EStatus::Ok; -} - -bool TIndexedWriteController::TBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { - const auto& blobInfo = BlobsSplitted[CurrentIndex - 1]; - Owner.BlobData.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize()), blobInfo.GetData(), blobInfo.GetSpecialKeys(), blobInfo.GetRowsCount(), blobInfo.GetRawBytes(), AppData()->TimeProvider->Now()); - return true; + auto& bInfo = BlobsSplitted[CurrentIndex - 1]; + auto result = TBlobWriteInfo::BuildWriteTask(bInfo.GetData(), Action); + BlobData.emplace_back(TBlobRange(result.GetBlobId(), 0, result.GetBlobId().BlobSize()), bInfo.GetSpecialKeys(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now()); + return result; } -bool TIndexedWriteController::TBlobConstructor::Init() { - const auto& writeMeta = Owner.WriteData.GetWriteMeta(); - const ui64 tableId = writeMeta.GetTableId(); - const ui64 writeId = writeMeta.GetWriteId(); - - std::shared_ptr<arrow::RecordBatch> batch; - { - NColumnShard::TCpuGuard guard(Owner.ResourceUsage); - batch = Owner.WriteData.GetData().GetArrowBatch(); - } - - if (!batch) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId); - return false; - } - - auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize()); - if (!splitResult) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage()); - return false; - } - BlobsSplitted = splitResult.ReleaseResult(); - if (BlobsSplitted.size() > 1) { - for (auto&& i : BlobsSplitted) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "strange_blobs_splitting")("blob", i.DebugString())("original_size", Owner.WriteData.GetSize()); - } - } - return true; -} - -TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData) - : WriteData(writeData) - , BlobConstructor(std::make_shared<TBlobConstructor>(*this)) +TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr<IBlobsAction>& action, std::vector<NArrow::TSerializedBatch>&& blobsSplitted) + : BlobsSplitted(std::move(blobsSplitted)) + , WriteData(writeData) , DstActor(dstActor) + , Action(action) { ResourceUsage.SourceMemorySize = WriteData.GetSize(); } void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) { if (putResult->GetPutStatus() == NKikimrProto::OK) { - auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion()); + std::vector<std::shared_ptr<IBlobsAction>> actions = {Action}; + auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, std::move(BlobData), actions, WriteData.GetWriteMeta(), WriteData.GetData().GetSchemaVersion()); ctx.Send(DstActor, result.release()); } else { auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(putResult, WriteData.GetWriteMeta()); @@ -77,11 +37,4 @@ void TIndexedWriteController::DoOnReadyResult(const NActors::TActorContext& ctx, } } -NOlap::IBlobConstructor::TPtr TIndexedWriteController::GetBlobConstructor() { - if (!BlobConstructor->Init()) { - return nullptr; - } - return BlobConstructor; -} - } diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 2800c58fc0..e2be02ece5 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -4,6 +4,7 @@ #include "write_controller.h" #include <ydb/core/tx/ev_write/write_data.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract.h> #include <ydb/core/tx/columnshard/engines/portion_info.h> #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/columnshard/columnshard_private_events.h> @@ -14,31 +15,29 @@ namespace NKikimr::NOlap { class TIndexedWriteController : public NColumnShard::IWriteController { private: - class TBlobConstructor : public IBlobConstructor { - TIndexedWriteController& Owner; - std::vector<NArrow::TSerializedBatch> BlobsSplitted; - - ui64 CurrentIndex = 0; - public: - TBlobConstructor(TIndexedWriteController& owner); - - const TString& GetBlob() const override; - EStatus BuildNext() override; - bool RegisterBlobId(const TUnifiedBlobId& blobId) override; - bool Init(); - }; + virtual bool IsBlobActionsReady() const override { + return Action->IsReady(); + } + ui64 CurrentIndex = 0; + std::vector<NArrow::TSerializedBatch> BlobsSplitted; NEvWrite::TWriteData WriteData; - std::shared_ptr<TBlobConstructor> BlobConstructor; TVector<NColumnShard::TEvPrivate::TEvWriteBlobsResult::TPutBlobData> BlobData; TActorId DstActor; - + std::shared_ptr<IBlobsAction> Action; + void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; public: - TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData); + virtual std::vector<std::shared_ptr<IBlobsAction>> GetBlobActions() const override { + return {Action}; + } - void DoOnReadyResult(const NActors::TActorContext& ctx, const NColumnShard::TBlobPutResult::TPtr& putResult) override; + virtual void OnBlobWriteResult(const TEvBlobStorage::TEvPutResult& result) override { + Action->OnBlobWriteResult(result.Id, result.Status); + } + + TIndexedWriteController(const TActorId& dstActor, const NEvWrite::TWriteData& writeData, const std::shared_ptr<IBlobsAction>& action, std::vector<NArrow::TSerializedBatch>&& blobsSplitted); - NOlap::IBlobConstructor::TPtr GetBlobConstructor() override; + virtual std::optional<TBlobWriteInfo> Next() override; }; } diff --git a/ydb/core/tx/columnshard/engines/writer/put_status.cpp b/ydb/core/tx/columnshard/engines/writer/put_status.cpp new file mode 100644 index 0000000000..f5fb5de167 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/put_status.cpp @@ -0,0 +1,5 @@ +#include "put_status.h" + +namespace NKikimr::NColumnShard { + +} diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.cpp b/ydb/core/tx/columnshard/engines/writer/write_controller.cpp new file mode 100644 index 0000000000..abae873b3c --- /dev/null +++ b/ydb/core/tx/columnshard/engines/writer/write_controller.cpp @@ -0,0 +1,5 @@ +#include "write_controller.h" + +namespace NKikimr::NColumnShard { + +} diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h index 9636b2ebde..ff7730600a 100644 --- a/ydb/core/tx/columnshard/engines/writer/write_controller.h +++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h @@ -6,6 +6,7 @@ #include <library/cpp/actors/core/actor.h> #include <ydb/core/tx/columnshard/blob_manager.h> #include <ydb/core/tx/columnshard/defs.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract.h> namespace NKikimr::NColumnShard { @@ -15,12 +16,10 @@ public: using TPtr = std::shared_ptr<TBlobPutResult>; TBlobPutResult(NKikimrProto::EReplyStatus status, - NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) - : BlobBatch(std::move(blobBatch)) - , ResourceUsage(resourceUsage) + : ResourceUsage(resourceUsage) { SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels)); } @@ -29,10 +28,6 @@ public: SetPutStatus(status); } - NColumnShard::TBlobBatch&& ReleaseBlobBatch() { - return std::move(BlobBatch); - } - void AddResources(const NColumnShard::TUsage& usage) { ResourceUsage.Add(usage); } @@ -42,7 +37,6 @@ public: } private: - YDB_READONLY_DEF(NColumnShard::TBlobBatch, BlobBatch); YDB_READONLY_DEF(NColumnShard::TUsage, ResourceUsage); }; @@ -56,8 +50,10 @@ public: DoOnReadyResult(ctx, putResult); } - virtual NOlap::IBlobConstructor::TPtr GetBlobConstructor() = 0; - + virtual void OnBlobWriteResult(const TEvBlobStorage::TEvPutResult& result) = 0; + virtual std::optional<NOlap::TBlobWriteInfo> Next() = 0; + virtual bool IsBlobActionsReady() const = 0; + virtual std::vector<std::shared_ptr<NOlap::IBlobsAction>> GetBlobActions() const = 0; private: virtual void DoOnReadyResult(const NActors::TActorContext& ctx, const TBlobPutResult::TPtr& putResult) = 0; protected: diff --git a/ydb/core/tx/columnshard/engines/writer/ya.make b/ydb/core/tx/columnshard/engines/writer/ya.make index feba1368c7..0a611c956d 100644 --- a/ydb/core/tx/columnshard/engines/writer/ya.make +++ b/ydb/core/tx/columnshard/engines/writer/ya.make @@ -3,6 +3,9 @@ LIBRARY() SRCS( compacted_blob_constructor.cpp indexed_blob_constructor.cpp + blob_constructor.cpp + put_status.cpp + write_controller.cpp ) PEERDIR( @@ -11,7 +14,7 @@ PEERDIR( ydb/core/blobstorage/vdisk/protos ydb/core/tablet_flat ydb/core/formats/arrow - + library/cpp/actors/core ) diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt index 48f4f0d305..7972cccf59 100644 --- a/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/operations/CMakeLists.darwin-x86_64.txt @@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC target_sources(tx-columnshard-operations PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp ) diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt index 4dba1ab769..a97ebc42a1 100644 --- a/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-aarch64.txt @@ -19,4 +19,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC target_sources(tx-columnshard-operations PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp ) diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt index 4dba1ab769..a97ebc42a1 100644 --- a/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/operations/CMakeLists.linux-x86_64.txt @@ -19,4 +19,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC target_sources(tx-columnshard-operations PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp ) diff --git a/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt index 48f4f0d305..7972cccf59 100644 --- a/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/operations/CMakeLists.windows-x86_64.txt @@ -18,4 +18,5 @@ target_link_libraries(tx-columnshard-operations PUBLIC target_sources(tx-columnshard-operations PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/write_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/operations/slice_builder.cpp ) diff --git a/ydb/core/tx/columnshard/operations/slice_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder.cpp new file mode 100644 index 0000000000..6988c621f1 --- /dev/null +++ b/ydb/core/tx/columnshard/operations/slice_builder.cpp @@ -0,0 +1,53 @@ +#include "slice_builder.h" +#include <library/cpp/actors/core/log.h> +#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h> +#include <ydb/core/tx/columnshard/columnshard_impl.h> +#include <ydb/core/tx/columnshard/columnshard_private_events.h> + +namespace NKikimr::NOlap { + +std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::BuildSlices() { + NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId)); + const auto& writeMeta = WriteData.GetWriteMeta(); + const ui64 tableId = writeMeta.GetTableId(); + const ui64 writeId = writeMeta.GetWriteId(); + + std::shared_ptr<arrow::RecordBatch> batch = WriteData.GetData().GetArrowBatch(); + + if (!batch) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", writeId)("table_id", tableId); + return {}; + } + + auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize()); + if (!splitResult) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage()); + return {}; + } + auto result = splitResult.ReleaseResult(); + if (result.size() > 1) { + for (auto&& i : result) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "strange_blobs_splitting")("blob", i.DebugString())("original_size", WriteData.GetSize()); + } + } + return result; +} + +bool TBuildSlicesTask::DoExecute() { + auto batches = BuildSlices(); + if (batches) { + auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ParentActorId, WriteData, Action, std::move(*batches)); + if (batches && Action->NeedDraftTransaction()) { + TActorContext::AsActorContext().Send(ParentActorId, std::make_unique<NColumnShard::TEvPrivate::TEvWriteDraft>(writeController)); + } else { + TActorContext::AsActorContext().Register(NColumnShard::CreateWriteActor(TabletId, writeController, TInstant::Max())); + } + } else { + auto result = std::make_unique<NColumnShard::TEvPrivate::TEvWriteBlobsResult>(std::make_shared<NColumnShard::TBlobPutResult>(NKikimrProto::EReplyStatus::CORRUPTED), WriteData.GetWriteMeta()); + TActorContext::AsActorContext().Send(ParentActorId, result.release()); + } + + return true; +} + +} diff --git a/ydb/core/tx/columnshard/operations/slice_builder.h b/ydb/core/tx/columnshard/operations/slice_builder.h new file mode 100644 index 0000000000..fc2eea4ea3 --- /dev/null +++ b/ydb/core/tx/columnshard/operations/slice_builder.h @@ -0,0 +1,33 @@ +#pragma once +#include <ydb/core/tx/conveyor/usage/abstract.h> +#include <ydb/core/formats/arrow/size_calcer.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract.h> +#include <ydb/core/tx/ev_write/write_data.h> + +namespace NKikimr::NOlap { + +class TBuildSlicesTask: public NConveyor::ITask { +private: + std::shared_ptr<IBlobsAction> Action; + NEvWrite::TWriteData WriteData; + const ui64 TabletId; + const NActors::TActorId ParentActorId; + std::optional<std::vector<NArrow::TSerializedBatch>> BuildSlices(); + +protected: + virtual bool DoExecute() override; +public: + virtual TString GetTaskClassIdentifier() const override { + return "Write::ConstructBlobs::Slices"; + } + + TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, const std::shared_ptr<IBlobsAction>& action, const NEvWrite::TWriteData& writeData) + : Action(action) + , WriteData(writeData) + , TabletId(tabletId) + , ParentActorId(parentActorId) + { + Y_VERIFY(Action); + } +}; +} diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index 8a7850081a..57898279c9 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -1,9 +1,12 @@ #include "write.h" +#include "slice_builder.h" #include <ydb/core/tx/columnshard/columnshard_schema.h> -#include <ydb/core/tx/columnshard/blob_manager_db.h> +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> +#include <ydb/core/tx/columnshard/blobs_action/bs.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h> +#include <ydb/core/tx/conveyor/usage/service.h> #include <ydb/core/tablet_flat/tablet_flat_executor.h> @@ -22,8 +25,9 @@ namespace NKikimr::NColumnShard { Y_VERIFY(Status == EOperationStatus::Draft); NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source); - auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, NEvWrite::TWriteData(writeMeta, data)); - ctx.Register(CreateWriteActor(owner.TabletID(), writeController, owner.BlobManager->StartBlobBatch(), TInstant::Max(), owner.Settings.MaxSmallBlobSize)); + std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(owner.TabletID(), ctx.SelfID, std::make_shared<NOlap::TBSWriteAction>(*owner.BlobManager), NEvWrite::TWriteData(writeMeta, data)); + NConveyor::TCompServiceOperator::SendTaskToExecute(task); + Status = EOperationStatus::Started; } diff --git a/ydb/core/tx/columnshard/operations/ya.make b/ydb/core/tx/columnshard/operations/ya.make index cc7fd4d19c..ebc4e740c1 100644 --- a/ydb/core/tx/columnshard/operations/ya.make +++ b/ydb/core/tx/columnshard/operations/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( write.cpp write_data.cpp + slice_builder.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 88096dc024..b9a97fb323 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -1,7 +1,7 @@ #include "tables_manager.h" #include "columnshard_schema.h" -#include "blob_manager_db.h" #include "engines/column_engine_logs.h" +#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h> #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/tx/tiering/manager.h> diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 636db1b338..c9a7c005bc 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -8,29 +8,21 @@ namespace NKikimr::NColumnShard { namespace { -class TWriteActor : public TActorBootstrapped<TWriteActor> { +class TWriteActor : public TActorBootstrapped<TWriteActor>, public TMonitoringObjectsCounter<TWriteActor> { ui64 TabletId; TUsage ResourceUsage; - TBlobBatch BlobBatch; IWriteController::TPtr WriteController; THashSet<ui32> YellowMoveChannels; THashSet<ui32> YellowStopChannels; TInstant Deadline; - std::optional<ui64> MaxSmallBlobSize; public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::TX_COLUMNSHARD_WRITE_ACTOR; - } - - TWriteActor(ui64 tabletId, TBlobBatch&& blobBatch, IWriteController::TPtr writeController, const TInstant& deadline, std::optional<ui64> maxSmallBlobSize = {}) + TWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline) : TabletId(tabletId) - , BlobBatch(std::move(blobBatch)) , WriteController(writeController) , Deadline(deadline) - , MaxSmallBlobSize(maxSmallBlobSize) {} void Handle(TEvBlobStorage::TEvPutResult::TPtr& ev, const TActorContext& ctx) { @@ -50,9 +42,8 @@ public: } LOG_S_TRACE("TEvPutResult for blob " << msg->Id.ToString()); - - BlobBatch.OnBlobWriteResult(ev); - if (BlobBatch.AllBlobWritesCompleted()) { + WriteController->OnBlobWriteResult(*msg); + if (WriteController->IsBlobActionsReady()) { return SendResultAndDie(ctx, NKikimrProto::OK); } } @@ -72,7 +63,7 @@ public: } } - auto putResult = std::make_shared<TBlobPutResult>(putStatus, std::move(BlobBatch), + auto putResult = std::make_shared<TBlobPutResult>(putStatus, std::move(YellowMoveChannels), std::move(YellowStopChannels), ResourceUsage); @@ -92,25 +83,12 @@ public: ctx.Schedule(timeout, new TEvents::TEvWakeup()); } - auto blobsConstructor = WriteController->GetBlobConstructor(); - if (!blobsConstructor) { - return SendResultAndDie(ctx, NKikimrProto::CORRUPTED); + while (auto writeInfo = WriteController->Next()) { + ResourceUsage.Network += writeInfo->GetData().size(); + writeInfo->GetWriteOperator()->SendWriteBlobRequest(writeInfo->GetData(), writeInfo->GetBlobId()); } - auto status = NOlap::IBlobConstructor::EStatus::Finished; - while (true) { - status = blobsConstructor->BuildNext(); - if (status != NOlap::IBlobConstructor::EStatus::Ok) { - break; - } - auto blobId = SendWriteBlobRequest(blobsConstructor->GetBlob(), ctx); - blobsConstructor->RegisterBlobId(blobId); - } - if (status != NOlap::IBlobConstructor::EStatus::Finished) { - return SendResultAndDie(ctx, NKikimrProto::CORRUPTED); - } - - if (BlobBatch.AllBlobWritesCompleted()) { + if (WriteController->IsBlobActionsReady()) { return SendResultAndDie(ctx, NKikimrProto::OK); } Become(&TThis::StateWait); @@ -124,23 +102,12 @@ public: break; } } - -private: - TUnifiedBlobId SendWriteBlobRequest(const TString& data, const TActorContext& ctx) { - ResourceUsage.Network += data.size(); - if (MaxSmallBlobSize && data.size() <= *MaxSmallBlobSize) { - TUnifiedBlobId smallBlobId = BlobBatch.AddSmallBlob(data); - Y_VERIFY(smallBlobId.IsSmallBlob()); - return smallBlobId; - } - return BlobBatch.SendWriteBlobRequest(data, Deadline, ctx); - } }; } // namespace -IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, TBlobBatch&& blobBatch, const TInstant& deadline, const ui64 maxSmallBlobSize) { - return new TWriteActor(tabletId, std::move(blobBatch), writeController, deadline, maxSmallBlobSize); +IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant& deadline) { + return new TWriteActor(tabletId, writeController, deadline); } } diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index cbb9ca0a0d..6988f4b61c 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -5,7 +5,6 @@ SRCS( blob.cpp blob_cache.cpp blob_manager.cpp - blob_manager_db.cpp blob_manager_txs.cpp columnshard__export.cpp columnshard__forget.cpp @@ -57,6 +56,7 @@ PEERDIR( ydb/core/tx/columnshard/splitter ydb/core/tx/columnshard/operations ydb/core/tx/columnshard/blobs_reader + ydb/core/tx/columnshard/blobs_action ydb/core/tx/tiering ydb/core/tx/conveyor/usage ydb/core/tx/long_tx_service/public |