diff options
| author | Artem Zuikov <[email protected]> | 2022-03-30 16:42:39 +0300 |
|---|---|---|
| committer | Artem Zuikov <[email protected]> | 2022-03-30 16:42:39 +0300 |
| commit | 79ab149038faf6a9c169ba1ea051a3bf4b69ad7b (patch) | |
| tree | 153506cd9ed1d88451396d91a00815ef2a413e60 | |
| parent | cdb65e317d7f038dc5df74949759e7ff151443bf (diff) | |
KIKIMR-13595: write to S3 (in progress)
ref:e3d17e90d5bd325c7a132414325b18f6937de349
40 files changed, 1843 insertions, 386 deletions
diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt index 8672caaf3ca..3e6df2490b3 100644 --- a/ydb/core/protos/CMakeLists.txt +++ b/ydb/core/protos/CMakeLists.txt @@ -60,7 +60,6 @@ target_proto_messages(ydb-core-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_hive.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_kesus.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_keyvalue.proto - ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_olapshard.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_pq.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_replication.proto ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_schemeshard.proto @@ -178,7 +177,6 @@ target_sources(ydb-core-protos PRIVATE ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_hive.pb.cc ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_kesus.pb.cc ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_keyvalue.pb.cc - ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_olapshard.pb.cc ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_pq.pb.cc ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_replication.pb.cc ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_schemeshard.pb.cc diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index 8c42878bab8..ca3597424b5 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -118,6 +118,10 @@ enum ECumulativeCounters { COUNTER_EVICTION_PORTIONS_WRITTEN = 64 [(CounterOpts) = {Name: "EvictionPortionsWritten"}]; COUNTER_EVICTION_BLOBS_WRITTEN = 65 [(CounterOpts) = {Name: "EvictionBlobsWritten"}]; COUNTER_EVICTION_BYTES_WRITTEN = 66 [(CounterOpts) = {Name: "EvictionBytesWritten"}]; + COUNTER_EXPORT_SUCCESS = 67 [(CounterOpts) = {Name: "ExportSuccess"}]; + COUNTER_EXPORT_FAIL = 68 [(CounterOpts) = {Name: "ExportFail"}]; + COUNTER_FORGET_SUCCESS = 69 [(CounterOpts) = {Name: "ForgetSuccess"}]; + COUNTER_FORGET_FAIL = 70 [(CounterOpts) = {Name: "ForgetFail"}]; } enum EPercentileCounters { @@ -157,4 +161,6 @@ enum ETxTypes { TXTYPE_PROGRESS = 10 [(TxTypeOpts) = {Name: "TxProgress"}]; TXTYPE_START_SCAN = 11 [(TxTypeOpts) = {Name: "TxStartScan"}]; TXTYPE_READ_BLOB_RANGES = 12 [(TxTypeOpts) = {Name: "TxReadBlobRanges"}]; + TXTYPE_EXPORT = 13 [(TxTypeOpts) = {Name: "TxExport"}]; + TXTYPE_FORGET = 14 [(TxTypeOpts) = {Name: "TxForget"}]; } diff --git a/ydb/core/protos/counters_olapshard.proto b/ydb/core/protos/counters_olapshard.proto deleted file mode 100644 index 78ae9d95dee..00000000000 --- a/ydb/core/protos/counters_olapshard.proto +++ /dev/null @@ -1,44 +0,0 @@ -import "ydb/core/protos/counters.proto"; - -package NKikimr.NOlapShard; -option java_package = "ru.yandex.kikimr.proto"; - -option (TabletTypeName) = "OlapShard"; // Used as prefix for all counters - -enum ESimpleCounters { - COUNTER_SIMPLE_IGNORE = 0; -} - -enum ECumulativeCounters { - COUNTER_CUMULATIVE_IGNORE = 0; - COUNTER_BEGIN_OLAP_TX_SUCCESS = 1 [(CounterOpts) = {Name: "BeginOlapTxSuccess"}]; - COUNTER_BEGIN_OLAP_TX_FAIL = 2 [(CounterOpts) = {Name: "BeginOlapTxFail"}]; - COUNTER_COMMIT_OLAP_TX_SUCCESS = 3 [(CounterOpts) = {Name: "CommitOlapTxSuccess"}]; - COUNTER_COMMIT_OLAP_TX_FAIL = 4 [(CounterOpts) = {Name: "CommitOlapTxFail"}]; - -} - -enum EPercentileCounters { - option (GlobalCounterOpts) = { - Ranges { Value: 0 Name: "0 ms" } - Ranges { Value: 1 Name: "1 ms" } - }; - - COUNTER_PERCENTILE_IGNORE = 0; -} - -enum ETxTypes { - TXTYPE_INIT = 0 [(TxTypeOpts) = {Name: "TxInit"}]; - TXTYPE_INIT_SCHEMA = 1 [(TxTypeOpts) = {Name: "TxInitSchema"}]; - TXTYPE_UPDATE_SCHEMA = 2 [(TxTypeOpts) = {Name: "TxUpdateSchema"}]; - TXTYPE_BEGIN_OLAP_TX = 3 [(TxTypeOpts) = {Name: "TxBeginOlapTx"}]; - TXTYPE_COMMIT_OLAP_TX = 4 [(TxTypeOpts) = {Name: "TxCommitOlapTx"}]; - TXTYPE_PROPOSE = 5 [(TxTypeOpts) = {Name: "TxPropose"}]; - TXTYPE_PLANSTEP = 6 [(TxTypeOpts) = {Name: "TxPlanStep"}]; - TXTYPE_NOTIFY_TX_COMPLETION = 7 [(TxTypeOpts) = {Name: "TxNotifyTxCompletion"}]; - TXTYPE_COMMIT_UPDATE_LAST_TXID = 8 [(TxTypeOpts) = {Name: "TxCommitUpdateLastTxId"}]; - TXTYPE_COMMIT_UPDATE_PREPARED = 9 [(TxTypeOpts) = {Name: "TxCommitUpdatePrepared"}]; - TXTYPE_COMMIT_FINISHED = 10 [(TxTypeOpts) = {Name: "TxCommitFinished"}]; - TXTYPE_ATTACH_TX_COLUMN_SHARDS = 11 [(TxTypeOpts) = {Name: "TxAttachTxColumnShards"}]; - TXTYPE_PROPOSE_CANCEL = 12 [(TxTypeOpts) = {Name: "TxProposeCancel"}]; -} diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 6085b914830..ce65302fa1c 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -847,6 +847,10 @@ message TS3Settings { optional string AccessKey = 5; optional string SecretKey = 6; optional Ydb.Export.ExportToS3Settings.StorageClass StorageClass = 7; + optional bool VerifySSL = 8; + optional string ProxyHost = 9; + optional uint32 ProxyPort = 10; + optional EScheme ProxyScheme = 11; message TLimits { optional uint32 ReadBatchSize = 1 [default = 8388608]; // 8 MB diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 72b8384cc3f..0cd67891f40 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -882,5 +882,7 @@ message TActivity { YQ_AUDIT_ACTOR = 564; YQ_AUDIT_EVENT_SENDER_ACTOR = 565; BLOCKSTORE_USER_STATS = 566; + TX_COLUMNSHARD_S3_ACTOR = 567; + TX_COLUMNSHARD_EXPORT_ACTOR = 568; }; }; diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index f79c256a5bc..4356af3258c 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -53,6 +53,11 @@ message TMetadata { optional bytes LogicalMeta = 4; // TLogicalMetadata } +message TEvictMetadata { + optional string TierName = 1; + optional uint32 TierVersion = 2; +} + message TEvWrite { optional NActorsProto.TActorId Source = 1; optional uint64 TxInitiator = 2; diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt index da2da444cf2..c178513ccc6 100644 --- a/ydb/core/tx/columnshard/CMakeLists.txt +++ b/ydb/core/tx/columnshard/CMakeLists.txt @@ -38,6 +38,8 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__init.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__plan_step.cpp @@ -54,9 +56,11 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/s3_actor.cpp ) generate_enum_serilization(core-tx-columnshard ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h diff --git a/ydb/core/tx/columnshard/blob.cpp b/ydb/core/tx/columnshard/blob.cpp index b22d3fca73c..99bde3a526a 100644 --- a/ydb/core/tx/columnshard/blob.cpp +++ b/ydb/core/tx/columnshard/blob.cpp @@ -7,6 +7,50 @@ namespace NKikimr::NOlap { +TString DsToS3Key(const TString& s) { + Y_VERIFY(s.size() > 2); + TString res = s; + res[0] = 'S'; + res[1] = '3'; + for (size_t i = 2; i < res.size(); ++i) { + switch (res[i]) { + case '[': + res[i] = 'a'; + break; + case ']': + res[i] = 'b'; + break; + case ':': + res[i] = '_'; + break; + } + } + return res; +} + +TString S3ToDsKey(const TString& s) { + Y_VERIFY(s.size() > 2); + TString res = s; + res[0] = 'D'; + res[1] = 'S'; + for (size_t i = 2; i < res.size(); ++i) { + switch (res[i]) { + case 'a': + res[i] = '['; + break; + case 'b': + res[i] = ']'; + break; + case '_': + res[i] = ':'; + break; + } + } + return res; +} + +namespace { + #define PARSE_INT_COMPONENT(fieldType, fieldName, endChar) \ if (pos >= endPos) { \ error = "Failed to parse " #fieldName " component"; \ @@ -29,7 +73,7 @@ namespace NKikimr::NOlap { // Format: "DS:group:logoBlobId" // Example: "DS:2181038103:[72075186224038245:51:31595:2:0:11952:0]" -TUnifiedBlobId DoParseExtendedDsBlobId(const TString& s, TString& error) { +TUnifiedBlobId ParseExtendedDsBlobId(const TString& s, TString& error) { Y_VERIFY(s.size() > 2); const char* str = s.c_str(); Y_VERIFY(str[0] == 'D' && str[1] == 'S'); @@ -52,7 +96,7 @@ TUnifiedBlobId DoParseExtendedDsBlobId(const TString& s, TString& error) { // Format: "SM[tabletId:generation:step:cookie:size]" // Example: "SM[72075186224038245:51:31184:0:2528]" -TUnifiedBlobId DoParseSmallBlobId(const TString& s, TString& error) { +TUnifiedBlobId ParseSmallBlobId(const TString& s, TString& error) { Y_VERIFY(s.size() > 2); const char* str = s.c_str(); Y_VERIFY(str[0] == 'S' && str[1] == 'M'); @@ -77,11 +121,32 @@ TUnifiedBlobId DoParseSmallBlobId(const TString& s, TString& error) { return TUnifiedBlobId(tabletId, gen, step, cookie, size); } +// Format: "S3_key|bucket" +TUnifiedBlobId ParseS3BlobId(const TString& s, TString& error) { + TVector<TString> keyBucket; + Split(s, "|", keyBucket); + + if (s.size() < 2 || s[0] != 'S' || s[1] != '3' || + keyBucket.size() != 2) { + error = TStringBuilder() << "Wrong S3 id '" << s << "'"; + return TUnifiedBlobId(); + } + + TUnifiedBlobId dsBlobId = ParseExtendedDsBlobId(S3ToDsKey(keyBucket[0]), error); + if (!dsBlobId.IsValid()) { + return TUnifiedBlobId(); + } + + return TUnifiedBlobId(dsBlobId, TUnifiedBlobId::S3_BLOB, keyBucket[1]); +} + +} + TUnifiedBlobId TUnifiedBlobId::ParseFromString(const TString& str, const IBlobGroupSelector* dsGroupSelector, TString& error) { if (str.size() <= 2) { - error = "String size is too small"; + error = TStringBuilder() << "Wrong blob id: '" << str << "'"; return TUnifiedBlobId(); } @@ -107,15 +172,14 @@ TUnifiedBlobId TUnifiedBlobId::ParseFromString(const TString& str, return TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId); } } else if (str[0] == 'D' && str[1] == 'S') { - return DoParseExtendedDsBlobId(str, error); + return ParseExtendedDsBlobId(str, error); } else if (str[0] == 'S' && str[1] == 'M') { - return DoParseSmallBlobId(str, error); + return ParseSmallBlobId(str, error); } else if (str[0] == 'S' && str[1] == '3') { - error = "S3 blob id parsing is not yet implemented"; - return TUnifiedBlobId(); + return ParseS3BlobId(str, error); } - error = Sprintf("Unknown blob id format: %c%c", str[0], str[1]); + error = TStringBuilder() << "Wrong blob id: '" << str << "'"; return TUnifiedBlobId(); } diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h index ec83cff7601..6b730a57a58 100644 --- a/ydb/core/tx/columnshard/blob.h +++ b/ydb/core/tx/columnshard/blob.h @@ -8,6 +8,9 @@ namespace NKikimr::NOlap { class IBlobGroupSelector; +TString DsToS3Key(const TString& s); +TString S3ToDsKey(const TString& s); + // Encapsulates different types of blob ids to simplify dealing with blobs for the // components that do not need to know where the blob is stored // Blob id formats: @@ -81,10 +84,39 @@ class TUnifiedBlobId { } }; + struct TS3BlobId { + TDsBlobId DsBlobId; + TString Bucket; + TString Key; + + TS3BlobId() = default; + + TS3BlobId(const TUnifiedBlobId& dsBlob, const TString& bucket) + : Bucket(bucket) + { + Y_VERIFY(dsBlob.IsDsBlob()); + DsBlobId = std::get<TDsBlobId>(dsBlob.Id); + Key = DsToS3Key(DsBlobId.ToStringNew()); + } + + bool operator == (const TS3BlobId& other) const { + return Bucket == other.Bucket && Key == other.Key; + } + + TString ToStringNew() const { + return Sprintf("%s|%s", Key.c_str(), Bucket.c_str()); + } + + ui64 Hash() const { + return CombineHashes<ui64>(THash<TString>()(Bucket), THash<TString>()(Key)); + } + }; + std::variant< TInvalid, TDsBlobId, - TSmallBlobId + TSmallBlobId, + TS3BlobId > Id; public: @@ -92,6 +124,7 @@ public: INVALID = 0, DS_BLOB = 1, TABLET_SMALL_BLOB = 2, + S3_BLOB = 3, }; TUnifiedBlobId() @@ -108,6 +141,13 @@ public: : Id(TSmallBlobId{tabletId, gen, step, cookie, size}) {} + // Make S3 blob Id from DS one + TUnifiedBlobId(const TUnifiedBlobId& blob, EBlobType type, const TString& bucket) + : Id(TS3BlobId(blob, bucket)) + { + Y_VERIFY(type == S3_BLOB); + } + TUnifiedBlobId(const TUnifiedBlobId& other) = default; TUnifiedBlobId& operator = (const TUnifiedBlobId& logoBlobId) = default; TUnifiedBlobId(TUnifiedBlobId&& other) = default; @@ -134,9 +174,12 @@ public: return std::get<TDsBlobId>(Id).BlobId.BlobSize(); case TABLET_SMALL_BLOB: return std::get<TSmallBlobId>(Id).Size; - default: + case S3_BLOB: + return std::get<TS3BlobId>(Id).DsBlobId.BlobId.BlobSize(); + case INVALID: Y_FAIL("Invalid blob id"); } + Y_FAIL(); } bool IsSmallBlob() const { @@ -147,6 +190,10 @@ public: return GetType() == DS_BLOB; } + bool IsS3Blob() const { + return GetType() == S3_BLOB; + } + TLogoBlobID GetLogoBlobId() const { Y_VERIFY(IsDsBlob()); return std::get<TDsBlobId>(Id).BlobId; @@ -157,15 +204,23 @@ public: return std::get<TDsBlobId>(Id).DsGroup; } + TString GetS3Key() const { + Y_VERIFY(IsS3Blob()); + return std::get<TS3BlobId>(Id).Key; + } + ui64 GetTabletId() const { switch (Id.index()) { case DS_BLOB: return std::get<TDsBlobId>(Id).BlobId.TabletID(); case TABLET_SMALL_BLOB: return std::get<TSmallBlobId>(Id).TabletId; - default: - Y_FAIL("No Tablet Id"); + case S3_BLOB: + return std::get<TS3BlobId>(Id).DsBlobId.BlobId.TabletID(); + case INVALID: + Y_FAIL("Invalid blob id"); } + Y_FAIL(); } ui64 Hash() const noexcept { @@ -176,9 +231,10 @@ public: return std::get<TDsBlobId>(Id).Hash(); case TABLET_SMALL_BLOB: return std::get<TSmallBlobId>(Id).Hash(); - default: - Y_FAIL("Not implemented"); + case S3_BLOB: + return std::get<TS3BlobId>(Id).Hash(); } + Y_FAIL(); } // This is only implemented for DS for backward compatibility with persisted data. @@ -194,9 +250,12 @@ public: return std::get<TDsBlobId>(Id).ToStringLegacy(); case TABLET_SMALL_BLOB: return std::get<TSmallBlobId>(Id).ToStringLegacy(); - default: + case S3_BLOB: + Y_FAIL("Not implemented"); + case INVALID: return "<Invalid blob id>"; } + Y_FAIL(); } TString ToStringNew() const { @@ -205,9 +264,12 @@ public: return std::get<TDsBlobId>(Id).ToStringNew(); case TABLET_SMALL_BLOB: return std::get<TSmallBlobId>(Id).ToStringNew(); - default: + case S3_BLOB: + return std::get<TS3BlobId>(Id).ToStringNew(); + case INVALID: return "<Invalid blob id>"; } + Y_FAIL(); } }; @@ -244,6 +306,31 @@ struct TBlobRange { } }; +// Expected blob lifecycle: EVICTING -> SELF_CACHED -> EXTERN <-> CACHED +enum class EEvictState : ui8 { + UNKNOWN = 0, + EVICTING = 1, // source, extern, cached blobs: 1-- + SELF_CACHED = 2, // source, extern, cached blobs: 11- + EXTERN = 3, // source, extern, cached blobs: -1- + CACHED = 4, // source, extern, cached blobs: -11 + ERASED = 5, // source, extern, cached blobs: --- +}; + +struct TEvictedBlob { + EEvictState State = EEvictState::UNKNOWN; + TUnifiedBlobId Blob; + TUnifiedBlobId ExternBlob; + TUnifiedBlobId CachedBlob; + + bool operator == (const TEvictedBlob& other) const { + return Blob == other.Blob; + } + + ui64 Hash() const noexcept { + return Blob.Hash(); + } +}; + } inline @@ -269,3 +356,10 @@ struct THash<NKikimr::NOlap::TBlobRange> { return key.Hash(); } }; + +template <> +struct THash<NKikimr::NOlap::TEvictedBlob> { + inline size_t operator() (const NKikimr::NOlap::TEvictedBlob& key) const { + return key.Hash(); + } +}; diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp index 2d390802d0f..48c66c894e4 100644 --- a/ydb/core/tx/columnshard/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blob_manager.cpp @@ -456,6 +456,129 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) } } +bool TBlobManager::ExportOneToOne(const TUnifiedBlobId& blobId, const NKikimrTxColumnShard::TEvictMetadata& meta, + IBlobManagerDb& db) +{ + NOlap::TEvictedBlob evict{ + .State = EEvictState::EVICTING, + .Blob = blobId + }; + + if (EvictedBlobs.count(evict)) { + return false; + } + + TString strMeta; + Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&strMeta); + + db.UpdateEvictBlob(evict, strMeta); + EvictedBlobs.emplace(std::move(evict), meta); + return true; +} + +bool TBlobManager::DropOneToOne(const TUnifiedBlobId& blobId, IBlobManagerDb& db) { + NOlap::TEvictedBlob evict{ + .State = EEvictState::UNKNOWN, + .Blob = blobId + }; + + TEvictMetadata meta; + bool extracted = ExtractEvicted(evict, meta); + if (!extracted) { + return false; // It's not at exported blob. + } +#if 0 // TODO: SELF_CACHED logic + if (evict.State == EEvictState::SELF_CACHED) { + evict.State = EEvictState::EXTERN; // SELF_CACHED -> EXTERN for dropped + } +#endif + db.DropEvictBlob(evict); + DroppedEvictedBlobs.emplace(std::move(evict), std::move(meta)); + return true; +} + +bool TBlobManager::UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) { + TEvictMetadata meta; + + TEvictedBlob old{.Blob = evict.Blob}; + bool extracted = ExtractEvicted(old, meta); + dropped = false; + if (!extracted) { + dropped = DroppedEvictedBlobs.count(evict); + if (!dropped) { + return false; // update after erase + } + extracted = ExtractEvicted(old, meta, true); + } + Y_VERIFY(extracted); + + switch (evict.State) { + case EEvictState::SELF_CACHED: + Y_VERIFY(old.State == EEvictState::EVICTING); + break; + case EEvictState::EXTERN: + Y_VERIFY(old.State == EEvictState::EVICTING || old.State == EEvictState::SELF_CACHED); + break; + default: + break; + } + + if (dropped) { + if (evict.State == EEvictState::SELF_CACHED) { + evict.State = EEvictState::EXTERN; // SELF_CACHED -> EXTERN for dropped + } + DroppedEvictedBlobs.emplace(evict, meta); + } else { + EvictedBlobs.emplace(evict, meta); + } + + // TODO: update meta if needed + db.UpdateEvictBlob(evict, {}); + return true; +} + +bool TBlobManager::EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) { + db.EraseEvictBlob(evict); + return DroppedEvictedBlobs.erase(evict); +} + +bool TBlobManager::LoadOneToOneExport(IBlobManagerDb& db) { + EvictedBlobs.clear(); + DroppedEvictedBlobs.clear(); + + TBlobGroupSelector dsGroupSelector(TabletInfo); + THashMap<TEvictedBlob, TString> evicted; + THashMap<TEvictedBlob, TString> dropped; + if (!db.LoadEvicted(evicted, dropped, dsGroupSelector)) { + return false; + } + + for (auto& [evict, metadata] : evicted) { + NKikimrTxColumnShard::TEvictMetadata meta; + Y_VERIFY(meta.ParseFromString(metadata)); + + EvictedBlobs.emplace(evict, meta); + } + + for (auto& [evict, metadata] : dropped) { + NKikimrTxColumnShard::TEvictMetadata meta; + Y_VERIFY(meta.ParseFromString(metadata)); + + DroppedEvictedBlobs.emplace(evict, meta); + } + + return true; +} + +TEvictedBlob TBlobManager::GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) { + auto it = DroppedEvictedBlobs.find(TEvictedBlob{.Blob = blobId}); + if (it != DroppedEvictedBlobs.end()) { + meta = it->second; + return it->first; + } + return {}; +} + void TBlobManager::DeleteSmallBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) { LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delete Small Blob " << blobId); db.EraseSmallBlob(blobId); diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h index fb9814ceac7..12611e835e6 100644 --- a/ydb/core/tx/columnshard/blob_manager.h +++ b/ydb/core/tx/columnshard/blob_manager.h @@ -12,6 +12,9 @@ namespace NKikimr::NColumnShard { using NOlap::TUnifiedBlobId; using NOlap::TBlobRange; +using NOlap::TEvictedBlob; +using NOlap::EEvictState; +using NKikimrTxColumnShard::TEvictMetadata; // A batch of blobs that are written by a single task. @@ -79,6 +82,23 @@ public: virtual void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0; }; +// An interface for exporting and caching exported blobs out of ColumnShard index to external storages like S3. +// Just do not mix it with IBlobManager that use out storage model. +class IBlobExporter { +protected: + ~IBlobExporter() = default; + +public: + // Lazily export blob to external object store. Keep it available via blobId. + virtual bool ExportOneToOne(const TUnifiedBlobId& blobId, const TEvictMetadata& meta, IBlobManagerDb& db) = 0; + virtual bool DropOneToOne(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0; + virtual bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) = 0; + virtual bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) = 0; + virtual bool LoadOneToOneExport(IBlobManagerDb& db) = 0; + //virtual TEvictedBlob GetEvicted(const TUnifiedBlobId& blob, TEvictMetadata& meta) = 0; + virtual TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) = 0; +}; + // Garbage Collection generation and step using TGenStep = std::tuple<ui32, ui32>; @@ -115,7 +135,7 @@ struct TBlobManagerCounters { }; // The implementation of BlobManager that hides all GC-related details -class TBlobManager : public IBlobManager, public IBlobInUseTracker { +class TBlobManager : public IBlobManager, public IBlobExporter, public IBlobInUseTracker { private: static constexpr size_t BLOB_COUNT_TO_TRIGGER_GC_DEFAULT = 1000; static constexpr ui64 GC_INTERVAL_SECONDS_DEFAULT = 60; @@ -178,6 +198,10 @@ private: TInstant PreviousGCTime; // Used for delaying next GC if there are too few blobs to collect + // + std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> EvictedBlobs; + std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> DroppedEvictedBlobs; + public: TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen); @@ -206,6 +230,14 @@ public: void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) override; void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) override; + // Implementation of IBlobExporter + bool ExportOneToOne(const TUnifiedBlobId& blobId, const TEvictMetadata& meta, IBlobManagerDb& db) override; + bool DropOneToOne(const TUnifiedBlobId& blob, IBlobManagerDb& db) override; + bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) override; + bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) override; + bool LoadOneToOneExport(IBlobManagerDb& db) override; + TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) override; + // Implementation of IBlobInUseTracker void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override; @@ -214,6 +246,29 @@ private: // Delete small blobs that were previously in use and could not be deleted void PerformDelayedDeletes(IBlobManagerDb& db); + + bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false) { + if (fromDropped) { + if (DroppedEvictedBlobs.count(evict)) { + auto node = DroppedEvictedBlobs.extract(evict); + if (!node.empty()) { + evict = node.key(); + meta = node.mapped(); + return true; + } + } + } else { + if (EvictedBlobs.count(evict)) { + auto node = EvictedBlobs.extract(evict); + if (!node.empty()) { + evict = node.key(); + meta = node.mapped(); + return true; + } + } + } + return false; + } }; } diff --git a/ydb/core/tx/columnshard/blob_manager_db.cpp b/ydb/core/tx/columnshard/blob_manager_db.cpp index 839c179f543..52f62b86230 100644 --- a/ydb/core/tx/columnshard/blob_manager_db.cpp +++ b/ydb/core/tx/columnshard/blob_manager_db.cpp @@ -106,4 +106,99 @@ void TBlobManagerDb::EraseSmallBlob(const TUnifiedBlobId& blobId) { db.Table<Schema::SmallBlobs>().Key(blobId.ToStringNew()).Delete(); } +bool TBlobManagerDb::LoadEvicted(THashMap<TEvictedBlob, TString>& evicted, THashMap<TEvictedBlob, TString>& dropped, + const NOlap::IBlobGroupSelector& dsGroupSelector) { + evicted.clear(); + dropped.clear(); + + NIceDb::TNiceDb db(Database); + + auto rowset = db.Table<Schema::OneToOneEvictedBlobs>().Select(); + if (!rowset.IsReady()) { + return false; + } + + TString error; + + while (!rowset.EndOfSet()) { + TString strBlobId = rowset.GetValue<Schema::OneToOneEvictedBlobs::BlobId>(); + //ui64 size = rowset.GetValue<Schema::OneToOneEvictedBlobs::Size>(); + ui8 state = rowset.GetValue<Schema::OneToOneEvictedBlobs::State>(); + bool isDropped = rowset.GetValue<Schema::OneToOneEvictedBlobs::Dropped>(); + TString meta = rowset.GetValue<Schema::OneToOneEvictedBlobs::Metadata>(); + TString strExternId = rowset.GetValue<Schema::OneToOneEvictedBlobs::ExternBlobId>(); + // TODO: CachedBlob + + TUnifiedBlobId blobId = TUnifiedBlobId::ParseFromString(strBlobId, &dsGroupSelector, error); + Y_VERIFY(blobId.IsValid(), "%s", error.c_str()); + + TUnifiedBlobId externId = TUnifiedBlobId::ParseFromString(strExternId, nullptr, error); + Y_VERIFY(externId.IsValid(), "%s", error.c_str()); + + TEvictedBlob evict{ + .State = (EEvictState)state, + .Blob = std::move(blobId), + .ExternBlob = std::move(externId), + }; + + if (isDropped) { + dropped.emplace(std::move(evict), std::move(meta)); + } else { + evicted.emplace(std::move(evict), std::move(meta)); + } + + if (!rowset.Next()) + return false; + } + + return true; +} + +void TBlobManagerDb::UpdateEvictBlob(const TEvictedBlob& evict, const TString& meta) { + NIceDb::TNiceDb db(Database); + + TString serializedBlobId = evict.Blob.ToStringNew(); + + switch (evict.State) { + case EEvictState::EVICTING: + db.Table<Schema::OneToOneEvictedBlobs>().Key(serializedBlobId).Update( + NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::Size>(evict.Blob.BlobSize()), + NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::State>((ui8)evict.State), + NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::Metadata>(meta) + ); + break; + case EEvictState::SELF_CACHED: + case EEvictState::EXTERN: { + Y_VERIFY(meta.empty()); + Y_VERIFY(evict.ExternBlob.IsS3Blob()); + TString serializedExternId = evict.ExternBlob.ToStringNew(); + + db.Table<Schema::OneToOneEvictedBlobs>().Key(serializedBlobId).Update( + NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::State>((ui8)evict.State), + NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::ExternBlobId>(serializedExternId) + ); + break; + } + default: + Y_VERIFY(false); + break; + } +} + +void TBlobManagerDb::DropEvictBlob(const TEvictedBlob& evict) { + NIceDb::TNiceDb db(Database); + + TString serializedBlobId = evict.Blob.ToStringNew(); + db.Table<Schema::OneToOneEvictedBlobs>().Key(serializedBlobId).Update( + NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::State>((ui8)evict.State), + NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::Dropped>(true)); +} + +void TBlobManagerDb::EraseEvictBlob(const TEvictedBlob& evict) { + NIceDb::TNiceDb db(Database); + + TString serializedBlobId = evict.Blob.ToStringNew(); + db.Table<Schema::OneToOneEvictedBlobs>().Key(serializedBlobId).Delete(); +} + } diff --git a/ydb/core/tx/columnshard/blob_manager_db.h b/ydb/core/tx/columnshard/blob_manager_db.h index 0ffae5f4724..72f0031c35d 100644 --- a/ydb/core/tx/columnshard/blob_manager_db.h +++ b/ydb/core/tx/columnshard/blob_manager_db.h @@ -24,6 +24,13 @@ public: virtual void EraseBlobToDelete(const TUnifiedBlobId& blobId) = 0; virtual void WriteSmallBlob(const TUnifiedBlobId& blobId, const TString& data) = 0; virtual void EraseSmallBlob(const TUnifiedBlobId& blobId) = 0; + + virtual bool LoadEvicted(THashMap<TEvictedBlob, TString>& evicted, + THashMap<TEvictedBlob, TString>& dropped, + const NOlap::IBlobGroupSelector& dsGroupSelector) = 0; + virtual void UpdateEvictBlob(const TEvictedBlob& evict, const TString& meta) = 0; + virtual void DropEvictBlob(const TEvictedBlob& evict) = 0; + virtual void EraseEvictBlob(const TEvictedBlob& evict) = 0; }; @@ -45,6 +52,13 @@ public: void WriteSmallBlob(const TUnifiedBlobId& blobId, const TString& data) override; void EraseSmallBlob(const TUnifiedBlobId& blobId) override; + virtual bool LoadEvicted(THashMap<TEvictedBlob, TString>& evicted, + THashMap<TEvictedBlob, TString>& dropped, + const NOlap::IBlobGroupSelector& dsGroupSelector) override; + void UpdateEvictBlob(const TEvictedBlob& evict, const TString& meta) override; + void DropEvictBlob(const TEvictedBlob& evict) override; + void EraseEvictBlob(const TEvictedBlob& evict) override; + private: NTable::TDatabase& Database; }; diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index fd0c913ca3f..93b6898e968 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -11,17 +11,6 @@ IActor* CreateColumnShard(const TActorId& tablet, TTabletStorageInfo* info) { namespace NKikimr::NColumnShard { -IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent); -IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent); -IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent); -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, - const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, - TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max()); -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, - const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, - TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max()); -IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); - void TColumnShard::BecomeBroken(const TActorContext& ctx) { Become(&TThis::StateBroken); @@ -29,14 +18,18 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) ctx.Send(IndexingActor, new TEvents::TEvPoisonPill); ctx.Send(CompactionActor, new TEvents::TEvPoisonPill); ctx.Send(EvictionActor, new TEvents::TEvPoisonPill); + StopS3Actors(ctx); } void TColumnShard::SwitchToWork(const TActorContext& ctx) { Become(&TThis::StateWork); LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID); + IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID)); CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID)); EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID)); + InitS3Actors(ctx, true); + SignalTabletActive(ctx); } @@ -88,131 +81,6 @@ void TColumnShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const LOG_S_DEBUG("Server pipe reset at tablet " << TabletID() << ", remote " << tabletId); } -void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - auto& record = Proto(ev->Get()); - ui32 txKind = record.GetTxKind(); - ui64 txId = record.GetTxId(); - LOG_S_DEBUG("ProposeTransaction kind " << txKind << " txId " << txId << " at tablet " << TabletID()); - - Execute(new TTxProposeTransaction(this, ev), ctx); -} - -void TColumnShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx) { - ui64 step = ev->Get()->Record.GetStep(); - ui64 mediatorId = ev->Get()->Record.GetMediatorID(); - LOG_S_DEBUG("PlanStep " << step << " at tablet " << TabletID() << ", mediator " << mediatorId); - - Execute(new TTxPlanStep(this, ev), ctx); -} - -// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite -void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { - OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); - - auto& data = Proto(ev->Get()).GetData(); - const ui64 tableId = ev->Get()->Record.GetTableId(); - bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !IsTableWritable(tableId) - || ev->Get()->PutStatus == NKikimrProto::ERROR; - - if (error) { - LOG_S_WARN("Write (fail) " << data.size() << " bytes at tablet " << TabletID()); - - ev->Get()->PutStatus = NKikimrProto::ERROR; - Execute(new TTxWrite(this, ev), ctx); - } else if (InsertTable->IsOverloaded(tableId)) { - LOG_S_INFO("Write (overload) " << data.size() << " bytes for table " << tableId << " at tablet " << TabletID()); - - ev->Get()->PutStatus = NKikimrProto::TRYLATER; - Execute(new TTxWrite(this, ev), ctx); - } else if (ev->Get()->BlobId.IsValid()) { - LOG_S_DEBUG("Write (record) " << data.size() << " bytes at tablet " << TabletID()); - - Execute(new TTxWrite(this, ev), ctx); - } else { - if (IsAnyChannelYellowStop()) { - LOG_S_ERROR("Write (out of disk space) at tablet " << TabletID()); - - IncCounter(COUNTER_OUT_OF_SPACE); - ev->Get()->PutStatus = NKikimrProto::TRYLATER; - Execute(new TTxWrite(this, ev), ctx); - } else { - LOG_S_DEBUG("Write (blob) " << data.size() << " bytes at tablet " << TabletID()); - - ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize; - - ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID, - BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); - } - } -} - -void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) { - const auto* msg = ev->Get(); - TRowVersion readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId()); - TRowVersion maxReadVersion = GetMaxReadVersion(); - LOG_S_DEBUG("Read at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion); - - if (maxReadVersion < readVersion) { - WaitingReads.emplace(readVersion, std::move(ev)); - WaitPlanStep(readVersion.Step); - return; - } - - Execute(new TTxRead(this, ev), ctx); -} - -void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) { - auto& blobs = ev->Get()->Blobs; - bool isCompaction = ev->Get()->GranuleCompaction; - if (isCompaction && blobs.empty()) { - ev->Get()->PutStatus = NKikimrProto::OK; - } - - if (ev->Get()->PutStatus == NKikimrProto::UNKNOWN) { - if (IsAnyChannelYellowStop()) { - LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID()); - - IncCounter(COUNTER_OUT_OF_SPACE); - ev->Get()->PutStatus = NKikimrProto::TRYLATER; - Execute(new TTxWriteIndex(this, ev), ctx); - } else { - LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID()); - - Y_VERIFY(!blobs.empty()); - ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo("dummy", 0), ctx.SelfID, - BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); - } - } else { - if (ev->Get()->PutStatus == NKikimrProto::OK) { - LOG_S_DEBUG("WriteIndex (records) at tablet " << TabletID()); - } else { - LOG_S_INFO("WriteIndex error at tablet " << TabletID()); - } - - OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); - Execute(new TTxWriteIndex(this, ev), ctx); - } -} - -void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext& ctx) { - const auto* msg = ev->Get(); - ui64 txId = msg->Record.GetTxId(); - const auto& snapshot = msg->Record.GetSnapshot(); - TRowVersion readVersion(snapshot.GetStep(), snapshot.GetTxId()); - TRowVersion maxReadVersion = GetMaxReadVersion(); - LOG_S_DEBUG("Scan at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion); - - if (maxReadVersion < readVersion) { - WaitingScans.emplace(readVersion, std::move(ev)); - WaitPlanStep(readVersion.Step); - return; - } - - ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()}); - SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size()); - Execute(new TTxScan(this, ev), ctx); -} - void TColumnShard::Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext &ctx) { Y_UNUSED(ctx); @@ -235,11 +103,6 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon } } -void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx) { - LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << ev->Get()->Record); - Execute(new TTxReadBlobRanges(this, ev), ctx); -} - void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Manual) { EnqueueBackgroundActivities(); diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp new file mode 100644 index 00000000000..04e4381091e --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard__export.cpp @@ -0,0 +1,118 @@ +#include "columnshard_impl.h" +#include "blob_manager_db.h" + +namespace NKikimr::NColumnShard { + +using namespace NTabletFlatExecutor; + +class TTxExport : public TTransactionBase<TColumnShard> { +public: + TTxExport(TColumnShard* self, TEvPrivate::TEvExport::TPtr& ev) + : TBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_EXPORT; } + +private: + TEvPrivate::TEvExport::TPtr Ev; +}; + + +bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) { + Y_VERIFY(Ev); + LOG_S_DEBUG("TTxExport.Execute at tablet " << Self->TabletID()); + + txc.DB.NoMoreReadsForTx(); + //NIceDb::TNiceDb db(txc.DB); + + auto& msg = *Ev->Get(); + auto status = msg.Status; + + if (status == NKikimrProto::OK) { + TBlobManagerDb blobManagerDb(txc.DB); + + for (auto& [blobId, externId] : msg.SrcToDstBlobs) { + Y_VERIFY(blobId.IsDsBlob()); + Y_VERIFY(externId.IsS3Blob()); + bool dropped = false; + +#if 0 // TODO: SELF_CACHED logic + NOlap::TEvictedBlob evict{ + .State = EEvictState::SELF_CACHED, + .Blob = blobId, + .ExternBlob = externId + }; + Self->BlobManager->UpdateOneToOne(std::move(evict), blobManagerDb, dropped); +#else + NOlap::TEvictedBlob evict{ + .State = EEvictState::EXTERN, + .Blob = blobId, + .ExternBlob = externId + }; + bool present = Self->BlobManager->UpdateOneToOne(std::move(evict), blobManagerDb, dropped); + + // Delayed erase of evicted blob. Blob could be already deleted. + if (present && !dropped) { + Self->BlobManager->DeleteBlob(blobId, blobManagerDb); + Self->IncCounter(COUNTER_BLOBS_ERASED); + Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize()); + } + + // TODO: delete not present in S3 for sure (avoid race between export and forget) +#endif + } + + Self->IncCounter(COUNTER_EXPORT_SUCCESS); + } else { + Self->IncCounter(COUNTER_EXPORT_FAIL); + } + + return true; +} + +void TTxExport::Complete(const TActorContext&) { + LOG_S_DEBUG("TTxExport.Complete at tablet " << Self->TabletID()); +} + + +void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx) { + auto status = ev->Get()->Status; + bool error = status == NKikimrProto::ERROR; + + if (error) { + LOG_S_WARN("Export (fail): '" << ev->Get()->ErrorStr << "' at tablet " << TabletID()); + } else if (status == NKikimrProto::UNKNOWN) { + ui64 exportNo = ev->Get()->ExportNo; + auto& tierBlobs = ev->Get()->TierBlobs; + Y_VERIFY(tierBlobs.size()); + + LOG_S_DEBUG("Export (write): " << exportNo << " at tablet " << TabletID()); + + for (auto& [tierName, blobIds] : tierBlobs) { + if (!S3Actors.count(tierName)) { + TString tier(tierName); + LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); + continue; + } + auto& s3 = S3Actors[tierName]; + if (!s3) { + TString tier(tierName); + LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID()); + continue; + } + auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, s3, std::move(blobIds)); + ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release())); + } + } else if (status == NKikimrProto::OK) { + LOG_S_DEBUG("Export (apply) at tablet " << TabletID()); + + Execute(new TTxExport(this, ev), ctx); + } else { + Y_VERIFY(false); + } +} + +} diff --git a/ydb/core/tx/columnshard/columnshard__forget.cpp b/ydb/core/tx/columnshard/columnshard__forget.cpp new file mode 100644 index 00000000000..89c5838d37f --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard__forget.cpp @@ -0,0 +1,72 @@ +#include "columnshard_impl.h" +#include "blob_manager_db.h" + +namespace NKikimr::NColumnShard { + +using namespace NTabletFlatExecutor; + +class TTxForget : public TTransactionBase<TColumnShard> { +public: + TTxForget(TColumnShard* self, TEvPrivate::TEvForget::TPtr& ev) + : TBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_FORGET; } + +private: + TEvPrivate::TEvForget::TPtr Ev; +}; + + +bool TTxForget::Execute(TTransactionContext& txc, const TActorContext&) { + Y_VERIFY(Ev); + LOG_S_DEBUG("TTxForget.Execute at tablet " << Self->TabletID()); + + txc.DB.NoMoreReadsForTx(); + //NIceDb::TNiceDb db(txc.DB); + + auto& msg = *Ev->Get(); + auto status = msg.Status; + + if (status == NKikimrProto::OK) { + TBlobManagerDb blobManagerDb(txc.DB); + + for (auto& evict : msg.Evicted) { + bool erased = Self->BlobManager->EraseOneToOne(evict, blobManagerDb); + if (!erased) { + LOG_S_ERROR("Forget unknown blob " << evict.Blob << " at tablet " << Self->TabletID()); + } + } + + Self->IncCounter(COUNTER_FORGET_SUCCESS); + } else { + Self->IncCounter(COUNTER_FORGET_FAIL); + } + + return true; +} + +void TTxForget::Complete(const TActorContext&) { + LOG_S_DEBUG("TTxForget.Complete at tablet " << Self->TabletID()); +} + + +void TColumnShard::Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx) { + auto status = ev->Get()->Status; + bool error = status == NKikimrProto::ERROR; + + if (error) { + LOG_S_WARN("Forget (fail): '" << ev->Get()->ErrorStr << "' at tablet " << TabletID()); + } else if (status == NKikimrProto::OK) { + LOG_S_DEBUG("Forget (apply) at tablet " << TabletID()); + + Execute(new TTxForget(this, ev), ctx); + } else { + Y_VERIFY(false); + } +} + +} diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 1a5420361b4..ac8ba69fe1f 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -63,6 +63,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastWriteId, Self->LastWriteId); ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep); ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId); + ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); if (!ready) return false; @@ -319,6 +320,9 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) if (!Self->BlobManager->LoadState(blobManagerDb)) { return false; } + if (!Self->BlobManager->LoadOneToOneExport(blobManagerDb)) { + return false; + } } Self->UpdateInsertTableCounters(); diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp index 59fb09dbd93..b34582ba7e7 100644 --- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp +++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp @@ -8,6 +8,24 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; +class TTxPlanStep : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { +public: + TTxPlanStep(TColumnShard* self, TEvTxProcessing::TEvPlanStep::TPtr& ev) + : TBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_PLANSTEP; } + +private: + TEvTxProcessing::TEvPlanStep::TPtr Ev; + THashMap<TActorId, TVector<ui64>> TxAcks; + std::unique_ptr<TEvTxProcessing::TEvPlanStepAccepted> Result; +}; + + bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(Ev); LOG_S_DEBUG("TTxPlanStep.Execute at tablet " << Self->TabletID()); @@ -93,4 +111,13 @@ void TTxPlanStep::Complete(const TActorContext& ctx) { ctx.Send(Ev->Sender, Result.release()); } + +void TColumnShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx) { + ui64 step = ev->Get()->Record.GetStep(); + ui64 mediatorId = ev->Get()->Record.GetMediatorID(); + LOG_S_DEBUG("PlanStep " << step << " at tablet " << TabletID() << ", mediator " << mediatorId); + + Execute(new TTxPlanStep(this, ev), ctx); +} + } diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 9caf23e866d..53abef0d31e 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -76,6 +76,7 @@ public: MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId)); } Self->AltersInFlight.erase(txId); + Self->InitS3Actors(ctx, false); break; } case NKikimrTxColumnShard::TX_KIND_COMMIT: { diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 352500107f7..cb18edaf0da 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -6,6 +6,23 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; +class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { +public: + TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev) + : TBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_PROPOSE; } + +private: + TEvColumnShard::TEvProposeTransaction::TPtr Ev; + std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result; +}; + + bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); LOG_S_DEBUG("TTxProposeTransaction.Execute at tablet " << Self->TabletID()); @@ -242,4 +259,14 @@ void TTxProposeTransaction::Complete(const TActorContext& ctx) { Self->TryRegisterMediatorTimeCast(); } + +void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { + auto& record = Proto(ev->Get()); + ui32 txKind = record.GetTxKind(); + ui64 txId = record.GetTxId(); + LOG_S_DEBUG("ProposeTransaction kind " << txKind << " txId " << txId << " at tablet " << TabletID()); + + Execute(new TTxProposeTransaction(this, ev), ctx); +} + } diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 6dcfc3d703e..73723847c5e 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -16,16 +16,26 @@ TVector<T> ProtoToVector(const U& cont) { } -IActor* CreateReadActor(ui64 tabletId, - const TActorId& dstActor, - std::unique_ptr<TEvColumnShard::TEvReadResult>&& event, - NOlap::TReadMetadata::TConstPtr readMetadata, - const TInstant& deadline, - const TActorId& columnShardActorId, - ui64 requestCookie); - using namespace NTabletFlatExecutor; +class TTxRead : public TTxReadBase { +public: + TTxRead(TColumnShard* self, TEvColumnShard::TEvRead::TPtr& ev) + : TTxReadBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_READ; } + +private: + TEvColumnShard::TEvRead::TPtr Ev; + std::unique_ptr<TEvColumnShard::TEvReadResult> Result; + NOlap::TReadMetadata::TConstPtr ReadMetadata; +}; + + NOlap::TReadMetadata::TPtr TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read, const std::unique_ptr<NOlap::TInsertTable>& insertTable, @@ -293,4 +303,20 @@ void TTxRead::Complete(const TActorContext& ctx) { } } + +void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + TRowVersion readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId()); + TRowVersion maxReadVersion = GetMaxReadVersion(); + LOG_S_DEBUG("Read at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion); + + if (maxReadVersion < readVersion) { + WaitingReads.emplace(readVersion, std::move(ev)); + WaitPlanStep(readVersion.Step); + return; + } + + Execute(new TTxRead(this, ev), ctx); +} + } diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp index 59383f94ed8..b742a726a73 100644 --- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp +++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp @@ -7,6 +7,23 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; +class TTxReadBlobRanges : public TTransactionBase<TColumnShard> { +public: + TTxReadBlobRanges(TColumnShard* self, TEvColumnShard::TEvReadBlobRanges::TPtr& ev) + : TTransactionBase<TColumnShard>(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_READ_BLOB_RANGES; } + +private: + TEvColumnShard::TEvReadBlobRanges::TPtr Ev; + std::unique_ptr<TEvColumnShard::TEvReadBlobRangesResult> Result; +}; + + // Returns false in case of page fault bool TryReadValue(NIceDb::TNiceDb& db, const TString& key, TString& value, ui32& readStatus) { auto rowset = db.Table<Schema::SmallBlobs>().Key(key).Select<Schema::SmallBlobs::Data>(); @@ -101,4 +118,9 @@ void TTxReadBlobRanges::Complete(const TActorContext& ctx) { LOG_S_DEBUG("TTxReadBlobRanges.Complete at tablet " << Self->TabletID()); } +void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx) { + LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << ev->Get()->Record); + Execute(new TTxReadBlobRanges(this, ev), ctx); +} + } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 697114c0b84..17a5b58e0c6 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -17,6 +17,29 @@ namespace NKikimr::NColumnShard { using namespace NKqp; using NBlobCache::TBlobRange; +class TTxScan : public TTxReadBase { +public: + using TReadMetadataPtr = NOlap::TReadMetadataBase::TConstPtr; + + TTxScan(TColumnShard* self, TEvColumnShard::TEvScan::TPtr& ev) + : TTxReadBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_START_SCAN; } + +private: + NOlap::TReadMetadataBase::TConstPtr CreateReadMetadata(const TActorContext& ctx, TReadDescription& read, + bool isIndexStats, bool isReverse, ui64 limit); + +private: + TEvColumnShard::TEvScan::TPtr Ev; + TVector<TReadMetadataPtr> ReadMetadataRanges; +}; + + constexpr ui64 INIT_BATCH_ROWS = 1000; constexpr i64 DEFAULT_READ_AHEAD_BYTES = 1*1024*1024; constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10); @@ -721,4 +744,24 @@ void TTxScan::Complete(const TActorContext& ctx) { scanId, txId, scanGen, requestCookie, table, timeout, std::move(ReadMetadataRanges), dataFormat)); } + +void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + ui64 txId = msg->Record.GetTxId(); + const auto& snapshot = msg->Record.GetSnapshot(); + TRowVersion readVersion(snapshot.GetStep(), snapshot.GetTxId()); + TRowVersion maxReadVersion = GetMaxReadVersion(); + LOG_S_DEBUG("Scan at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion); + + if (maxReadVersion < readVersion) { + WaitingScans.emplace(readVersion, std::move(ev)); + WaitPlanStep(readVersion.Step); + return; + } + + ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()}); + SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size()); + Execute(new TTxScan(this, ev), ctx); +} + } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 76fb8669609..af19dcca408 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -1,5 +1,4 @@ #include "columnshard_impl.h" -#include "columnshard_txs.h" #include "columnshard_schema.h" #include "blob_manager_db.h" #include "blob_cache.h" @@ -8,6 +7,23 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; +class TTxWrite : public TTransactionBase<TColumnShard> { +public: + TTxWrite(TColumnShard* self, TEvColumnShard::TEvWrite::TPtr& ev) + : TBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_WRITE; } + +private: + TEvColumnShard::TEvWrite::TPtr Ev; + std::unique_ptr<TEvColumnShard::TEvWriteResult> Result; +}; + + bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(Ev); LOG_S_DEBUG("TTxWrite.Execute at tablet " << Self->TabletID()); @@ -105,4 +121,46 @@ void TTxWrite::Complete(const TActorContext& ctx) { ctx.Send(Ev->Get()->GetSource(), Result.release()); } + +// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite +void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { + OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); + + auto& data = Proto(ev->Get()).GetData(); + const ui64 tableId = ev->Get()->Record.GetTableId(); + bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !IsTableWritable(tableId) + || ev->Get()->PutStatus == NKikimrProto::ERROR; + + if (error) { + LOG_S_WARN("Write (fail) " << data.size() << " bytes at tablet " << TabletID()); + + ev->Get()->PutStatus = NKikimrProto::ERROR; + Execute(new TTxWrite(this, ev), ctx); + } else if (InsertTable->IsOverloaded(tableId)) { + LOG_S_INFO("Write (overload) " << data.size() << " bytes for table " << tableId << " at tablet " << TabletID()); + + ev->Get()->PutStatus = NKikimrProto::TRYLATER; + Execute(new TTxWrite(this, ev), ctx); + } else if (ev->Get()->BlobId.IsValid()) { + LOG_S_DEBUG("Write (record) " << data.size() << " bytes at tablet " << TabletID()); + + Execute(new TTxWrite(this, ev), ctx); + } else { + if (IsAnyChannelYellowStop()) { + LOG_S_ERROR("Write (out of disk space) at tablet " << TabletID()); + + IncCounter(COUNTER_OUT_OF_SPACE); + ev->Get()->PutStatus = NKikimrProto::TRYLATER; + Execute(new TTxWrite(this, ev), ctx); + } else { + LOG_S_DEBUG("Write (blob) " << data.size() << " bytes at tablet " << TabletID()); + + ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize; + + ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID, + BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); + } + } +} + } diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 7b6b67a26b2..23ca51fe5fb 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -8,6 +8,28 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; +/// Common transaction for WriteIndex and GranuleCompaction. +/// For WriteIndex it writes new portion from InsertTable into index. +/// For GranuleCompaction it writes new portion of indexed data and mark old data with "switching" snapshot. +class TTxWriteIndex : public TTransactionBase<TColumnShard> { +public: + TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr& ev) + : TBase(self) + , Ev(ev) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; } + +private: + TEvPrivate::TEvWriteIndex::TPtr Ev; + THashMap<TUnifiedBlobId, TString> BlobsToExport; + THashMap<TString, std::vector<NOlap::TEvictedBlob>> TierBlobsToForget; + ui64 ExportNo = 0; +}; + + bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { Y_VERIFY(Ev); Y_VERIFY(Self->InsertTable); @@ -92,6 +114,29 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { } Self->IncCounter(COUNTER_EVICTION_PORTIONS_WRITTEN, changes->PortionsToEvict.size()); + for (auto& [portionInfo, _] : changes->PortionsToEvict) { + auto& tierName = portionInfo.TierName; + if (tierName.empty()) { + continue; + } + + // Mark exported blobs + Y_VERIFY(Self->TierConfigs.count(tierName)); + auto& config = Self->TierConfigs[tierName]; + + if (config.NeedExport()) { + for (auto& rec : portionInfo.Records) { + auto& blobId = rec.BlobRange.BlobId; + if (!BlobsToExport.count(blobId)) { + BlobsToExport.emplace(blobId, tierName); + + NKikimrTxColumnShard::TEvictMetadata meta; + meta.SetTierName(tierName); + Self->BlobManager->ExportOneToOne(blobId, meta, blobManagerDb); + } + } + } + } const auto& portionsToDrop = changes->PortionsToDrop; THashSet<TUnifiedBlobId> blobsToDrop; @@ -104,14 +149,33 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { } // Note: RAW_BYTES_ERASED and BYTES_ERASED counters are not in sync for evicted data - const auto& evictedRecords = changes->EvictedRecords; - for (const auto& rec : evictedRecords) { - blobsToDrop.insert(rec.BlobRange.BlobId); + THashSet<TUnifiedBlobId> blobsToEvict; + for (const auto& rec : changes->EvictedRecords) { + blobsToEvict.insert(rec.BlobRange.BlobId); } - Self->IncCounter(COUNTER_BLOBS_ERASED, blobsToDrop.size()); for (const auto& blobId : blobsToDrop) { + if (Self->BlobManager->DropOneToOne(blobId, blobManagerDb)) { + TEvictMetadata meta; + auto evict = Self->BlobManager->GetDropped(blobId, meta); + Y_VERIFY(evict.State != EEvictState::UNKNOWN); + bool deleted = ui8(evict.State) >= ui8(EEvictState::EXTERN); // !EVICTING and !SELF_CACHED + TierBlobsToForget[meta.GetTierName()].emplace_back(std::move(evict)); + if (deleted) { + continue; + } + } Self->BlobManager->DeleteBlob(blobId, blobManagerDb); + Self->IncCounter(COUNTER_BLOBS_ERASED); + Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize()); + } + for (const auto& blobId : blobsToEvict) { + if (BlobsToExport.count(blobId)) { + // DS to S3 eviction. Keep source blob in DS till EEvictState::EXTERN state. + continue; + } + Self->BlobManager->DeleteBlob(blobId, blobManagerDb); + Self->IncCounter(COUNTER_BLOBS_ERASED); Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize()); } @@ -134,6 +198,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { << ") cannot write index blobs at tablet " << Self->TabletID()); } + if (BlobsToExport.size()) { + ExportNo = ++Self->LastExportNo; + + NIceDb::TNiceDb db(txc.DB); + Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo); + } + if (changes->IsInsert()) { Self->ActiveIndexing = false; @@ -180,6 +251,67 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { } else { Self->EnqueueBackgroundActivities(); } + + if (ExportNo) { + Y_VERIFY(BlobsToExport.size()); + THashMap<TString, THashMap<TUnifiedBlobId, TString>> tierBlobs; + for (auto& [blobId, tierName] : BlobsToExport) { + tierBlobs[tierName].emplace(blobId, TString()); + } + ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, std::move(tierBlobs))); + } + + for (auto [tierName, blobs] : TierBlobsToForget) { + if (!Self->S3Actors.count(tierName)) { + TString tier(tierName); + LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on forget) at tablet " << Self->TabletID()); + continue; + } + auto& s3 = Self->S3Actors[tierName]; + if (!s3) { + TString tier(tierName); + LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on forget) at tablet " << Self->TabletID()); + continue; + } + + auto forget = std::make_unique<TEvPrivate::TEvForget>(); + forget->Evicted = std::move(blobs); + ctx.Send(s3, forget.release()); + } +} + + +void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) { + auto& blobs = ev->Get()->Blobs; + bool isCompaction = ev->Get()->GranuleCompaction; + if (isCompaction && blobs.empty()) { + ev->Get()->PutStatus = NKikimrProto::OK; + } + + if (ev->Get()->PutStatus == NKikimrProto::UNKNOWN) { + if (IsAnyChannelYellowStop()) { + LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID()); + + IncCounter(COUNTER_OUT_OF_SPACE); + ev->Get()->PutStatus = NKikimrProto::TRYLATER; + Execute(new TTxWriteIndex(this, ev), ctx); + } else { + LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID()); + + Y_VERIFY(!blobs.empty()); + ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo("dummy", 0), ctx.SelfID, + BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); + } + } else { + if (ev->Get()->PutStatus == NKikimrProto::OK) { + LOG_S_DEBUG("WriteIndex (records) at tablet " << TabletID()); + } else { + LOG_S_INFO("WriteIndex error at tablet " << TabletID()); + } + + OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); + Execute(new TTxWriteIndex(this, ev), ctx); + } } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index b1b866aa760..abd00790d9a 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -843,15 +843,57 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl } for (auto& tierConfig : schema.GetStorageTiers()) { - NOlap::TStorageTier tier; - tier.Name = tierConfig.GetName(); + auto& tierName = tierConfig.GetName(); + TierConfigs[tierName] = TTierConfig{tierConfig}; + + NOlap::TStorageTier tier{ .Name = tierName }; if (tierConfig.HasCompression()) { tier.Compression = ConvertCompression(tierConfig.GetCompression()); } + if (TierConfigs[tierName].NeedExport()) { + S3Actors[tierName] = {}; // delayed actor creation + } indexInfo.AddStorageTier(std::move(tier)); } return indexInfo; } +ui32 TColumnShard::InitS3Actors(const TActorContext& ctx, bool init) { + ui32 count = 0; +#ifndef KIKIMR_DISABLE_S3_OPS + for (auto& [tierName, actor] : S3Actors) { + if (!init && actor) { + continue; + } + + Y_VERIFY(!actor); + Y_VERIFY(TierConfigs.count(tierName)); + auto& tierConfig = TierConfigs[tierName]; + Y_VERIFY(tierConfig.NeedExport()); + + actor = ctx.Register(CreateS3Actor(TabletID(), ctx.SelfID, tierName)); + ctx.Send(actor, new TEvPrivate::TEvS3Settings(tierConfig.S3Settings())); + ++count; + } +#else + Y_UNUSED(ctx); + Y_UNUSED(init); +#endif + return count; +} + +void TColumnShard::StopS3Actors(const TActorContext& ctx) { +#ifndef KIKIMR_DISABLE_S3_OPS + for (auto& [_, actor] : S3Actors) { + if (actor) { + ctx.Send(actor, new TEvents::TEvPoisonPill); + actor = {}; + } + } +#else + Y_UNUSED(ctx); +#endif +} + } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index b067efa3d72..b0959555536 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -17,6 +17,28 @@ namespace NKikimr::NColumnShard { extern bool gAllowLogBatchingDefaultValue; +IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent); +IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent); +IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent); +IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, + const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, + TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max()); +IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, + const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, + TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max()); +IActor* CreateReadActor(ui64 tabletId, + const TActorId& dstActor, + std::unique_ptr<TEvColumnShard::TEvReadResult>&& event, + NOlap::TReadMetadata::TConstPtr readMetadata, + const TInstant& deadline, + const TActorId& columnShardActorId, + ui64 requestCookie); +IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); +IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); +#ifndef KIKIMR_DISABLE_S3_OPS +IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName); +#endif + struct TSettings { TControlWrapper BlobWriteGrouppingEnabled; TControlWrapper CacheDataAfterIndexing; @@ -57,6 +79,8 @@ class TColumnShard friend class TTxRead; friend class TTxScan; friend class TTxWriteIndex; + friend class TTxExport; + friend class TTxForget; friend class TTxRunGC; friend class TTxProcessGCResult; friend class TTxReadBlobRanges; @@ -85,6 +109,8 @@ class TColumnShard void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx); void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx); void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx); ITransaction* CreateTxInitSchema(); @@ -178,6 +204,8 @@ protected: HFunc(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); HFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); HFunc(TEvPrivate::TEvWriteIndex, Handle); + HFunc(TEvPrivate::TEvExport, Handle); + HFunc(TEvPrivate::TEvForget, Handle); HFunc(TEvPrivate::TEvScanStats, Handle); HFunc(TEvPrivate::TEvReadFinished, Handle); HFunc(TEvPrivate::TEvPeriodicWakeup, Handle); @@ -283,6 +311,21 @@ private: } }; + struct TTierConfig { + using TTierProto = NKikimrSchemeOp::TStorageTierConfig; + using TS3SettingsProto = NKikimrSchemeOp::TS3Settings; + + TTierProto Proto; + + bool NeedExport() const { + return Proto.HasObjectStorage(); + } + + const TS3SettingsProto& S3Settings() const { + return Proto.GetObjectStorage(); + } + }; + struct TLongTxWriteInfo { ui64 WriteId; NLongTxService::TLongTxId LongTxId; @@ -296,6 +339,7 @@ private: ui64 LastPlannedStep = 0; ui64 LastPlannedTxId = 0; ui64 LastCompactedGranule = 0; + ui64 LastExportNo = 0; TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry; bool MediatorTimeCastRegistered = false; @@ -309,11 +353,13 @@ private: TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices. TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation. TActorId EvictionActor; + THashMap<TString, TActorId> S3Actors; std::unique_ptr<TTabletCountersBase> TabletCountersPtr; TTabletCountersBase* TabletCounters; std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; std::unique_ptr<NOlap::TInsertTable> InsertTable; std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex; + THashMap<TString, TTierConfig> TierConfigs; TTtl Ttl; THashMap<ui64, TBasicTxInfo> BasicTxInfo; @@ -379,6 +425,8 @@ private: void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions); NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema); + ui32 InitS3Actors(const TActorContext& ctx, bool init); + void StopS3Actors(const TActorContext& ctx); std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation(); std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction(); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 9c178796aed..229fc91c4c8 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -44,9 +44,9 @@ struct Schema : NIceDb::Schema { LastPlannedTxId = 5, LastSchemaSeqNoGeneration = 6, LastSchemaSeqNoRound = 7, - LastGcBarrierGen = 8, LastGcBarrierStep = 9, + LastExportNumber = 10, }; enum class EInsertTableIds : ui8 { @@ -168,6 +168,20 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<BlobId, Data>; }; + struct OneToOneEvictedBlobs : Table<13> { + struct BlobId : Column<1, NScheme::NTypeIds::String> {}; + struct Size : Column<2, NScheme::NTypeIds::Uint32> {}; // extracted from BlobId for better introspection + struct State : Column<3, NScheme::NTypeIds::Byte> {}; // evicting -> (self) cached <-> exported + struct Dropped : Column<4, NScheme::NTypeIds::Bool> {}; + struct Metadata : Column<5, NScheme::NTypeIds::String> {}; // NKikimrTxColumnShard.TEvictMetadata + struct ExternBlobId : Column<6, NScheme::NTypeIds::String> {}; + //struct Format : Column<7, NScheme::NTypeIds::Byte> {}; + //struct CachedBlobId : Column<8, NScheme::NTypeIds::String> {}; // TODO + + using TKey = TableKey<BlobId>; + using TColumns = TableColumns<BlobId, Size, State, Dropped, Metadata, ExternBlobId>; + }; + // Index tables // InsertTable - common for all indices @@ -241,7 +255,8 @@ struct Schema : NIceDb::Schema { IndexGranules, IndexColumns, IndexCounters, - SmallBlobs + SmallBlobs, + OneToOneEvictedBlobs >; // diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h index a5662a6876e..a3ecac75a39 100644 --- a/ydb/core/tx/columnshard/columnshard_txs.h +++ b/ydb/core/tx/columnshard/columnshard_txs.h @@ -10,10 +10,6 @@ #include <ydb/core/tablet_flat/tablet_flat_executor.h> #include <ydb/core/tx/tx_processing.h> -namespace arrow { - class Schema; -} - namespace NKikimr::NColumnShard { struct TEvPrivate { @@ -24,6 +20,9 @@ struct TEvPrivate { EvReadFinished, EvPeriodicWakeup, EvEviction, + EvS3Settings, + EvExport, + EvForget, EvEnd }; @@ -85,6 +84,47 @@ struct TEvPrivate { } }; + struct TEvS3Settings : public TEventLocal<TEvS3Settings, EvS3Settings> { + NKikimrSchemeOp::TS3Settings Settings; + + explicit TEvS3Settings(const NKikimrSchemeOp::TS3Settings& settings) + : Settings(settings) + {} + }; + + struct TEvExport : public TEventLocal<TEvExport, EvExport> { + using TBlobDataMap = THashMap<TUnifiedBlobId, TString>; + + NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; + ui64 ExportNo{}; + TActorId DstActor; + THashMap<TString, TBlobDataMap> TierBlobs; + TBlobDataMap Blobs; + THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs; + TString ErrorStr; + + explicit TEvExport(ui64 exportNo, THashMap<TString, TBlobDataMap>&& tierBlobs) + : ExportNo(exportNo) + , TierBlobs(std::move(tierBlobs)) + { + Y_VERIFY(ExportNo); + } + + TEvExport(ui64 exportNo, TActorId dstActor, TBlobDataMap&& blobs) + : ExportNo(exportNo) + , DstActor(dstActor) + , Blobs(std::move(blobs)) + { + Y_VERIFY(ExportNo); + } + }; + + struct TEvForget : public TEventLocal<TEvForget, EvForget> { + NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; + std::vector<NOlap::TEvictedBlob> Evicted; + TString ErrorStr; + }; + struct TEvScanStats : public TEventLocal<TEvScanStats, EvScanStats> { TEvScanStats(ui64 rows, ui64 bytes) : Rows(rows), Bytes(bytes) {} ui64 Rows; @@ -155,58 +195,6 @@ public: TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; } }; -/// Propose deterministic tx -class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { -public: - TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev) - : TBase(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_PROPOSE; } - -private: - TEvColumnShard::TEvProposeTransaction::TPtr Ev; - std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result; -}; - -/// Plan deterministic txs -class TTxPlanStep : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { -public: - TTxPlanStep(TColumnShard* self, TEvTxProcessing::TEvPlanStep::TPtr& ev) - : TBase(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_PLANSTEP; } - -private: - TEvTxProcessing::TEvPlanStep::TPtr Ev; - THashMap<TActorId, TVector<ui64>> TxAcks; - std::unique_ptr<TEvTxProcessing::TEvPlanStepAccepted> Result; -}; - -/// Write portion of data in OLAP transaction -class TTxWrite : public TTransactionBase<TColumnShard> { -public: - TTxWrite(TColumnShard* self, TEvColumnShard::TEvWrite::TPtr& ev) - : TBase(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_WRITE; } - -private: - TEvColumnShard::TEvWrite::TPtr Ev; - std::unique_ptr<TEvColumnShard::TEvWriteResult> Result; -}; - /// Read portion of data in OLAP transaction class TTxReadBase : public TTransactionBase<TColumnShard> { protected: @@ -234,79 +222,4 @@ protected: TString ErrorDescription; }; -class TTxRead : public TTxReadBase { -public: - TTxRead(TColumnShard* self, TEvColumnShard::TEvRead::TPtr& ev) - : TTxReadBase(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_READ; } - -private: - TEvColumnShard::TEvRead::TPtr Ev; - std::unique_ptr<TEvColumnShard::TEvReadResult> Result; - NOlap::TReadMetadata::TConstPtr ReadMetadata; -}; - -class TTxScan : public TTxReadBase { -public: - using TReadMetadataPtr = NOlap::TReadMetadataBase::TConstPtr; - - TTxScan(TColumnShard* self, TEvColumnShard::TEvScan::TPtr& ev) - : TTxReadBase(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_START_SCAN; } - -private: - NOlap::TReadMetadataBase::TConstPtr CreateReadMetadata(const TActorContext& ctx, TReadDescription& read, - bool isIndexStats, bool isReverse, ui64 limit); - -private: - TEvColumnShard::TEvScan::TPtr Ev; - TVector<TReadMetadataPtr> ReadMetadataRanges; -}; - - -class TTxReadBlobRanges : public TTransactionBase<TColumnShard> { -public: - TTxReadBlobRanges(TColumnShard* self, TEvColumnShard::TEvReadBlobRanges::TPtr& ev) - : TTransactionBase<TColumnShard>(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_READ_BLOB_RANGES; } - -private: - TEvColumnShard::TEvReadBlobRanges::TPtr Ev; - std::unique_ptr<TEvColumnShard::TEvReadBlobRangesResult> Result; -}; - - -/// Common transaction for WriteIndex and GranuleCompaction. -/// For WriteIndex it writes new portion from InsertTable into index. -/// For GranuleCompaction it writes new portion of indexed data and mark old data with "switching" snapshot. -class TTxWriteIndex : public TTransactionBase<TColumnShard> { -public: - TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr& ev) - : TBase(self) - , Ev(ev) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; } - -private: - TEvPrivate::TEvWriteIndex::TPtr Ev; -}; - } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index fea0855cee2..114f6a87c58 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -15,6 +15,7 @@ void TTester::Setup(TTestActorRuntime& runtime) { runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_INFO); runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NLog::PRI_DEBUG); ui32 domainId = 0; ui32 planResolution = 500; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 4920b5c694a..8576efa7a10 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -30,8 +30,8 @@ struct TTestSchema { TString Codec; std::optional<int> CompressionLevel; std::optional<ui32> EvictAfterSeconds; - //ui64 EvictAfterBytes; // TODO TString TtlColumn; + std::optional<NKikimrSchemeOp::TS3Settings> S3; TStorageTier(const TString& name = {}) : Name(name) @@ -180,6 +180,9 @@ struct TTestSchema { if (tier.CompressionLevel) { t->MutableCompression()->SetCompressionLevel(*tier.CompressionLevel); } + if (tier.S3) { + t->MutableObjectStorage()->CopyFrom(*tier.S3); + } } } diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp new file mode 100644 index 00000000000..3963279ffc8 --- /dev/null +++ b/ydb/core/tx/columnshard/export_actor.cpp @@ -0,0 +1,106 @@ +#include "columnshard_impl.h" +#include "blob_cache.h" + +#include <ydb/core/blobstorage/dsproxy/blobstorage_backoff.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NColumnShard { + +class TExportActor : public TActorBootstrapped<TExportActor> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::TX_COLUMNSHARD_EXPORT_ACTOR; + } + + TExportActor(ui64 tabletId, const TActorId& parent, TAutoPtr<TEvPrivate::TEvExport> ev) + : TabletId(tabletId) + , Parent(parent) + , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) + , Event(ev.Release()) + { + Y_VERIFY(Event); + } + + void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) { + LOG_S_DEBUG("TEvReadBlobRangeResult (waiting " << BlobsToRead.size() + << ") at tablet " << TabletId << " (export)"); + + auto& event = *ev->Get(); + const TUnifiedBlobId& blobId = event.BlobRange.BlobId; + if (event.Status != NKikimrProto::EReplyStatus::OK) { + LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId << " status " << event.Status + << " at tablet " << TabletId << " (export)"); + + BlobsToRead.erase(blobId); + Event->Status = event.Status; + if (Event->Status == NKikimrProto::UNKNOWN) { + Event->Status = NKikimrProto::ERROR; + } + return; + } + + TString blobData = event.Data; + Y_VERIFY(blobData.size() == blobId.BlobSize()); + + if (!BlobsToRead.count(blobId)) { + return; + } + + BlobsToRead.erase(blobId); + + Y_VERIFY(Event); + Event->Blobs[blobId] = blobData; + + if (BlobsToRead.empty()) { + SendResultAndDie(ctx); + } + } + + void Bootstrap(const TActorContext& /*ctx*/) { + LOG_S_DEBUG("Exporting " << Event->Blobs.size() << " blobs at tablet " << TabletId); + + SendReads(); + Become(&TThis::StateWait); + } + + STFUNC(StateWait) { + switch (ev->GetTypeRewrite()) { + HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle); + default: + break; + } + } + +private: + ui64 TabletId; + TActorId Parent; + TActorId BlobCacheActorId; + std::unique_ptr<TEvPrivate::TEvExport> Event; + THashSet<TUnifiedBlobId> BlobsToRead; + + void SendReads() { + for (auto& [blobId, _] : Event->Blobs) { + BlobsToRead.emplace(blobId); + SendReadRequest(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize())); + } + } + + void SendReadRequest(const NBlobCache::TBlobRange& blobRange) { + Y_VERIFY(!blobRange.Offset); + Y_VERIFY(blobRange.Size); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false)); + } + + void SendResultAndDie(const TActorContext& ctx) { + auto s3Actor = Event->DstActor; + Event->DstActor = Parent; + ctx.Send(s3Actor, Event.release()); + Die(ctx); + } +}; + +IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) { + return new TExportActor(tabletId, dstActor, ev); +} + +} diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index 5155051a9a5..4117e9f57fb 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -17,7 +17,6 @@ public: {} void Handle(TEvPrivate::TEvIndexing::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ev); LOG_S_DEBUG("TEvIndexing at tablet " << TabletId << " (index)"); auto& event = *ev->Get(); @@ -31,7 +30,7 @@ public: auto& blobId = blobsToIndex[i].BlobId; auto res = BlobsToRead.emplace(blobId, i); Y_VERIFY(res.second, "Duplicate blob in DataToIndex: %s", blobId.ToStringNew().c_str()); - SendReadRequest(ctx, blobId); + SendReadRequest(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize())); } if (BlobsToRead.empty()) { @@ -99,10 +98,9 @@ private: std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent; THashMap<TUnifiedBlobId, ui32> BlobsToRead; - void SendReadRequest(const TActorContext&, const TUnifiedBlobId& blobId) { - Y_VERIFY(blobId.BlobSize()); - Send(BlobCacheActorId, - new NBlobCache::TEvBlobCache::TEvReadBlobRange(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()), false)); + void SendReadRequest(const NBlobCache::TBlobRange& blobRange) { + Y_VERIFY(blobRange.Size); + Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false)); } void Index(const TActorContext& ctx) { diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/columnshard/s3_actor.cpp new file mode 100644 index 00000000000..8d61847214f --- /dev/null +++ b/ydb/core/tx/columnshard/s3_actor.cpp @@ -0,0 +1,385 @@ +#ifndef KIKIMR_DISABLE_S3_OPS + +#include "defs.h" +#include "columnshard_impl.h" + +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/datashard/s3_common.h> +#include <ydb/core/wrappers/s3_wrapper.h> + +namespace NKikimr::NColumnShard { + +using NWrappers::TEvS3Wrapper; + +namespace { + +struct TS3Export { + std::unique_ptr<TEvPrivate::TEvExport> Event; + THashSet<TString> KeysToWrite; + + TS3Export() = default; + + explicit TS3Export(TAutoPtr<TEvPrivate::TEvExport> ev) + : Event(ev.Release()) + {} + + THashMap<TUnifiedBlobId, TString>& Blobs() { + return Event->Blobs; + } + + TUnifiedBlobId AddExported(const TString& bucket, const TUnifiedBlobId& srcBlob) { + Event->SrcToDstBlobs[srcBlob] = TUnifiedBlobId(srcBlob, TUnifiedBlobId::S3_BLOB, bucket); + return Event->SrcToDstBlobs[srcBlob]; + } +}; + +struct TS3Forget { + std::unique_ptr<TEvPrivate::TEvForget> Event; + THashSet<TString> KeysToDelete; + + TS3Forget() = default; + + explicit TS3Forget(TAutoPtr<TEvPrivate::TEvForget> ev) + : Event(ev.Release()) + {} +}; + +// S3 objects need InitAPI() called frist. TS3User calls it in ctor. +struct TAwsContext : private NWrappers::TS3User { + Aws::Client::ClientConfiguration Config; + Aws::Auth::AWSCredentials Credentials; + TActorId Client; // S3Wrapper should be created after API owner too + + void SetConfig(const NKikimrSchemeOp::TS3Settings& settings) { + Config = NDataShard::ConfigFromSettings(settings); + Credentials = NDataShard::CredentialsFromSettings(settings); + } + + IActor* CreateS3Wrapper() const { + return NWrappers::CreateS3Wrapper(Credentials, Config); + } +}; + +} + + +class TS3Actor : public TActorBootstrapped<TS3Actor> { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::TX_COLUMNSHARD_S3_ACTOR; + } + + TS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName) + : TabletId(tabletId) + , ShardActor(parent) + , TierName(tierName) + {} + + void Bootstrap() { + LOG_S_DEBUG("[S3] Starting actor for tier '" << TierName << "' at tablet " << TabletId); + Become(&TThis::StateWait); + } + + void Handle(TEvPrivate::TEvS3Settings::TPtr& ev) { + auto& msg = *ev->Get(); + auto& endpoint = msg.Settings.GetEndpoint(); + Bucket = msg.Settings.GetBucket(); + + LOG_S_DEBUG("[S3] Update settings for tier '" << TierName << "' endpoint '" << endpoint + << "' bucket '" << Bucket << "' at tablet " << TabletId); + + if (endpoint.empty()) { + LOG_S_ERROR("[S3] No endpoint in settings for tier '" << TierName << "' at tablet " << TabletId); + return; + } + if (Bucket.empty()) { + LOG_S_ERROR("[S3] No bucket in settings for tier '" << TierName << "' at tablet " << TabletId); + return; + } + + S3Ctx.SetConfig(msg.Settings); + if (S3Ctx.Client) { + Send(S3Ctx.Client, new TEvents::TEvPoisonPill); + S3Ctx.Client = {}; + } + S3Ctx.Client = this->RegisterWithSameMailbox(S3Ctx.CreateS3Wrapper()); + } + + void Handle(TEvPrivate::TEvExport::TPtr& ev) { + auto& msg = *ev->Get(); + ui64 exportNo = msg.ExportNo; + + Y_VERIFY(!Exports.count(exportNo)); + Exports[exportNo] = TS3Export(ev->Release()); + auto& ex = Exports[exportNo]; + + for (auto& [blobId, blob] : ex.Blobs()) { + TString key = ex.AddExported(Bucket, blobId).GetS3Key(); + Y_VERIFY(!ExportingKeys.count(key)); // TODO + + ex.KeysToWrite.emplace(key); + ExportingKeys[key] = exportNo; + SendPutObject(key, std::move(blob)); + } + } + + void Handle(TEvPrivate::TEvForget::TPtr& ev) { + ui64 forgetNo = ++ForgetNo; + + Forgets[forgetNo] = TS3Forget(ev->Release()); + auto& forget = Forgets[forgetNo]; + + for (auto& evict : forget.Event->Evicted) { + if (!evict.ExternBlob.IsValid()) { + LOG_S_INFO("[S3] Forget not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId); + continue; // TODO + } + + TString key = evict.ExternBlob.GetS3Key(); + Y_VERIFY(!ForgettingKeys.count(key)); // TODO + + forget.KeysToDelete.emplace(key); + ForgettingKeys[key] = forgetNo; + SendDeleteObject(key); + } + } + + // TODO: clean written blobs in failed export + void Handle(TEvS3Wrapper::TEvPutObjectResponse::TPtr& ev) { + Y_VERIFY(Initialized()); + + auto& msg = *ev->Get(); + const auto& resultOutcome = msg.Result; + + TString errStr; + if (!resultOutcome.IsSuccess()) { + errStr = LogError("PutObjectResponse", resultOutcome.GetError(), !!msg.Key); + } + + Y_VERIFY(msg.Key); // FIXME + TString key = *msg.Key; + + LOG_S_DEBUG("[S3] PutObjectResponse '" << key << "' at tablet " << TabletId); + + if (!ExportingKeys.count(key)) { + LOG_S_DEBUG("[S3] PutObjectResponse for unknown key '" << key << "' at tablet " << TabletId); + return; + } + + ui64 exportNo = ExportingKeys[key]; + ExportingKeys.erase(key); + + if (!Exports.count(exportNo)) { + LOG_S_DEBUG("[S3] PutObjectResponse for unknown export with key '" << key << "' at tablet " << TabletId); + return; + } + + auto& ex = Exports[exportNo]; + ex.KeysToWrite.erase(key); + Y_VERIFY(ex.Event->DstActor == ShardActor); + + if (!errStr.empty()) { + ex.Event->Status = NKikimrProto::ERROR; + ex.Event->ErrorStr = errStr; + Send(ShardActor, ex.Event.release()); + Exports.erase(exportNo); + } else if (ex.KeysToWrite.empty()) { + ex.Event->Status = NKikimrProto::OK; + Send(ShardActor, ex.Event.release()); + Exports.erase(exportNo); + } + } + + void Handle(TEvS3Wrapper::TEvDeleteObjectResponse::TPtr& ev) { + Y_VERIFY(Initialized()); + + auto& msg = *ev->Get(); + const auto& resultOutcome = msg.Result; + + TString errStr; + if (!resultOutcome.IsSuccess()) { + errStr = LogError("DeleteObjectResponse", resultOutcome.GetError(), !!msg.Key); + } + + Y_VERIFY(msg.Key); // FIXME + TString key = *msg.Key; + + LOG_S_DEBUG("[S3] DeleteObjectResponse '" << key << "' at tablet " << TabletId); + + if (!ForgettingKeys.count(key)) { + LOG_S_DEBUG("[S3] DeleteObjectResponse for unknown key '" << key << "' at tablet " << TabletId); + return; + } + + ui64 forgetNo = ForgettingKeys[key]; + ForgettingKeys.erase(key); + + if (!Forgets.count(forgetNo)) { + LOG_S_DEBUG("[S3] DeleteObjectResponse for unknown forget with key '" << key << "' at tablet " << TabletId); + return; + } + + auto& forget = Forgets[forgetNo]; + forget.KeysToDelete.erase(key); + + if (!errStr.empty()) { + forget.Event->Status = NKikimrProto::ERROR; + forget.Event->ErrorStr = errStr; + Send(ShardActor, forget.Event.release()); + Forgets.erase(forgetNo); + } else if (forget.KeysToDelete.empty()) { + forget.Event->Status = NKikimrProto::OK; + Send(ShardActor, forget.Event.release()); + Forgets.erase(forgetNo); + } + } +#if 0 + void Handle(TEvS3Wrapper::TEvHeadObjectResponse::TPtr& ev) { + Y_VERIFY(Initialized()); + + auto& msg = *ev->Get(); + const auto& key = msg.Key; + const auto& resultOutcome = msg.Result; + + TString errStr; + if (!resultOutcome.IsSuccess()) { + errStr = LogError("HeadObjectResponse", resultOutcome.GetError(), !!key); + } + + if (!errStr.empty()) { + //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr)); + } + + Y_VERIFY(key); + ui64 contentLength = resultOutcome.GetResult().GetContentLength(); + LOG_S_DEBUG("HeadObjectResponse '" << *key << "', size: " << contentLength << " at tablet " << TabletId); + + //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {})); + } + + void Handle(TEvS3Wrapper::TEvGetObjectResponse::TPtr& ev) { + Y_VERIFY(Initialized()); + + auto& msg = *ev->Get(); + const auto& key = msg.Key; + const auto& data = msg.Body; + const auto& resultOutcome = msg.Result; + + TString errStr; + if (!resultOutcome.IsSuccess()) { + errStr = LogError("GetObjectResponse", resultOutcome.GetError(), !!key); + } + + if (!errStr.empty()) { + //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr)); + } + + // TODO: CheckETag + + Y_VERIFY(key); + LOG_S_DEBUG("GetObjectResponse '" << *key << "', size: " << data.size() << " at tablet " << TabletId); + + //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, data)); + } +#endif + +private: + ui64 TabletId; + TActorId ShardActor; + TAwsContext S3Ctx; + TString TierName; + TString Bucket; + ui64 ForgetNo{}; + THashMap<ui64, TS3Export> Exports; + THashMap<ui64, TS3Forget> Forgets; + THashMap<TString, ui64> ExportingKeys; + THashMap<TString, ui64> ForgettingKeys; + + STATEFN(StateWait) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPrivate::TEvS3Settings, Handle); + hFunc(TEvPrivate::TEvExport, Handle); + hFunc(TEvPrivate::TEvForget, Handle); + cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + hFunc(TEvS3Wrapper::TEvPutObjectResponse, Handle); + hFunc(TEvS3Wrapper::TEvDeleteObjectResponse, Handle); +#if 0 + hFunc(TEvS3Wrapper::TEvHeadObjectResponse, Handle); + hFunc(TEvS3Wrapper::TEvGetObjectResponse, Handle); +#endif + default: + break; + } + } + + bool Initialized() const { + return (bool)S3Ctx.Client; + } + + void PassAway() override { + if (S3Ctx.Client) { + Send(S3Ctx.Client, new TEvents::TEvPoisonPill()); + S3Ctx.Client = {}; + } + TActor::PassAway(); + } + + void SendPutObject(const TString& key, TString&& data) const { + auto request = Aws::S3::Model::PutObjectRequest() + .WithBucket(Bucket) + .WithKey(key) + .WithStorageClass(Aws::S3::Model::StorageClass::STANDARD_IA); +#if 0 + Aws::Map<Aws::String, Aws::String> metadata; + metadata.emplace("Content-Type", "application/x-compressed"); + request.SetMetadata(std::move(metadata)); +#endif + LOG_S_DEBUG("[S3] PutObjectRequest key '" << key << "' at tablet " << TabletId); + Send(S3Ctx.Client, new TEvS3Wrapper::TEvPutObjectRequest(request, std::move(data))); + } + + void SendHeadObject(const TString& key) const { + auto request = Aws::S3::Model::HeadObjectRequest() + .WithBucket(Bucket) + .WithKey(key); + + LOG_S_DEBUG("[S3] HeadObjectRequest key '" << key << "' at tablet " << TabletId); + Send(S3Ctx.Client, new TEvS3Wrapper::TEvHeadObjectRequest(request)); + } + + void SendGetObject(const TString& key) { + auto request = Aws::S3::Model::GetObjectRequest() + .WithBucket(Bucket) + .WithKey(key); + //.WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); // TODO + + LOG_S_DEBUG("[S3] GetObjectRequest key '" << key << "' at tablet " << TabletId); + Send(S3Ctx.Client, new TEvS3Wrapper::TEvGetObjectRequest(request)); + } + + void SendDeleteObject(const TString& key) const { + auto request = Aws::S3::Model::DeleteObjectRequest() + .WithBucket(Bucket) + .WithKey(key); + + Send(S3Ctx.Client, new TEvS3Wrapper::TEvDeleteObjectRequest(request)); + } + + TString LogError(const TString& responseType, const Aws::S3::S3Error& error, bool hasKey) const { + TString errStr = TString(error.GetExceptionName()) + " " + error.GetMessage(); + if (errStr.empty() && !hasKey) { + errStr = responseType + " with no key"; + } + + LOG_S_NOTICE("[S3] Error in " << responseType << " at tablet " << TabletId << ": " << errStr); + return errStr; + } +}; + +IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName) { + return new TS3Actor(tabletId, parent, tierName); +} + +} + +#endif diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index 1511c3219b1..4af7892db2f 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -1,8 +1,10 @@ #include "columnshard_ut_common.h" +#include <ydb/core/wrappers/ut_helpers/s3_mock.h> namespace NKikimr { using namespace NTxUT; +using NWrappers::NTestHelpers::TS3Mock; namespace { @@ -317,7 +319,11 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe } TriggerTTL(runtime, sender, {++planStep, ++txId}, {}); - +#if 0 + if (i) { + sleep(1); // TODO: wait export + } +#endif // Read --planStep; @@ -354,15 +360,10 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe return resColumns; } -void TestTiersT1(bool reboots) { +void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool reboots) { std::vector<ui64> ts = {1600000000, 1620000000}; ui64 nowSec = TInstant::Now().Seconds(); - TTestSchema::TTableSpecials spec; - spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0")); - spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1")); - spec.Tiers.back().SetCodec("zstd"); - std::vector<TTestSchema::TTableSpecials> alters(4, spec); ui64 allowBoth = nowSec - ts[0] + 600; @@ -402,7 +403,56 @@ void TestTiersT1(bool reboots) { UNIT_ASSERT_EQUAL(columns[2].first->length(), portionSize); Cerr << "read bytes: " << columns[0].second << ", " << columns[1].second << ", " << columns[2].second << "\n"; - UNIT_ASSERT_GT(columns[0].second, columns[1].second); + if (compressed) { + UNIT_ASSERT_GT(columns[0].second, columns[1].second); + } else { + UNIT_ASSERT_EQUAL(columns[0].second, columns[1].second); + } +} + +void TestTwoHotTiers(bool reboot) { + TTestSchema::TTableSpecials spec; + spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0")); + spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1")); + spec.Tiers.back().SetCodec("zstd"); + + TestTwoTiers(spec, true, reboot); +} + +void TestHotAndColdTiers(bool reboot) { +#if 1 + TString bucket = "ydb"; + TPortManager portManager; + ui16 port = portManager.GetPort(); + TString connString = Sprintf("localhost:%hu", port); + Cerr << "S3 at " << connString << "\n"; + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); +#endif + + TTestSchema::TTableSpecials spec; + spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0")); + spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1")); + spec.Tiers.back().S3 = NKikimrSchemeOp::TS3Settings(); + auto& s3 = *spec.Tiers.back().S3; + + s3.SetScheme(NKikimrSchemeOp::TS3Settings::HTTP); + s3.SetVerifySSL(false); +#if 0 + s3.SetEndpoint("storage.cloud-preprod.yandex.net"); + s3.SetBucket("ch-s3"); + s3.SetAccessKey(); <-- + s3.SetSecretKey(); <-- + s3.SetProxyHost("localhost"); + s3.SetProxyPort(8080); + s3.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP); +#else + s3.SetEndpoint(connString); + s3.SetBucket(bucket); +#endif + + TestTwoTiers(spec, false, reboot); } void TestDrop(bool reboots) { @@ -536,15 +586,24 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { TestTtl(true, false, specs); } - Y_UNIT_TEST(Tiers) { - TestTiersT1(false); + Y_UNIT_TEST(HotTiers) { + TestTwoHotTiers(false); } - Y_UNIT_TEST(RebootTiers) { + Y_UNIT_TEST(RebootHotTiers) { NColumnShard::gAllowLogBatchingDefaultValue = false; - TestTiersT1(true); + TestTwoHotTiers(true); } + Y_UNIT_TEST(ColdTiers) { + TestHotAndColdTiers(false); + } +#if 0 + Y_UNIT_TEST(RebootColdTiers) { + NColumnShard::gAllowLogBatchingDefaultValue = false; + TestHotAndColdTiers(true); + } +#endif Y_UNIT_TEST(Drop) { TestDrop(false); } diff --git a/ydb/core/tx/datashard/s3_common.h b/ydb/core/tx/datashard/s3_common.h index a9f7c570f9c..8e4b4c14cb9 100644 --- a/ydb/core/tx/datashard/s3_common.h +++ b/ydb/core/tx/datashard/s3_common.h @@ -34,6 +34,26 @@ inline Aws::Client::ClientConfiguration ConfigFromSettings(const NKikimrSchemeOp Y_FAIL("Unknown scheme"); } + if (settings.HasVerifySSL()) { + config.verifySSL = settings.GetVerifySSL(); + } + + if (settings.HasProxyHost()) { + config.proxyHost = settings.GetProxyHost(); + config.proxyPort = settings.GetProxyPort(); + + switch (settings.GetProxyScheme()) { + case NKikimrSchemeOp::TS3Settings::HTTP: + config.proxyScheme = Aws::Http::Scheme::HTTP; + break; + case NKikimrSchemeOp::TS3Settings::HTTPS: + config.proxyScheme = Aws::Http::Scheme::HTTPS; + break; + default: + break; + } + } + return config; } diff --git a/ydb/core/wrappers/s3_out.cpp b/ydb/core/wrappers/s3_out.cpp index 0bc2745a6d0..6918e4eac79 100644 --- a/ydb/core/wrappers/s3_out.cpp +++ b/ydb/core/wrappers/s3_out.cpp @@ -102,6 +102,19 @@ void Out(IOutputStream& out, const PutObjectOutcome& outcome) { OutOutcome(out, outcome); } +void Out(IOutputStream& out, const DeleteObjectRequest& request) { + using T = DeleteObjectRequest; + OutRequest(out, request, {&Bucket<T>, &Key<T>}); +} + +void Out(IOutputStream& out, const DeleteObjectResult& result) { + OutResult(out, result, "DeleteObjectResult", {}); +} + +void Out(IOutputStream& out, const DeleteObjectOutcome& outcome) { + OutOutcome(out, outcome); +} + void Out(IOutputStream& out, const CreateMultipartUploadRequest& request) { using T = CreateMultipartUploadRequest; OutRequest(out, request, {&Bucket<T>, &Key<T>}); diff --git a/ydb/core/wrappers/s3_out.h b/ydb/core/wrappers/s3_out.h index ac6143592f8..040c435be22 100644 --- a/ydb/core/wrappers/s3_out.h +++ b/ydb/core/wrappers/s3_out.h @@ -6,6 +6,7 @@ #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/GetObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/PutObjectRequest.h> +#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h> @@ -26,6 +27,10 @@ void Out(IOutputStream& out, const Aws::S3::Model::PutObjectRequest& request); void Out(IOutputStream& out, const Aws::S3::Model::PutObjectResult& result); void Out(IOutputStream& out, const Aws::S3::Model::PutObjectOutcome& outcome); +void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectRequest& request); +void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectResult& result); +void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectOutcome& outcome); + void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadRequest& request); void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadResult& result); void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadOutcome& outcome); @@ -87,6 +92,18 @@ Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::PutObjectOutcome, out, value) { NKikimr::NWrappers::Out(out, value); } +Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectRequest, out, value) { + NKikimr::NWrappers::Out(out, value); +} + +Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectResult, out, value) { + NKikimr::NWrappers::Out(out, value); +} + +Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectOutcome, out, value) { + NKikimr::NWrappers::Out(out, value); +} + Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::CreateMultipartUploadRequest, out, value) { NKikimr::NWrappers::Out(out, value); } diff --git a/ydb/core/wrappers/s3_wrapper.cpp b/ydb/core/wrappers/s3_wrapper.cpp index b0fe378e218..5d5a21cc033 100644 --- a/ydb/core/wrappers/s3_wrapper.cpp +++ b/ydb/core/wrappers/s3_wrapper.cpp @@ -112,9 +112,17 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User { return ev->Get()->Request; } - void Reply(const typename TEvResponse::TOutcome& outcome) const { + void Reply(const typename TEvRequest::TRequest& request, + const typename TEvResponse::TOutcome& outcome) const + { Y_VERIFY(!std::exchange(Replied, true), "Double-reply"); - Send(MakeResponse(outcome).Release()); + + std::optional<TString> key; + if (request.KeyHasBeenSet()) { + key = request.GetKey(); + } + + Send(MakeResponse(key, outcome).Release()); } protected: @@ -126,8 +134,10 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User { Send(Sender, ev); } - virtual THolder<IEventBase> MakeResponse(const typename TEvResponse::TOutcome& outcome) const { - return MakeHolder<TEvResponse>(outcome); + virtual THolder<IEventBase> MakeResponse(const std::optional<TString>& key, + const typename TEvResponse::TOutcome& outcome) const + { + return MakeHolder<TEvResponse>(key, outcome); } private: @@ -192,11 +202,13 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User { } protected: - THolder<IEventBase> MakeResponse(const typename TEvResponse::TOutcome& outcome) const override { + THolder<IEventBase> MakeResponse(const std::optional<TString>& key, + const typename TEvResponse::TOutcome& outcome) const override + { if (outcome.IsSuccess()) { - return MakeHolder<TEvResponse>(outcome, std::move(Buffer)); + return MakeHolder<TEvResponse>(key, outcome, std::move(Buffer)); } else { - return MakeHolder<TEvResponse>(outcome); + return MakeHolder<TEvResponse>(key, outcome); } } @@ -256,7 +268,7 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User { auto ctx = std::make_shared<TCtx>(TlsActivationContext->ActorSystem(), ev->Sender); auto callback = []( const S3Client*, - const typename TEvRequest::TRequest&, + const typename TEvRequest::TRequest& request, const typename TEvResponse::TOutcome& outcome, const std::shared_ptr<const AsyncCallerContext>& context) { const auto* ctx = static_cast<const TCtx*>(context.get()); @@ -264,7 +276,7 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User { LOG_NOTICE_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response" << ": uuid# " << ctx->GetUUID() << ", response# " << outcome); - ctx->Reply(outcome); + ctx->Reply(request, outcome); }; LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::S3_WRAPPER, "Request" @@ -288,6 +300,11 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User { ev, &S3Client::PutObjectAsync); } + void Handle(TEvS3Wrapper::TEvDeleteObjectRequest::TPtr& ev) { + Call<TEvS3Wrapper::TEvDeleteObjectRequest, TEvS3Wrapper::TEvDeleteObjectResponse>( + ev, &S3Client::DeleteObjectAsync); + } + void Handle(TEvS3Wrapper::TEvCreateMultipartUploadRequest::TPtr& ev) { Call<TEvS3Wrapper::TEvCreateMultipartUploadRequest, TEvS3Wrapper::TEvCreateMultipartUploadResponse>( ev, &S3Client::CreateMultipartUploadAsync); @@ -330,6 +347,7 @@ public: hFunc(TEvS3Wrapper::TEvGetObjectRequest, Handle); hFunc(TEvS3Wrapper::TEvHeadObjectRequest, Handle); hFunc(TEvS3Wrapper::TEvPutObjectRequest, Handle); + hFunc(TEvS3Wrapper::TEvDeleteObjectRequest, Handle); hFunc(TEvS3Wrapper::TEvCreateMultipartUploadRequest, Handle); hFunc(TEvS3Wrapper::TEvUploadPartRequest, Handle); hFunc(TEvS3Wrapper::TEvCompleteMultipartUploadRequest, Handle); diff --git a/ydb/core/wrappers/s3_wrapper.h b/ydb/core/wrappers/s3_wrapper.h index 1197606202b..c623113e9a0 100644 --- a/ydb/core/wrappers/s3_wrapper.h +++ b/ydb/core/wrappers/s3_wrapper.h @@ -10,6 +10,7 @@ #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/GetObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/PutObjectRequest.h> +#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartRequest.h> #include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h> @@ -57,10 +58,12 @@ struct TEvS3Wrapper { using TOutcome = Aws::Utils::Outcome<T, Aws::S3::S3Error>; using TResult = Aws::Utils::Outcome<U, Aws::S3::S3Error>; + std::optional<TString> Key; TResult Result; - explicit TGenericResponse(const TOutcome& outcome) - : Result(TDerived::ResultFromOutcome(outcome)) + explicit TGenericResponse(const std::optional<TString>& key, const TOutcome& outcome) + : Key(key) + , Result(TDerived::ResultFromOutcome(outcome)) { } @@ -77,13 +80,14 @@ struct TEvS3Wrapper { TString Body; - explicit TResponseWithBody(const typename TGeneric::TOutcome& outcome) - : TGeneric(outcome) + explicit TResponseWithBody(const std::optional<TString>& key, const typename TGeneric::TOutcome& outcome) + : TGeneric(key, outcome) { } - explicit TResponseWithBody(const typename TGeneric::TOutcome& outcome, TString&& body) - : TGeneric(outcome) + explicit TResponseWithBody(const std::optional<TString>& key, const typename TGeneric::TOutcome& outcome, + TString&& body) + : TGeneric(key, outcome) , Body(std::move(body)) { } @@ -101,6 +105,7 @@ struct TEvS3Wrapper { EV_REQUEST_RESPONSE(GetObject), EV_REQUEST_RESPONSE(HeadObject), EV_REQUEST_RESPONSE(PutObject), + EV_REQUEST_RESPONSE(DeleteObject), EV_REQUEST_RESPONSE(CreateMultipartUpload), EV_REQUEST_RESPONSE(UploadPart), EV_REQUEST_RESPONSE(CompleteMultipartUpload), @@ -156,6 +161,7 @@ struct TEvS3Wrapper { DEFINE_GENERIC_RESPONSE(UploadPart); DEFINE_GENERIC_REQUEST_RESPONSE(HeadObject); + DEFINE_GENERIC_REQUEST_RESPONSE(DeleteObject); DEFINE_GENERIC_REQUEST_RESPONSE(CreateMultipartUpload); DEFINE_GENERIC_REQUEST_RESPONSE(CompleteMultipartUpload); DEFINE_GENERIC_REQUEST_RESPONSE(AbortMultipartUpload); |
