summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <[email protected]>2022-03-30 16:42:39 +0300
committerArtem Zuikov <[email protected]>2022-03-30 16:42:39 +0300
commit79ab149038faf6a9c169ba1ea051a3bf4b69ad7b (patch)
tree153506cd9ed1d88451396d91a00815ef2a413e60
parentcdb65e317d7f038dc5df74949759e7ff151443bf (diff)
KIKIMR-13595: write to S3 (in progress)
ref:e3d17e90d5bd325c7a132414325b18f6937de349
-rw-r--r--ydb/core/protos/CMakeLists.txt2
-rw-r--r--ydb/core/protos/counters_columnshard.proto6
-rw-r--r--ydb/core/protos/counters_olapshard.proto44
-rw-r--r--ydb/core/protos/flat_scheme_op.proto4
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/protos/tx_columnshard.proto5
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.txt4
-rw-r--r--ydb/core/tx/columnshard/blob.cpp80
-rw-r--r--ydb/core/tx/columnshard/blob.h110
-rw-r--r--ydb/core/tx/columnshard/blob_manager.cpp123
-rw-r--r--ydb/core/tx/columnshard/blob_manager.h57
-rw-r--r--ydb/core/tx/columnshard/blob_manager_db.cpp95
-rw-r--r--ydb/core/tx/columnshard/blob_manager_db.h14
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp145
-rw-r--r--ydb/core/tx/columnshard/columnshard__export.cpp118
-rw-r--r--ydb/core/tx/columnshard/columnshard__forget.cpp72
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__plan_step.cpp27
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp27
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp42
-rw-r--r--ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp22
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp43
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp60
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp140
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp46
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h48
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h19
-rw-r--r--ydb/core/tx/columnshard/columnshard_txs.h175
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h5
-rw-r--r--ydb/core/tx/columnshard/export_actor.cpp106
-rw-r--r--ydb/core/tx/columnshard/indexing_actor.cpp10
-rw-r--r--ydb/core/tx/columnshard/s3_actor.cpp385
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp83
-rw-r--r--ydb/core/tx/datashard/s3_common.h20
-rw-r--r--ydb/core/wrappers/s3_out.cpp13
-rw-r--r--ydb/core/wrappers/s3_out.h17
-rw-r--r--ydb/core/wrappers/s3_wrapper.cpp36
-rw-r--r--ydb/core/wrappers/s3_wrapper.h18
40 files changed, 1843 insertions, 386 deletions
diff --git a/ydb/core/protos/CMakeLists.txt b/ydb/core/protos/CMakeLists.txt
index 8672caaf3ca..3e6df2490b3 100644
--- a/ydb/core/protos/CMakeLists.txt
+++ b/ydb/core/protos/CMakeLists.txt
@@ -60,7 +60,6 @@ target_proto_messages(ydb-core-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_hive.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_kesus.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_keyvalue.proto
- ${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_olapshard.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_pq.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_replication.proto
${CMAKE_SOURCE_DIR}/ydb/core/protos/counters_schemeshard.proto
@@ -178,7 +177,6 @@ target_sources(ydb-core-protos PRIVATE
${CMAKE_BINARY_DIR}/ydb/core/protos/counters_hive.pb.cc
${CMAKE_BINARY_DIR}/ydb/core/protos/counters_kesus.pb.cc
${CMAKE_BINARY_DIR}/ydb/core/protos/counters_keyvalue.pb.cc
- ${CMAKE_BINARY_DIR}/ydb/core/protos/counters_olapshard.pb.cc
${CMAKE_BINARY_DIR}/ydb/core/protos/counters_pq.pb.cc
${CMAKE_BINARY_DIR}/ydb/core/protos/counters_replication.pb.cc
${CMAKE_BINARY_DIR}/ydb/core/protos/counters_schemeshard.pb.cc
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto
index 8c42878bab8..ca3597424b5 100644
--- a/ydb/core/protos/counters_columnshard.proto
+++ b/ydb/core/protos/counters_columnshard.proto
@@ -118,6 +118,10 @@ enum ECumulativeCounters {
COUNTER_EVICTION_PORTIONS_WRITTEN = 64 [(CounterOpts) = {Name: "EvictionPortionsWritten"}];
COUNTER_EVICTION_BLOBS_WRITTEN = 65 [(CounterOpts) = {Name: "EvictionBlobsWritten"}];
COUNTER_EVICTION_BYTES_WRITTEN = 66 [(CounterOpts) = {Name: "EvictionBytesWritten"}];
+ COUNTER_EXPORT_SUCCESS = 67 [(CounterOpts) = {Name: "ExportSuccess"}];
+ COUNTER_EXPORT_FAIL = 68 [(CounterOpts) = {Name: "ExportFail"}];
+ COUNTER_FORGET_SUCCESS = 69 [(CounterOpts) = {Name: "ForgetSuccess"}];
+ COUNTER_FORGET_FAIL = 70 [(CounterOpts) = {Name: "ForgetFail"}];
}
enum EPercentileCounters {
@@ -157,4 +161,6 @@ enum ETxTypes {
TXTYPE_PROGRESS = 10 [(TxTypeOpts) = {Name: "TxProgress"}];
TXTYPE_START_SCAN = 11 [(TxTypeOpts) = {Name: "TxStartScan"}];
TXTYPE_READ_BLOB_RANGES = 12 [(TxTypeOpts) = {Name: "TxReadBlobRanges"}];
+ TXTYPE_EXPORT = 13 [(TxTypeOpts) = {Name: "TxExport"}];
+ TXTYPE_FORGET = 14 [(TxTypeOpts) = {Name: "TxForget"}];
}
diff --git a/ydb/core/protos/counters_olapshard.proto b/ydb/core/protos/counters_olapshard.proto
deleted file mode 100644
index 78ae9d95dee..00000000000
--- a/ydb/core/protos/counters_olapshard.proto
+++ /dev/null
@@ -1,44 +0,0 @@
-import "ydb/core/protos/counters.proto";
-
-package NKikimr.NOlapShard;
-option java_package = "ru.yandex.kikimr.proto";
-
-option (TabletTypeName) = "OlapShard"; // Used as prefix for all counters
-
-enum ESimpleCounters {
- COUNTER_SIMPLE_IGNORE = 0;
-}
-
-enum ECumulativeCounters {
- COUNTER_CUMULATIVE_IGNORE = 0;
- COUNTER_BEGIN_OLAP_TX_SUCCESS = 1 [(CounterOpts) = {Name: "BeginOlapTxSuccess"}];
- COUNTER_BEGIN_OLAP_TX_FAIL = 2 [(CounterOpts) = {Name: "BeginOlapTxFail"}];
- COUNTER_COMMIT_OLAP_TX_SUCCESS = 3 [(CounterOpts) = {Name: "CommitOlapTxSuccess"}];
- COUNTER_COMMIT_OLAP_TX_FAIL = 4 [(CounterOpts) = {Name: "CommitOlapTxFail"}];
-
-}
-
-enum EPercentileCounters {
- option (GlobalCounterOpts) = {
- Ranges { Value: 0 Name: "0 ms" }
- Ranges { Value: 1 Name: "1 ms" }
- };
-
- COUNTER_PERCENTILE_IGNORE = 0;
-}
-
-enum ETxTypes {
- TXTYPE_INIT = 0 [(TxTypeOpts) = {Name: "TxInit"}];
- TXTYPE_INIT_SCHEMA = 1 [(TxTypeOpts) = {Name: "TxInitSchema"}];
- TXTYPE_UPDATE_SCHEMA = 2 [(TxTypeOpts) = {Name: "TxUpdateSchema"}];
- TXTYPE_BEGIN_OLAP_TX = 3 [(TxTypeOpts) = {Name: "TxBeginOlapTx"}];
- TXTYPE_COMMIT_OLAP_TX = 4 [(TxTypeOpts) = {Name: "TxCommitOlapTx"}];
- TXTYPE_PROPOSE = 5 [(TxTypeOpts) = {Name: "TxPropose"}];
- TXTYPE_PLANSTEP = 6 [(TxTypeOpts) = {Name: "TxPlanStep"}];
- TXTYPE_NOTIFY_TX_COMPLETION = 7 [(TxTypeOpts) = {Name: "TxNotifyTxCompletion"}];
- TXTYPE_COMMIT_UPDATE_LAST_TXID = 8 [(TxTypeOpts) = {Name: "TxCommitUpdateLastTxId"}];
- TXTYPE_COMMIT_UPDATE_PREPARED = 9 [(TxTypeOpts) = {Name: "TxCommitUpdatePrepared"}];
- TXTYPE_COMMIT_FINISHED = 10 [(TxTypeOpts) = {Name: "TxCommitFinished"}];
- TXTYPE_ATTACH_TX_COLUMN_SHARDS = 11 [(TxTypeOpts) = {Name: "TxAttachTxColumnShards"}];
- TXTYPE_PROPOSE_CANCEL = 12 [(TxTypeOpts) = {Name: "TxProposeCancel"}];
-}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 6085b914830..ce65302fa1c 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -847,6 +847,10 @@ message TS3Settings {
optional string AccessKey = 5;
optional string SecretKey = 6;
optional Ydb.Export.ExportToS3Settings.StorageClass StorageClass = 7;
+ optional bool VerifySSL = 8;
+ optional string ProxyHost = 9;
+ optional uint32 ProxyPort = 10;
+ optional EScheme ProxyScheme = 11;
message TLimits {
optional uint32 ReadBatchSize = 1 [default = 8388608]; // 8 MB
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 72b8384cc3f..0cd67891f40 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -882,5 +882,7 @@ message TActivity {
YQ_AUDIT_ACTOR = 564;
YQ_AUDIT_EVENT_SENDER_ACTOR = 565;
BLOCKSTORE_USER_STATS = 566;
+ TX_COLUMNSHARD_S3_ACTOR = 567;
+ TX_COLUMNSHARD_EXPORT_ACTOR = 568;
};
};
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index f79c256a5bc..4356af3258c 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -53,6 +53,11 @@ message TMetadata {
optional bytes LogicalMeta = 4; // TLogicalMetadata
}
+// Metadata attached to a blob that is exported (evicted) to an external storage tier.
+// In this change it is serialized by TBlobManager::ExportOneToOne and written to the
+// Metadata column of the OneToOneEvictedBlobs table.
+message TEvictMetadata {
+ optional string TierName = 1; // presumably the name of the target tier -- TODO confirm
+ optional uint32 TierVersion = 2; // presumably the version of that tier's settings -- TODO confirm
+}
+
message TEvWrite {
optional NActorsProto.TActorId Source = 1;
optional uint64 TxInitiator = 2;
diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt
index da2da444cf2..c178513ccc6 100644
--- a/ydb/core/tx/columnshard/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.txt
@@ -38,6 +38,8 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blob_manager_txs.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__export.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__forget.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__init.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__notify_tx_completion.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard__plan_step.cpp
@@ -54,9 +56,11 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/s3_actor.cpp
)
generate_enum_serilization(core-tx-columnshard
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard.h
diff --git a/ydb/core/tx/columnshard/blob.cpp b/ydb/core/tx/columnshard/blob.cpp
index b22d3fca73c..99bde3a526a 100644
--- a/ydb/core/tx/columnshard/blob.cpp
+++ b/ydb/core/tx/columnshard/blob.cpp
@@ -7,6 +7,50 @@
namespace NKikimr::NOlap {
+// Rewrites the textual form of a DS blob id into an S3 key: the first two characters
+// become "S3" and, in the rest of the string, '[' -> 'a', ']' -> 'b' and ':' -> '_'.
+// The input must be longer than two characters.
+TString DsToS3Key(const TString& s) {
+    Y_VERIFY(s.size() > 2);
+    TString key = s;
+    key[0] = 'S';
+    key[1] = '3';
+    for (size_t pos = 2; pos < key.size(); ++pos) {
+        const char ch = key[pos];
+        if (ch == '[') {
+            key[pos] = 'a';
+        } else if (ch == ']') {
+            key[pos] = 'b';
+        } else if (ch == ':') {
+            key[pos] = '_';
+        }
+    }
+    return key;
+}
+
+// Reverse mapping of DsToS3Key(): the first two characters become "DS" and, in the rest of
+// the string, 'a' -> '[', 'b' -> ']' and '_' -> ':'.
+// The input must be longer than two characters.
+TString S3ToDsKey(const TString& s) {
+    Y_VERIFY(s.size() > 2);
+    TString dsId = s;
+    dsId[0] = 'D';
+    dsId[1] = 'S';
+    for (size_t pos = 2; pos < dsId.size(); ++pos) {
+        const char ch = dsId[pos];
+        if (ch == 'a') {
+            dsId[pos] = '[';
+        } else if (ch == 'b') {
+            dsId[pos] = ']';
+        } else if (ch == '_') {
+            dsId[pos] = ':';
+        }
+    }
+    return dsId;
+}
+
+namespace {
+
#define PARSE_INT_COMPONENT(fieldType, fieldName, endChar) \
if (pos >= endPos) { \
error = "Failed to parse " #fieldName " component"; \
@@ -29,7 +73,7 @@ namespace NKikimr::NOlap {
// Format: "DS:group:logoBlobId"
// Example: "DS:2181038103:[72075186224038245:51:31595:2:0:11952:0]"
-TUnifiedBlobId DoParseExtendedDsBlobId(const TString& s, TString& error) {
+TUnifiedBlobId ParseExtendedDsBlobId(const TString& s, TString& error) {
Y_VERIFY(s.size() > 2);
const char* str = s.c_str();
Y_VERIFY(str[0] == 'D' && str[1] == 'S');
@@ -52,7 +96,7 @@ TUnifiedBlobId DoParseExtendedDsBlobId(const TString& s, TString& error) {
// Format: "SM[tabletId:generation:step:cookie:size]"
// Example: "SM[72075186224038245:51:31184:0:2528]"
-TUnifiedBlobId DoParseSmallBlobId(const TString& s, TString& error) {
+TUnifiedBlobId ParseSmallBlobId(const TString& s, TString& error) {
Y_VERIFY(s.size() > 2);
const char* str = s.c_str();
Y_VERIFY(str[0] == 'S' && str[1] == 'M');
@@ -77,11 +121,32 @@ TUnifiedBlobId DoParseSmallBlobId(const TString& s, TString& error) {
return TUnifiedBlobId(tabletId, gen, step, cookie, size);
}
+// Format: "S3_key|bucket"
+// Parses the string form of an S3 blob id. The key part (before '|') is converted back with
+// S3ToDsKey() and parsed as an extended DS blob id; the part after '|' is the bucket.
+// On failure an invalid TUnifiedBlobId is returned; `error` is set here for a malformed
+// S3 id, or by ParseExtendedDsBlobId() when the embedded DS id does not parse.
+TUnifiedBlobId ParseS3BlobId(const TString& s, TString& error) {
+    TVector<TString> keyBucket;
+    Split(s, "|", keyBucket);
+
+    // The key has to be longer than its two-character prefix: S3ToDsKey() and
+    // ParseExtendedDsBlobId() both Y_VERIFY(size() > 2), so an id like "S3|bucket" must be
+    // rejected here with an error instead of aborting the process.
+    if (s.size() < 2 || s[0] != 'S' || s[1] != '3' ||
+        keyBucket.size() != 2 || keyBucket[0].size() <= 2) {
+        error = TStringBuilder() << "Wrong S3 id '" << s << "'";
+        return TUnifiedBlobId();
+    }
+
+    TUnifiedBlobId dsBlobId = ParseExtendedDsBlobId(S3ToDsKey(keyBucket[0]), error);
+    if (!dsBlobId.IsValid()) {
+        return TUnifiedBlobId();
+    }
+
+    return TUnifiedBlobId(dsBlobId, TUnifiedBlobId::S3_BLOB, keyBucket[1]);
+}
+
+}
+
TUnifiedBlobId TUnifiedBlobId::ParseFromString(const TString& str,
const IBlobGroupSelector* dsGroupSelector, TString& error)
{
if (str.size() <= 2) {
- error = "String size is too small";
+ error = TStringBuilder() << "Wrong blob id: '" << str << "'";
return TUnifiedBlobId();
}
@@ -107,15 +172,14 @@ TUnifiedBlobId TUnifiedBlobId::ParseFromString(const TString& str,
return TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
}
} else if (str[0] == 'D' && str[1] == 'S') {
- return DoParseExtendedDsBlobId(str, error);
+ return ParseExtendedDsBlobId(str, error);
} else if (str[0] == 'S' && str[1] == 'M') {
- return DoParseSmallBlobId(str, error);
+ return ParseSmallBlobId(str, error);
} else if (str[0] == 'S' && str[1] == '3') {
- error = "S3 blob id parsing is not yet implemented";
- return TUnifiedBlobId();
+ return ParseS3BlobId(str, error);
}
- error = Sprintf("Unknown blob id format: %c%c", str[0], str[1]);
+ error = TStringBuilder() << "Wrong blob id: '" << str << "'";
return TUnifiedBlobId();
}
diff --git a/ydb/core/tx/columnshard/blob.h b/ydb/core/tx/columnshard/blob.h
index ec83cff7601..6b730a57a58 100644
--- a/ydb/core/tx/columnshard/blob.h
+++ b/ydb/core/tx/columnshard/blob.h
@@ -8,6 +8,9 @@ namespace NKikimr::NOlap {
class IBlobGroupSelector;
+TString DsToS3Key(const TString& s);
+TString S3ToDsKey(const TString& s);
+
// Encapsulates different types of blob ids to simplify dealing with blobs for the
// components that do not need to know where the blob is stored
// Blob id formats:
@@ -81,10 +84,39 @@ class TUnifiedBlobId {
}
};
+    // Id of a blob stored in S3. It is built from a DS blob id: the object key is the
+    // new-style string form of that id rewritten by DsToS3Key().
+    struct TS3BlobId {
+        TDsBlobId DsBlobId;
+        TString Bucket;
+        TString Key;
+
+        TS3BlobId() = default;
+
+        // dsBlob must hold a DS blob id (verified).
+        TS3BlobId(const TUnifiedBlobId& dsBlob, const TString& bucket)
+            : Bucket(bucket)
+        {
+            Y_VERIFY(dsBlob.IsDsBlob());
+            DsBlobId = std::get<TDsBlobId>(dsBlob.Id);
+            Key = DsToS3Key(DsBlobId.ToStringNew());
+        }
+
+        // Two S3 ids are equal when both bucket and key match.
+        bool operator == (const TS3BlobId& rhs) const {
+            if (!(Bucket == rhs.Bucket)) {
+                return false;
+            }
+            return Key == rhs.Key;
+        }
+
+        // Serialized as "key|bucket".
+        TString ToStringNew() const {
+            return Sprintf("%s|%s", Key.c_str(), Bucket.c_str());
+        }
+
+        ui64 Hash() const {
+            const ui64 bucketHash = THash<TString>()(Bucket);
+            const ui64 keyHash = THash<TString>()(Key);
+            return CombineHashes<ui64>(bucketHash, keyHash);
+        }
+    };
+
std::variant<
TInvalid,
TDsBlobId,
- TSmallBlobId
+ TSmallBlobId,
+ TS3BlobId
> Id;
public:
@@ -92,6 +124,7 @@ public:
INVALID = 0,
DS_BLOB = 1,
TABLET_SMALL_BLOB = 2,
+ S3_BLOB = 3,
};
TUnifiedBlobId()
@@ -108,6 +141,13 @@ public:
: Id(TSmallBlobId{tabletId, gen, step, cookie, size})
{}
+    // Builds an S3 blob id out of a DS blob id and a bucket name.
+    // `type` only disambiguates the overload and must be S3_BLOB (verified).
+    TUnifiedBlobId(const TUnifiedBlobId& blob, EBlobType type, const TString& bucket)
+        : Id(TS3BlobId{blob, bucket})
+    {
+        Y_VERIFY(type == S3_BLOB);
+    }
+
TUnifiedBlobId(const TUnifiedBlobId& other) = default;
TUnifiedBlobId& operator = (const TUnifiedBlobId& logoBlobId) = default;
TUnifiedBlobId(TUnifiedBlobId&& other) = default;
@@ -134,9 +174,12 @@ public:
return std::get<TDsBlobId>(Id).BlobId.BlobSize();
case TABLET_SMALL_BLOB:
return std::get<TSmallBlobId>(Id).Size;
- default:
+ case S3_BLOB:
+ return std::get<TS3BlobId>(Id).DsBlobId.BlobId.BlobSize();
+ case INVALID:
Y_FAIL("Invalid blob id");
}
+ Y_FAIL();
}
bool IsSmallBlob() const {
@@ -147,6 +190,10 @@ public:
return GetType() == DS_BLOB;
}
+    // True when the stored id is of type S3_BLOB.
+    bool IsS3Blob() const {
+        const bool isS3 = (GetType() == S3_BLOB);
+        return isS3;
+    }
+
TLogoBlobID GetLogoBlobId() const {
Y_VERIFY(IsDsBlob());
return std::get<TDsBlobId>(Id).BlobId;
@@ -157,15 +204,23 @@ public:
return std::get<TDsBlobId>(Id).DsGroup;
}
+    // Returns the S3 object key. Must only be called on an S3 blob id (verified).
+    TString GetS3Key() const {
+        Y_VERIFY(IsS3Blob());
+        const TS3BlobId& s3Id = std::get<TS3BlobId>(Id);
+        return s3Id.Key;
+    }
+
ui64 GetTabletId() const {
switch (Id.index()) {
case DS_BLOB:
return std::get<TDsBlobId>(Id).BlobId.TabletID();
case TABLET_SMALL_BLOB:
return std::get<TSmallBlobId>(Id).TabletId;
- default:
- Y_FAIL("No Tablet Id");
+ case S3_BLOB:
+ return std::get<TS3BlobId>(Id).DsBlobId.BlobId.TabletID();
+ case INVALID:
+ Y_FAIL("Invalid blob id");
}
+ Y_FAIL();
}
ui64 Hash() const noexcept {
@@ -176,9 +231,10 @@ public:
return std::get<TDsBlobId>(Id).Hash();
case TABLET_SMALL_BLOB:
return std::get<TSmallBlobId>(Id).Hash();
- default:
- Y_FAIL("Not implemented");
+ case S3_BLOB:
+ return std::get<TS3BlobId>(Id).Hash();
}
+ Y_FAIL();
}
// This is only implemented for DS for backward compatibility with persisted data.
@@ -194,9 +250,12 @@ public:
return std::get<TDsBlobId>(Id).ToStringLegacy();
case TABLET_SMALL_BLOB:
return std::get<TSmallBlobId>(Id).ToStringLegacy();
- default:
+ case S3_BLOB:
+ Y_FAIL("Not implemented");
+ case INVALID:
return "<Invalid blob id>";
}
+ Y_FAIL();
}
TString ToStringNew() const {
@@ -205,9 +264,12 @@ public:
return std::get<TDsBlobId>(Id).ToStringNew();
case TABLET_SMALL_BLOB:
return std::get<TSmallBlobId>(Id).ToStringNew();
- default:
+ case S3_BLOB:
+ return std::get<TS3BlobId>(Id).ToStringNew();
+ case INVALID:
return "<Invalid blob id>";
}
+ Y_FAIL();
}
};
@@ -244,6 +306,31 @@ struct TBlobRange {
}
};
+// Expected blob lifecycle: EVICTING -> SELF_CACHED -> EXTERN <-> CACHED
+// Each state comment carries a three-character mask for the source / extern / cached copies
+// (presumably '1' = the copy exists, '-' = it does not -- TODO confirm).
+enum class EEvictState : ui8 {
+    UNKNOWN = 0,
+    EVICTING = 1,       // source, extern, cached blobs: 1--
+    SELF_CACHED = 2,    // source, extern, cached blobs: 11-
+    EXTERN = 3,         // source, extern, cached blobs: -1-
+    CACHED = 4,         // source, extern, cached blobs: -11
+    ERASED = 5,         // source, extern, cached blobs: ---
+};
+
+// Record describing one evicted blob. Identity (equality and hash) is defined by the
+// source blob id only; State, ExternBlob and CachedBlob do not take part in it.
+struct TEvictedBlob {
+    EEvictState State = EEvictState::UNKNOWN;
+    TUnifiedBlobId Blob;
+    TUnifiedBlobId ExternBlob;
+    TUnifiedBlobId CachedBlob;
+
+    bool operator == (const TEvictedBlob& rhs) const {
+        const bool sameSource = (Blob == rhs.Blob);
+        return sameSource;
+    }
+
+    ui64 Hash() const noexcept {
+        const ui64 sourceHash = Blob.Hash();
+        return sourceHash;
+    }
+};
+
}
inline
@@ -269,3 +356,10 @@ struct THash<NKikimr::NOlap::TBlobRange> {
return key.Hash();
}
};
+
+// Hash adaptor for TEvictedBlob keys; forwards to TEvictedBlob::Hash().
+template <>
+struct THash<NKikimr::NOlap::TEvictedBlob> {
+    size_t operator() (const NKikimr::NOlap::TEvictedBlob& evicted) const {
+        return evicted.Hash();
+    }
+};
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index 2d390802d0f..48c66c894e4 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -456,6 +456,129 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db)
}
}
+// Registers blobId as being evicted: stores an EVICTING record with the serialized metadata
+// in the local DB and remembers it in EvictedBlobs.
+// Returns false (and changes nothing) when the blob is already present in EvictedBlobs.
+bool TBlobManager::ExportOneToOne(const TUnifiedBlobId& blobId, const NKikimrTxColumnShard::TEvictMetadata& meta,
+    IBlobManagerDb& db)
+{
+    NOlap::TEvictedBlob evict;
+    evict.State = EEvictState::EVICTING;
+    evict.Blob = blobId;
+
+    if (EvictedBlobs.find(evict) != EvictedBlobs.end()) {
+        return false;
+    }
+
+    TString serializedMeta;
+    Y_PROTOBUF_SUPPRESS_NODISCARD meta.SerializeToString(&serializedMeta);
+
+    db.UpdateEvictBlob(evict, serializedMeta);
+    EvictedBlobs.emplace(std::move(evict), meta);
+    return true;
+}
+
+// Moves the record of blobId from EvictedBlobs to DroppedEvictedBlobs and marks it as
+// dropped in the local DB. Returns false when blobId is not in EvictedBlobs.
+bool TBlobManager::DropOneToOne(const TUnifiedBlobId& blobId, IBlobManagerDb& db) {
+    NOlap::TEvictedBlob evict;
+    evict.State = EEvictState::UNKNOWN;
+    evict.Blob = blobId;
+
+    TEvictMetadata meta;
+    if (!ExtractEvicted(evict, meta)) {
+        return false; // It's not an exported blob.
+    }
+#if 0 // TODO: SELF_CACHED logic
+    if (evict.State == EEvictState::SELF_CACHED) {
+        evict.State = EEvictState::EXTERN; // SELF_CACHED -> EXTERN for dropped
+    }
+#endif
+    db.DropEvictBlob(evict);
+    DroppedEvictedBlobs.emplace(std::move(evict), std::move(meta));
+    return true;
+}
+
+// Replaces the stored record for evict.Blob with `evict` (new state / extern id), keeping
+// the previously stored metadata, and persists the new state in the local DB.
+// `dropped` is set to true when the record was found among the dropped blobs.
+// Returns false when the blob is tracked in neither map (update after erase).
+bool TBlobManager::UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) {
+    TEvictMetadata meta;
+    TEvictedBlob old{.Blob = evict.Blob};
+
+    dropped = false;
+    bool extracted = ExtractEvicted(old, meta);
+    if (!extracted) {
+        dropped = (DroppedEvictedBlobs.count(evict) != 0);
+        if (!dropped) {
+            return false; // update after erase
+        }
+        extracted = ExtractEvicted(old, meta, true);
+    }
+    Y_VERIFY(extracted);
+
+    // Only the transitions below are checked; any other target state is accepted as is.
+    if (evict.State == EEvictState::SELF_CACHED) {
+        Y_VERIFY(old.State == EEvictState::EVICTING);
+    } else if (evict.State == EEvictState::EXTERN) {
+        Y_VERIFY(old.State == EEvictState::EVICTING || old.State == EEvictState::SELF_CACHED);
+    }
+
+    if (dropped && evict.State == EEvictState::SELF_CACHED) {
+        evict.State = EEvictState::EXTERN; // SELF_CACHED -> EXTERN for dropped
+    }
+
+    auto& target = dropped ? DroppedEvictedBlobs : EvictedBlobs;
+    target.emplace(evict, meta);
+
+    // TODO: update meta if needed
+    db.UpdateEvictBlob(evict, {});
+    return true;
+}
+
+// Deletes the record from the local DB and from DroppedEvictedBlobs.
+// Returns true when the blob was present in DroppedEvictedBlobs.
+bool TBlobManager::EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) {
+    db.EraseEvictBlob(evict);
+    const size_t erasedCount = DroppedEvictedBlobs.erase(evict);
+    return erasedCount != 0;
+}
+
+// Rebuilds EvictedBlobs and DroppedEvictedBlobs from the local DB.
+// Returns false when db.LoadEvicted() reports that the data could not be loaded;
+// the stored metadata of every record must parse as TEvictMetadata (verified).
+bool TBlobManager::LoadOneToOneExport(IBlobManagerDb& db) {
+    EvictedBlobs.clear();
+    DroppedEvictedBlobs.clear();
+
+    TBlobGroupSelector dsGroupSelector(TabletInfo);
+    THashMap<TEvictedBlob, TString> evicted;
+    THashMap<TEvictedBlob, TString> dropped;
+    if (!db.LoadEvicted(evicted, dropped, dsGroupSelector)) {
+        return false;
+    }
+
+    // Parses the serialized metadata of each loaded record and stores it in `target`.
+    const auto fill = [](const THashMap<TEvictedBlob, TString>& loaded, auto& target) {
+        for (const auto& [evict, serializedMeta] : loaded) {
+            NKikimrTxColumnShard::TEvictMetadata meta;
+            Y_VERIFY(meta.ParseFromString(serializedMeta));
+
+            target.emplace(evict, meta);
+        }
+    };
+
+    fill(evicted, EvictedBlobs);
+    fill(dropped, DroppedEvictedBlobs);
+
+    return true;
+}
+
+// Looks blobId up among the dropped evicted blobs. When found, copies its metadata into
+// `meta` and returns the stored record; otherwise returns a default-constructed record
+// and leaves `meta` untouched.
+TEvictedBlob TBlobManager::GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) {
+    const auto found = DroppedEvictedBlobs.find(TEvictedBlob{.Blob = blobId});
+    if (found == DroppedEvictedBlobs.end()) {
+        return {};
+    }
+    meta = found->second;
+    return found->first;
+}
+
void TBlobManager::DeleteSmallBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) {
LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID << " Delete Small Blob " << blobId);
db.EraseSmallBlob(blobId);
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index fb9814ceac7..12611e835e6 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -12,6 +12,9 @@ namespace NKikimr::NColumnShard {
using NOlap::TUnifiedBlobId;
using NOlap::TBlobRange;
+using NOlap::TEvictedBlob;
+using NOlap::EEvictState;
+using NKikimrTxColumnShard::TEvictMetadata;
// A batch of blobs that are written by a single task.
@@ -79,6 +82,23 @@ public:
virtual void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0;
};
+// An interface for exporting and caching exported blobs out of ColumnShard index to external storages like S3.
+// Do not mix it up with IBlobManager, which uses our storage model.
+// The notes on the methods below describe what the TBlobManager implementation does.
+class IBlobExporter {
+protected:
+ // Protected and non-virtual: objects are not meant to be destroyed through this interface.
+ ~IBlobExporter() = default;
+
+public:
+ // Lazily export blob to external object store. Keep it available via blobId.
+ // TBlobManager returns false when the blob is already registered as evicted.
+ virtual bool ExportOneToOne(const TUnifiedBlobId& blobId, const TEvictMetadata& meta, IBlobManagerDb& db) = 0;
+ // Marks an exported blob as dropped. TBlobManager returns false when blobId is not an exported blob.
+ virtual bool DropOneToOne(const TUnifiedBlobId& blobId, IBlobManagerDb& db) = 0;
+ // Stores a new state for an exported blob. `dropped` reports whether the blob was found among
+ // the dropped ones; TBlobManager returns false when the blob is not tracked at all.
+ virtual bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) = 0;
+ // Erases the record of a blob. TBlobManager returns true when the blob was among the dropped ones.
+ virtual bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) = 0;
+ // Reloads the export state from the local DB. TBlobManager returns false when loading fails.
+ virtual bool LoadOneToOneExport(IBlobManagerDb& db) = 0;
+ //virtual TEvictedBlob GetEvicted(const TUnifiedBlobId& blob, TEvictMetadata& meta) = 0;
+ // Returns the dropped record for blobId and fills `meta`. TBlobManager returns a
+ // default-constructed record when the blob is not among the dropped ones.
+ virtual TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) = 0;
+};
+
// Garbage Collection generation and step
using TGenStep = std::tuple<ui32, ui32>;
@@ -115,7 +135,7 @@ struct TBlobManagerCounters {
};
// The implementation of BlobManager that hides all GC-related details
-class TBlobManager : public IBlobManager, public IBlobInUseTracker {
+class TBlobManager : public IBlobManager, public IBlobExporter, public IBlobInUseTracker {
private:
static constexpr size_t BLOB_COUNT_TO_TRIGGER_GC_DEFAULT = 1000;
static constexpr ui64 GC_INTERVAL_SECONDS_DEFAULT = 60;
@@ -178,6 +198,10 @@ private:
TInstant PreviousGCTime; // Used for delaying next GC if there are too few blobs to collect
+ //
+ std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> EvictedBlobs;
+ std::unordered_map<TEvictedBlob, TEvictMetadata, THash<NKikimr::NOlap::TEvictedBlob>> DroppedEvictedBlobs;
+
public:
TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, ui32 gen);
@@ -206,6 +230,14 @@ public:
void SaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) override;
void DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db) override;
+ // Implementation of IBlobExporter
+ bool ExportOneToOne(const TUnifiedBlobId& blobId, const TEvictMetadata& meta, IBlobManagerDb& db) override;
+ bool DropOneToOne(const TUnifiedBlobId& blob, IBlobManagerDb& db) override;
+ bool UpdateOneToOne(TEvictedBlob&& evict, IBlobManagerDb& db, bool& dropped) override;
+ bool EraseOneToOne(const TEvictedBlob& evict, IBlobManagerDb& db) override;
+ bool LoadOneToOneExport(IBlobManagerDb& db) override;
+ TEvictedBlob GetDropped(const TUnifiedBlobId& blobId, TEvictMetadata& meta) override;
+
// Implementation of IBlobInUseTracker
void SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) override;
@@ -214,6 +246,29 @@ private:
// Delete small blobs that were previously in use and could not be deleted
void PerformDelayedDeletes(IBlobManagerDb& db);
+
+    // Removes the record equal to `evict` (records compare by source blob id) from
+    // EvictedBlobs, or from DroppedEvictedBlobs when fromDropped is true.
+    // On success overwrites `evict` and `meta` with the stored key and metadata and returns
+    // true; when there is no such record both arguments are left untouched and false is returned.
+    bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false) {
+        auto& blobs = fromDropped ? DroppedEvictedBlobs : EvictedBlobs;
+
+        // extract() yields an empty node handle for a missing key, so a preceding count()
+        // would only repeat the lookup.
+        auto node = blobs.extract(evict);
+        if (node.empty()) {
+            return false;
+        }
+
+        // The node is owned by this function now, so its contents can be moved out.
+        evict = std::move(node.key());
+        meta = std::move(node.mapped());
+        return true;
+    }
};
}
diff --git a/ydb/core/tx/columnshard/blob_manager_db.cpp b/ydb/core/tx/columnshard/blob_manager_db.cpp
index 839c179f543..52f62b86230 100644
--- a/ydb/core/tx/columnshard/blob_manager_db.cpp
+++ b/ydb/core/tx/columnshard/blob_manager_db.cpp
@@ -106,4 +106,99 @@ void TBlobManagerDb::EraseSmallBlob(const TUnifiedBlobId& blobId) {
db.Table<Schema::SmallBlobs>().Key(blobId.ToStringNew()).Delete();
}
+// Reads every row of the OneToOneEvictedBlobs table and puts the record into `dropped` or
+// `evicted` depending on the Dropped column; the mapped value is the raw Metadata column.
+// Both output maps are cleared first. Returns false when the rowset is not ready or
+// advancing it with Next() fails. A stored blob id that does not parse fails a Y_VERIFY.
+bool TBlobManagerDb::LoadEvicted(THashMap<TEvictedBlob, TString>& evicted, THashMap<TEvictedBlob, TString>& dropped,
+    const NOlap::IBlobGroupSelector& dsGroupSelector) {
+    evicted.clear();
+    dropped.clear();
+
+    NIceDb::TNiceDb db(Database);
+
+    auto rowset = db.Table<Schema::OneToOneEvictedBlobs>().Select();
+    if (!rowset.IsReady()) {
+        return false;
+    }
+
+    TString error;
+
+    while (!rowset.EndOfSet()) {
+        TString strBlobId = rowset.GetValue<Schema::OneToOneEvictedBlobs::BlobId>();
+        //ui64 size = rowset.GetValue<Schema::OneToOneEvictedBlobs::Size>();
+        ui8 state = rowset.GetValue<Schema::OneToOneEvictedBlobs::State>();
+        bool isDropped = rowset.GetValue<Schema::OneToOneEvictedBlobs::Dropped>();
+        TString meta = rowset.GetValue<Schema::OneToOneEvictedBlobs::Metadata>();
+        TString strExternId = rowset.GetValue<Schema::OneToOneEvictedBlobs::ExternBlobId>();
+        // TODO: CachedBlob
+
+        TUnifiedBlobId blobId = TUnifiedBlobId::ParseFromString(strBlobId, &dsGroupSelector, error);
+        Y_VERIFY(blobId.IsValid(), "%s", error.c_str());
+
+        // UpdateEvictBlob() writes ExternBlobId only for SELF_CACHED/EXTERN records, so a record
+        // still in the EVICTING state has no extern id yet. Leave it invalid in that case
+        // instead of failing the verify on an empty string.
+        TUnifiedBlobId externId;
+        if (!strExternId.empty()) {
+            externId = TUnifiedBlobId::ParseFromString(strExternId, nullptr, error);
+            Y_VERIFY(externId.IsValid(), "%s", error.c_str());
+        }
+
+        TEvictedBlob evict{
+            .State = static_cast<EEvictState>(state),
+            .Blob = std::move(blobId),
+            .ExternBlob = std::move(externId),
+        };
+
+        if (isDropped) {
+            dropped.emplace(std::move(evict), std::move(meta));
+        } else {
+            evicted.emplace(std::move(evict), std::move(meta));
+        }
+
+        if (!rowset.Next())
+            return false;
+    }
+
+    return true;
+}
+
+// Persists the state of an evicted blob in the OneToOneEvictedBlobs table, keyed by the
+// string form of the source blob id.
+//  - EVICTING: writes size, state and metadata.
+//  - SELF_CACHED / EXTERN: writes state and the extern (S3) blob id; `meta` must be empty.
+// Any other state fails a Y_VERIFY.
+void TBlobManagerDb::UpdateEvictBlob(const TEvictedBlob& evict, const TString& meta) {
+    NIceDb::TNiceDb db(Database);
+
+    TString blobKey = evict.Blob.ToStringNew();
+    ui8 rawState = static_cast<ui8>(evict.State);
+
+    switch (evict.State) {
+        case EEvictState::EVICTING: {
+            db.Table<Schema::OneToOneEvictedBlobs>().Key(blobKey).Update(
+                NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::Size>(evict.Blob.BlobSize()),
+                NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::State>(rawState),
+                NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::Metadata>(meta)
+            );
+            return;
+        }
+        case EEvictState::SELF_CACHED:
+        case EEvictState::EXTERN: {
+            Y_VERIFY(meta.empty());
+            Y_VERIFY(evict.ExternBlob.IsS3Blob());
+            TString externKey = evict.ExternBlob.ToStringNew();
+
+            db.Table<Schema::OneToOneEvictedBlobs>().Key(blobKey).Update(
+                NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::State>(rawState),
+                NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::ExternBlobId>(externKey)
+            );
+            return;
+        }
+        default:
+            Y_VERIFY(false);
+            return;
+    }
+}
+
+// Marks the row of the given blob as dropped and stores its current state.
+void TBlobManagerDb::DropEvictBlob(const TEvictedBlob& evict) {
+    NIceDb::TNiceDb db(Database);
+
+    TString blobKey = evict.Blob.ToStringNew();
+    ui8 rawState = static_cast<ui8>(evict.State);
+
+    db.Table<Schema::OneToOneEvictedBlobs>().Key(blobKey).Update(
+        NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::State>(rawState),
+        NIceDb::TUpdate<Schema::OneToOneEvictedBlobs::Dropped>(true));
+}
+
+// Deletes the row of the given blob from the OneToOneEvictedBlobs table.
+void TBlobManagerDb::EraseEvictBlob(const TEvictedBlob& evict) {
+    NIceDb::TNiceDb db(Database);
+
+    auto row = db.Table<Schema::OneToOneEvictedBlobs>().Key(evict.Blob.ToStringNew());
+    row.Delete();
+}
+
}
diff --git a/ydb/core/tx/columnshard/blob_manager_db.h b/ydb/core/tx/columnshard/blob_manager_db.h
index 0ffae5f4724..72f0031c35d 100644
--- a/ydb/core/tx/columnshard/blob_manager_db.h
+++ b/ydb/core/tx/columnshard/blob_manager_db.h
@@ -24,6 +24,13 @@ public:
virtual void EraseBlobToDelete(const TUnifiedBlobId& blobId) = 0;
virtual void WriteSmallBlob(const TUnifiedBlobId& blobId, const TString& data) = 0;
virtual void EraseSmallBlob(const TUnifiedBlobId& blobId) = 0;
+
+ virtual bool LoadEvicted(THashMap<TEvictedBlob, TString>& evicted,
+ THashMap<TEvictedBlob, TString>& dropped,
+ const NOlap::IBlobGroupSelector& dsGroupSelector) = 0;
+ virtual void UpdateEvictBlob(const TEvictedBlob& evict, const TString& meta) = 0;
+ virtual void DropEvictBlob(const TEvictedBlob& evict) = 0;
+ virtual void EraseEvictBlob(const TEvictedBlob& evict) = 0;
};
@@ -45,6 +52,13 @@ public:
void WriteSmallBlob(const TUnifiedBlobId& blobId, const TString& data) override;
void EraseSmallBlob(const TUnifiedBlobId& blobId) override;
+ virtual bool LoadEvicted(THashMap<TEvictedBlob, TString>& evicted,
+ THashMap<TEvictedBlob, TString>& dropped,
+ const NOlap::IBlobGroupSelector& dsGroupSelector) override;
+ void UpdateEvictBlob(const TEvictedBlob& evict, const TString& meta) override;
+ void DropEvictBlob(const TEvictedBlob& evict) override;
+ void EraseEvictBlob(const TEvictedBlob& evict) override;
+
private:
NTable::TDatabase& Database;
};
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index fd0c913ca3f..93b6898e968 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -11,17 +11,6 @@ IActor* CreateColumnShard(const TActorId& tablet, TTabletStorageInfo* info) {
namespace NKikimr::NColumnShard {
-IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent);
-IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent);
-IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent);
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
- const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
- TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max());
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
- const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
- TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max());
-IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
-
void TColumnShard::BecomeBroken(const TActorContext& ctx)
{
Become(&TThis::StateBroken);
@@ -29,14 +18,18 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx)
ctx.Send(IndexingActor, new TEvents::TEvPoisonPill);
ctx.Send(CompactionActor, new TEvents::TEvPoisonPill);
ctx.Send(EvictionActor, new TEvents::TEvPoisonPill);
+ StopS3Actors(ctx);
}
void TColumnShard::SwitchToWork(const TActorContext& ctx) {
Become(&TThis::StateWork);
LOG_S_INFO("Switched to work at " << TabletID() << " actor " << ctx.SelfID);
+
IndexingActor = ctx.Register(CreateIndexingActor(TabletID(), ctx.SelfID));
CompactionActor = ctx.Register(CreateCompactionActor(TabletID(), ctx.SelfID));
EvictionActor = ctx.Register(CreateEvictionActor(TabletID(), ctx.SelfID));
+ InitS3Actors(ctx, true);
+
SignalTabletActive(ctx);
}
@@ -88,131 +81,6 @@ void TColumnShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const
LOG_S_DEBUG("Server pipe reset at tablet " << TabletID() << ", remote " << tabletId);
}
-void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
- auto& record = Proto(ev->Get());
- ui32 txKind = record.GetTxKind();
- ui64 txId = record.GetTxId();
- LOG_S_DEBUG("ProposeTransaction kind " << txKind << " txId " << txId << " at tablet " << TabletID());
-
- Execute(new TTxProposeTransaction(this, ev), ctx);
-}
-
-void TColumnShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx) {
- ui64 step = ev->Get()->Record.GetStep();
- ui64 mediatorId = ev->Get()->Record.GetMediatorID();
- LOG_S_DEBUG("PlanStep " << step << " at tablet " << TabletID() << ", mediator " << mediatorId);
-
- Execute(new TTxPlanStep(this, ev), ctx);
-}
-
-// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite
-void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
- OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
-
- auto& data = Proto(ev->Get()).GetData();
- const ui64 tableId = ev->Get()->Record.GetTableId();
- bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !IsTableWritable(tableId)
- || ev->Get()->PutStatus == NKikimrProto::ERROR;
-
- if (error) {
- LOG_S_WARN("Write (fail) " << data.size() << " bytes at tablet " << TabletID());
-
- ev->Get()->PutStatus = NKikimrProto::ERROR;
- Execute(new TTxWrite(this, ev), ctx);
- } else if (InsertTable->IsOverloaded(tableId)) {
- LOG_S_INFO("Write (overload) " << data.size() << " bytes for table " << tableId << " at tablet " << TabletID());
-
- ev->Get()->PutStatus = NKikimrProto::TRYLATER;
- Execute(new TTxWrite(this, ev), ctx);
- } else if (ev->Get()->BlobId.IsValid()) {
- LOG_S_DEBUG("Write (record) " << data.size() << " bytes at tablet " << TabletID());
-
- Execute(new TTxWrite(this, ev), ctx);
- } else {
- if (IsAnyChannelYellowStop()) {
- LOG_S_ERROR("Write (out of disk space) at tablet " << TabletID());
-
- IncCounter(COUNTER_OUT_OF_SPACE);
- ev->Get()->PutStatus = NKikimrProto::TRYLATER;
- Execute(new TTxWrite(this, ev), ctx);
- } else {
- LOG_S_DEBUG("Write (blob) " << data.size() << " bytes at tablet " << TabletID());
-
- ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
-
- ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID,
- BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
- }
- }
-}
-
-void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) {
- const auto* msg = ev->Get();
- TRowVersion readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId());
- TRowVersion maxReadVersion = GetMaxReadVersion();
- LOG_S_DEBUG("Read at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion);
-
- if (maxReadVersion < readVersion) {
- WaitingReads.emplace(readVersion, std::move(ev));
- WaitPlanStep(readVersion.Step);
- return;
- }
-
- Execute(new TTxRead(this, ev), ctx);
-}
-
-void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
- auto& blobs = ev->Get()->Blobs;
- bool isCompaction = ev->Get()->GranuleCompaction;
- if (isCompaction && blobs.empty()) {
- ev->Get()->PutStatus = NKikimrProto::OK;
- }
-
- if (ev->Get()->PutStatus == NKikimrProto::UNKNOWN) {
- if (IsAnyChannelYellowStop()) {
- LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID());
-
- IncCounter(COUNTER_OUT_OF_SPACE);
- ev->Get()->PutStatus = NKikimrProto::TRYLATER;
- Execute(new TTxWriteIndex(this, ev), ctx);
- } else {
- LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID());
-
- Y_VERIFY(!blobs.empty());
- ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo("dummy", 0), ctx.SelfID,
- BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
- }
- } else {
- if (ev->Get()->PutStatus == NKikimrProto::OK) {
- LOG_S_DEBUG("WriteIndex (records) at tablet " << TabletID());
- } else {
- LOG_S_INFO("WriteIndex error at tablet " << TabletID());
- }
-
- OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
- Execute(new TTxWriteIndex(this, ev), ctx);
- }
-}
-
-void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext& ctx) {
- const auto* msg = ev->Get();
- ui64 txId = msg->Record.GetTxId();
- const auto& snapshot = msg->Record.GetSnapshot();
- TRowVersion readVersion(snapshot.GetStep(), snapshot.GetTxId());
- TRowVersion maxReadVersion = GetMaxReadVersion();
- LOG_S_DEBUG("Scan at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion);
-
- if (maxReadVersion < readVersion) {
- WaitingScans.emplace(readVersion, std::move(ev));
- WaitPlanStep(readVersion.Step);
- return;
- }
-
- ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()});
- SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
- Execute(new TTxScan(this, ev), ctx);
-}
-
void TColumnShard::Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext &ctx) {
Y_UNUSED(ctx);
@@ -235,11 +103,6 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
}
}
-void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx) {
- LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << ev->Get()->Record);
- Execute(new TTxReadBlobRanges(this, ev), ctx);
-}
-
void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Manual) {
EnqueueBackgroundActivities();
diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp
new file mode 100644
index 00000000000..04e4381091e
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard__export.cpp
@@ -0,0 +1,118 @@
+#include "columnshard_impl.h"
+#include "blob_manager_db.h"
+
+namespace NKikimr::NColumnShard {
+
+using namespace NTabletFlatExecutor;
+
+/// Tablet transaction executed for a TEvPrivate::TEvExport event (type TXTYPE_EXPORT).
+/// The event is kept for the lifetime of the transaction and read in Execute().
+class TTxExport : public TTransactionBase<TColumnShard> {
+public:
+    TTxExport(TColumnShard* self, TEvPrivate::TEvExport::TPtr& ev)
+        : TBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_EXPORT; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Export event whose Status and SrcToDstBlobs are applied by Execute().
+    TEvPrivate::TEvExport::TPtr Ev;
+};
+
+
+/// Applies the export result carried by Ev.
+///
+/// For a non-OK status only COUNTER_EXPORT_FAIL is incremented. For OK, every
+/// (source blob, external blob) pair from SrcToDstBlobs is recorded in the blob
+/// manager with state EXTERN; the source blob is then deleted when the record
+/// was present and not dropped. Always returns true.
+bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) {
+    Y_VERIFY(Ev);
+    LOG_S_DEBUG("TTxExport.Execute at tablet " << Self->TabletID());
+
+    txc.DB.NoMoreReadsForTx();
+
+    auto& msg = *Ev->Get();
+    if (msg.Status != NKikimrProto::OK) {
+        Self->IncCounter(COUNTER_EXPORT_FAIL);
+        return true;
+    }
+
+    TBlobManagerDb blobManagerDb(txc.DB);
+
+    for (auto& [blobId, externId] : msg.SrcToDstBlobs) {
+        Y_VERIFY(blobId.IsDsBlob());
+        Y_VERIFY(externId.IsS3Blob());
+        bool dropped = false;
+
+#if 0 // TODO: SELF_CACHED logic
+        NOlap::TEvictedBlob evict{
+            .State = EEvictState::SELF_CACHED,
+            .Blob = blobId,
+            .ExternBlob = externId
+        };
+        Self->BlobManager->UpdateOneToOne(std::move(evict), blobManagerDb, dropped);
+#else
+        NOlap::TEvictedBlob evict{
+            .State = EEvictState::EXTERN,
+            .Blob = blobId,
+            .ExternBlob = externId
+        };
+        const bool present = Self->BlobManager->UpdateOneToOne(std::move(evict), blobManagerDb, dropped);
+
+        // Delayed erase of the evicted blob. The blob may already have been deleted,
+        // in which case there is nothing left to erase or to count.
+        if (present && !dropped) {
+            Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
+            Self->IncCounter(COUNTER_BLOBS_ERASED);
+            Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize());
+        }
+
+        // TODO: delete not present in S3 for sure (avoid race between export and forget)
+#endif
+    }
+
+    Self->IncCounter(COUNTER_EXPORT_SUCCESS);
+    return true;
+}
+
+void TTxExport::Complete(const TActorContext&) { // Completion hook of the export transaction; the actor context is unused.
+ LOG_S_DEBUG("TTxExport.Complete at tablet " << Self->TabletID()); // The only action is a debug trace with the tablet id.
+}
+
+
+/// Dispatches a TEvExport event by its Status:
+///  - ERROR:   logs ErrorStr and stops;
+///  - OK:      applies the result through TTxExport;
+///  - UNKNOWN: treats the event as a new request and, per tier, registers an export
+///             actor bound to that tier's S3 actor (tiers without a started S3 actor
+///             are logged and skipped);
+///  - anything else is a Y_VERIFY failure.
+/// NOTE(review): TTxExport is only executed here for OK, so its non-OK branch
+/// (COUNTER_EXPORT_FAIL) looks unreachable from this handler — confirm intent.
+void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx) {
+    auto status = ev->Get()->Status;
+
+    if (status == NKikimrProto::ERROR) {
+        LOG_S_WARN("Export (fail): '" << ev->Get()->ErrorStr << "' at tablet " << TabletID());
+        return;
+    }
+
+    if (status == NKikimrProto::OK) {
+        LOG_S_DEBUG("Export (apply) at tablet " << TabletID());
+
+        Execute(new TTxExport(this, ev), ctx);
+        return;
+    }
+
+    if (status != NKikimrProto::UNKNOWN) {
+        Y_VERIFY(false);
+        return;
+    }
+
+    ui64 exportNo = ev->Get()->ExportNo;
+    auto& tierBlobs = ev->Get()->TierBlobs;
+    Y_VERIFY(tierBlobs.size());
+
+    LOG_S_DEBUG("Export (write): " << exportNo << " at tablet " << TabletID());
+
+    for (auto& [tierName, blobIds] : tierBlobs) {
+        auto actorIt = S3Actors.find(tierName);
+        if (actorIt == S3Actors.end()) {
+            TString tier(tierName);
+            LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID());
+            continue;
+        }
+
+        auto& s3 = actorIt->second;
+        if (!s3) {
+            TString tier(tierName);
+            LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on export) at tablet " << TabletID());
+            continue;
+        }
+
+        // The blob set of this tier is moved into the per-tier export event.
+        auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, s3, std::move(blobIds));
+        ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release()));
+    }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard__forget.cpp b/ydb/core/tx/columnshard/columnshard__forget.cpp
new file mode 100644
index 00000000000..89c5838d37f
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard__forget.cpp
@@ -0,0 +1,72 @@
+#include "columnshard_impl.h"
+#include "blob_manager_db.h"
+
+namespace NKikimr::NColumnShard {
+
+using namespace NTabletFlatExecutor;
+
+/// Tablet transaction executed for a TEvPrivate::TEvForget event (type TXTYPE_FORGET).
+/// The event is kept for the lifetime of the transaction and read in Execute().
+class TTxForget : public TTransactionBase<TColumnShard> {
+public:
+    TTxForget(TColumnShard* self, TEvPrivate::TEvForget::TPtr& ev)
+        : TBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_FORGET; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Forget event whose Status and Evicted list are applied by Execute().
+    TEvPrivate::TEvForget::TPtr Ev;
+};
+
+
+/// Applies the forget result carried by Ev.
+///
+/// For a non-OK status only COUNTER_FORGET_FAIL is incremented. For OK, every
+/// entry of Evicted is erased via the blob manager's EraseOneToOne; entries it
+/// reports as not erased are logged as unknown blobs. Always returns true.
+bool TTxForget::Execute(TTransactionContext& txc, const TActorContext&) {
+    Y_VERIFY(Ev);
+    LOG_S_DEBUG("TTxForget.Execute at tablet " << Self->TabletID());
+
+    txc.DB.NoMoreReadsForTx();
+
+    auto& msg = *Ev->Get();
+    if (msg.Status != NKikimrProto::OK) {
+        Self->IncCounter(COUNTER_FORGET_FAIL);
+        return true;
+    }
+
+    TBlobManagerDb blobManagerDb(txc.DB);
+
+    for (auto& evict : msg.Evicted) {
+        if (!Self->BlobManager->EraseOneToOne(evict, blobManagerDb)) {
+            LOG_S_ERROR("Forget unknown blob " << evict.Blob << " at tablet " << Self->TabletID());
+        }
+    }
+
+    Self->IncCounter(COUNTER_FORGET_SUCCESS);
+    return true;
+}
+
+void TTxForget::Complete(const TActorContext&) { // Completion hook of the forget transaction; the actor context is unused.
+ LOG_S_DEBUG("TTxForget.Complete at tablet " << Self->TabletID()); // The only action is a debug trace with the tablet id.
+}
+
+
+void TColumnShard::Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx) {
+ auto status = ev->Get()->Status;
+ bool error = status == NKikimrProto::ERROR;
+
+ if (error) {
+ LOG_S_WARN("Forget (fail): '" << ev->Get()->ErrorStr << "' at tablet " << TabletID());
+ } else if (status == NKikimrProto::OK) {
+ LOG_S_DEBUG("Forget (apply) at tablet " << TabletID());
+
+ Execute(new TTxForget(this, ev), ctx);
+ } else {
+ Y_VERIFY(false);
+ }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 1a5420361b4..ac8ba69fe1f 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -63,6 +63,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastWriteId, Self->LastWriteId);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId);
+ ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
if (!ready)
return false;
@@ -319,6 +320,9 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
if (!Self->BlobManager->LoadState(blobManagerDb)) {
return false;
}
+ if (!Self->BlobManager->LoadOneToOneExport(blobManagerDb)) {
+ return false;
+ }
}
Self->UpdateInsertTableCounters();
diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp
index 59fb09dbd93..b34582ba7e7 100644
--- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp
+++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp
@@ -8,6 +8,24 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
+/// Tablet transaction executed for a TEvTxProcessing::TEvPlanStep event (type TXTYPE_PLANSTEP).
+class TTxPlanStep : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
+public:
+    TTxPlanStep(TColumnShard* self, TEvTxProcessing::TEvPlanStep::TPtr& ev)
+        : TBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_PLANSTEP; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Incoming plan step event.
+    TEvTxProcessing::TEvPlanStep::TPtr Ev;
+    /// Transaction ids grouped per actor id (presumably acks to send — confirm in Execute/Complete).
+    THashMap<TActorId, TVector<ui64>> TxAcks;
+    /// Reply released to Ev->Sender by Complete().
+    std::unique_ptr<TEvTxProcessing::TEvPlanStepAccepted> Result;
+};
+
+
bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(Ev);
LOG_S_DEBUG("TTxPlanStep.Execute at tablet " << Self->TabletID());
@@ -93,4 +111,13 @@ void TTxPlanStep::Complete(const TActorContext& ctx) {
ctx.Send(Ev->Sender, Result.release());
}
+
+/// Logs the step and mediator id of an incoming plan step and runs TTxPlanStep for it.
+void TColumnShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx) {
+    const auto& record = ev->Get()->Record;
+    const ui64 step = record.GetStep();
+    const ui64 mediatorId = record.GetMediatorID();
+    LOG_S_DEBUG("PlanStep " << step << " at tablet " << TabletID() << ", mediator " << mediatorId);
+
+    Execute(new TTxPlanStep(this, ev), ctx);
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 9caf23e866d..53abef0d31e 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -76,6 +76,7 @@ public:
MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(Self->TabletID(), txId));
}
Self->AltersInFlight.erase(txId);
+ Self->InitS3Actors(ctx, false);
break;
}
case NKikimrTxColumnShard::TX_KIND_COMMIT: {
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 352500107f7..cb18edaf0da 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -6,6 +6,23 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
+/// Tablet transaction executed for a TEvColumnShard::TEvProposeTransaction event (type TXTYPE_PROPOSE).
+class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
+public:
+    TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev)
+        : TBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_PROPOSE; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Incoming propose request.
+    TEvColumnShard::TEvProposeTransaction::TPtr Ev;
+    /// Reply object owned by the transaction (presumably filled in Execute — confirm).
+    std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result;
+};
+
+
bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
LOG_S_DEBUG("TTxProposeTransaction.Execute at tablet " << Self->TabletID());
@@ -242,4 +259,14 @@ void TTxProposeTransaction::Complete(const TActorContext& ctx) {
Self->TryRegisterMediatorTimeCast();
}
+
+/// Logs the kind and id of a proposed transaction and runs TTxProposeTransaction for it.
+void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
+    auto& record = Proto(ev->Get());
+    const ui32 txKind = record.GetTxKind();
+    const ui64 txId = record.GetTxId();
+    LOG_S_DEBUG("ProposeTransaction kind " << txKind << " txId " << txId << " at tablet " << TabletID());
+
+    Execute(new TTxProposeTransaction(this, ev), ctx);
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 6dcfc3d703e..73723847c5e 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -16,16 +16,26 @@ TVector<T> ProtoToVector(const U& cont) {
}
-IActor* CreateReadActor(ui64 tabletId,
- const TActorId& dstActor,
- std::unique_ptr<TEvColumnShard::TEvReadResult>&& event,
- NOlap::TReadMetadata::TConstPtr readMetadata,
- const TInstant& deadline,
- const TActorId& columnShardActorId,
- ui64 requestCookie);
-
using namespace NTabletFlatExecutor;
+/// Read transaction executed for a TEvColumnShard::TEvRead event (type TXTYPE_READ).
+class TTxRead : public TTxReadBase {
+public:
+    TTxRead(TColumnShard* self, TEvColumnShard::TEvRead::TPtr& ev)
+        : TTxReadBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_READ; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Incoming read request.
+    TEvColumnShard::TEvRead::TPtr Ev;
+    /// Reply object owned by the transaction.
+    std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
+    /// Read metadata held by the transaction (presumably prepared in Execute — confirm).
+    NOlap::TReadMetadata::TConstPtr ReadMetadata;
+};
+
+
NOlap::TReadMetadata::TPtr
TTxReadBase::PrepareReadMetadata(const TActorContext& ctx, const TReadDescription& read,
const std::unique_ptr<NOlap::TInsertTable>& insertTable,
@@ -293,4 +303,20 @@ void TTxRead::Complete(const TActorContext& ctx) {
}
}
+
+/// Handles a read request. If the requested version (PlanStep, TxId) is above the
+/// current maximal readable version, the event is parked in WaitingReads and the
+/// shard waits for that plan step; otherwise TTxRead is executed right away.
+void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) {
+    const auto& record = ev->Get()->Record;
+    const TRowVersion readVersion(record.GetPlanStep(), record.GetTxId());
+    const TRowVersion maxReadVersion = GetMaxReadVersion();
+    LOG_S_DEBUG("Read at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion);
+
+    const bool mustWait = maxReadVersion < readVersion;
+    if (mustWait) {
+        // The event is moved into the queue; it must not be touched afterwards.
+        WaitingReads.emplace(readVersion, std::move(ev));
+        WaitPlanStep(readVersion.Step);
+        return;
+    }
+
+    Execute(new TTxRead(this, ev), ctx);
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
index 59383f94ed8..b742a726a73 100644
--- a/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read_blob_ranges.cpp
@@ -7,6 +7,23 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
+/// Tablet transaction executed for a TEvColumnShard::TEvReadBlobRanges event
+/// (type TXTYPE_READ_BLOB_RANGES).
+class TTxReadBlobRanges : public TTransactionBase<TColumnShard> {
+public:
+    TTxReadBlobRanges(TColumnShard* self, TEvColumnShard::TEvReadBlobRanges::TPtr& ev)
+        : TTransactionBase<TColumnShard>(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_READ_BLOB_RANGES; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Incoming request.
+    TEvColumnShard::TEvReadBlobRanges::TPtr Ev;
+    /// Reply object owned by the transaction.
+    std::unique_ptr<TEvColumnShard::TEvReadBlobRangesResult> Result;
+};
+
+
// Returns false in case of page fault
bool TryReadValue(NIceDb::TNiceDb& db, const TString& key, TString& value, ui32& readStatus) {
auto rowset = db.Table<Schema::SmallBlobs>().Key(key).Select<Schema::SmallBlobs::Data>();
@@ -101,4 +118,9 @@ void TTxReadBlobRanges::Complete(const TActorContext& ctx) {
LOG_S_DEBUG("TTxReadBlobRanges.Complete at tablet " << Self->TabletID());
}
+void TColumnShard::Handle(TEvColumnShard::TEvReadBlobRanges::TPtr& ev, const TActorContext& ctx) { // Entry point for blob range read requests.
+ LOG_S_DEBUG("Read blob ranges at tablet " << TabletID() << ev->Get()->Record); // Traces the tablet id followed by the whole request record.
+ Execute(new TTxReadBlobRanges(this, ev), ctx); // The request is served by a TTxReadBlobRanges transaction.
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 697114c0b84..17a5b58e0c6 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -17,6 +17,29 @@ namespace NKikimr::NColumnShard {
using namespace NKqp;
using NBlobCache::TBlobRange;
+/// Read transaction executed for a TEvColumnShard::TEvScan event (type TXTYPE_START_SCAN).
+class TTxScan : public TTxReadBase {
+public:
+    using TReadMetadataPtr = NOlap::TReadMetadataBase::TConstPtr;
+
+    TTxScan(TColumnShard* self, TEvColumnShard::TEvScan::TPtr& ev)
+        : TTxReadBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_START_SCAN; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Builds read metadata for one read description; defined outside this class body.
+    NOlap::TReadMetadataBase::TConstPtr CreateReadMetadata(const TActorContext& ctx, TReadDescription& read,
+        bool isIndexStats, bool isReverse, ui64 limit);
+
+    /// Incoming scan request.
+    TEvColumnShard::TEvScan::TPtr Ev;
+    /// Read metadata objects held by the transaction, one per range.
+    TVector<TReadMetadataPtr> ReadMetadataRanges;
+};
+
+
constexpr ui64 INIT_BATCH_ROWS = 1000;
constexpr i64 DEFAULT_READ_AHEAD_BYTES = 1*1024*1024;
constexpr TDuration SCAN_HARD_TIMEOUT = TDuration::Minutes(10);
@@ -721,4 +744,24 @@ void TTxScan::Complete(const TActorContext& ctx) {
scanId, txId, scanGen, requestCookie, table, timeout, std::move(ReadMetadataRanges), dataFormat));
}
+
+/// Handles a scan request. If the snapshot (Step, TxId) is above the current maximal
+/// readable version, the event is parked in WaitingScans and the shard waits for that
+/// plan step. Otherwise the scan is registered in ScanTxInFlight with the current
+/// time, COUNTER_SCAN_IN_FLY is refreshed and TTxScan is executed.
+void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext& ctx) {
+    const auto& record = ev->Get()->Record;
+    const auto& snapshot = record.GetSnapshot();
+    const TRowVersion readVersion(snapshot.GetStep(), snapshot.GetTxId());
+    const TRowVersion maxReadVersion = GetMaxReadVersion();
+    LOG_S_DEBUG("Scan at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion);
+
+    const bool mustWait = maxReadVersion < readVersion;
+    if (mustWait) {
+        // The event is moved into the queue; it must not be touched afterwards.
+        WaitingScans.emplace(readVersion, std::move(ev));
+        WaitPlanStep(readVersion.Step);
+        return;
+    }
+
+    const ui64 txId = record.GetTxId();
+    ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()});
+    SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
+    Execute(new TTxScan(this, ev), ctx);
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 76fb8669609..af19dcca408 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -1,5 +1,4 @@
#include "columnshard_impl.h"
-#include "columnshard_txs.h"
#include "columnshard_schema.h"
#include "blob_manager_db.h"
#include "blob_cache.h"
@@ -8,6 +7,23 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
+/// Tablet transaction executed for a TEvColumnShard::TEvWrite event (type TXTYPE_WRITE).
+class TTxWrite : public TTransactionBase<TColumnShard> {
+public:
+    TTxWrite(TColumnShard* self, TEvColumnShard::TEvWrite::TPtr& ev)
+        : TBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_WRITE; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Incoming write request.
+    TEvColumnShard::TEvWrite::TPtr Ev;
+    /// Reply released to the event source by Complete().
+    std::unique_ptr<TEvColumnShard::TEvWriteResult> Result;
+};
+
+
bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(Ev);
LOG_S_DEBUG("TTxWrite.Execute at tablet " << Self->TabletID());
@@ -105,4 +121,46 @@ void TTxWrite::Complete(const TActorContext& ctx) {
ctx.Send(Ev->Get()->GetSource(), Result.release());
}
+
+// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite
+//
+// Decision order for an incoming write:
+//  1. invalid request (empty or oversized data, no PrimaryIndex, table not writable,
+//     PutStatus already ERROR)  -> TTxWrite with PutStatus = ERROR;
+//  2. insert table overloaded     -> TTxWrite with PutStatus = TRYLATER;
+//  3. event already has a BlobId  -> TTxWrite records the write;
+//  4. some channel is yellow-stop -> COUNTER_OUT_OF_SPACE, TTxWrite with TRYLATER;
+//  5. otherwise the event is released to a newly registered write actor.
+void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+    OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
+
+    auto& data = Proto(ev->Get()).GetData();
+    const ui64 tableId = ev->Get()->Record.GetTableId();
+    const bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !IsTableWritable(tableId)
+        || ev->Get()->PutStatus == NKikimrProto::ERROR;
+
+    if (error) {
+        LOG_S_WARN("Write (fail) " << data.size() << " bytes at tablet " << TabletID());
+
+        ev->Get()->PutStatus = NKikimrProto::ERROR;
+        Execute(new TTxWrite(this, ev), ctx);
+        return;
+    }
+
+    if (InsertTable->IsOverloaded(tableId)) {
+        LOG_S_INFO("Write (overload) " << data.size() << " bytes for table " << tableId << " at tablet " << TabletID());
+
+        ev->Get()->PutStatus = NKikimrProto::TRYLATER;
+        Execute(new TTxWrite(this, ev), ctx);
+        return;
+    }
+
+    if (ev->Get()->BlobId.IsValid()) {
+        LOG_S_DEBUG("Write (record) " << data.size() << " bytes at tablet " << TabletID());
+
+        Execute(new TTxWrite(this, ev), ctx);
+        return;
+    }
+
+    if (IsAnyChannelYellowStop()) {
+        LOG_S_ERROR("Write (out of disk space) at tablet " << TabletID());
+
+        IncCounter(COUNTER_OUT_OF_SPACE);
+        ev->Get()->PutStatus = NKikimrProto::TRYLATER;
+        Execute(new TTxWrite(this, ev), ctx);
+        return;
+    }
+
+    LOG_S_DEBUG("Write (blob) " << data.size() << " bytes at tablet " << TabletID());
+
+    ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
+
+    // Ownership of the event goes to the write actor; `ev` must not be used after this call.
+    ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID,
+        BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 7b6b67a26b2..23ca51fe5fb 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -8,6 +8,28 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
+/// Common transaction for WriteIndex and GranuleCompaction (type TXTYPE_WRITE_INDEX).
+/// For WriteIndex it writes a new portion from InsertTable into the index.
+/// For GranuleCompaction it writes a new portion of indexed data and marks the old data
+/// with a "switching" snapshot.
+class TTxWriteIndex : public TTransactionBase<TColumnShard> {
+public:
+    TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr& ev)
+        : TBase(self)
+        , Ev(ev)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; }
+
+    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+    void Complete(const TActorContext& ctx) override;
+
+private:
+    /// Incoming write-index event.
+    TEvPrivate::TEvWriteIndex::TPtr Ev;
+    /// Blob id -> tier name for blobs marked for export in Execute().
+    THashMap<TUnifiedBlobId, TString> BlobsToExport;
+    /// Tier name -> evicted blobs that Complete() sends to that tier's S3 actor in TEvForget.
+    THashMap<TString, std::vector<NOlap::TEvictedBlob>> TierBlobsToForget;
+    /// Export number taken in Execute() when BlobsToExport is not empty; 0 means no export.
+    ui64 ExportNo = 0;
+};
+
+
bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(Ev);
Y_VERIFY(Self->InsertTable);
@@ -92,6 +114,29 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
}
Self->IncCounter(COUNTER_EVICTION_PORTIONS_WRITTEN, changes->PortionsToEvict.size());
+ for (auto& [portionInfo, _] : changes->PortionsToEvict) {
+ auto& tierName = portionInfo.TierName;
+ if (tierName.empty()) {
+ continue;
+ }
+
+ // Mark exported blobs
+ Y_VERIFY(Self->TierConfigs.count(tierName));
+ auto& config = Self->TierConfigs[tierName];
+
+ if (config.NeedExport()) {
+ for (auto& rec : portionInfo.Records) {
+ auto& blobId = rec.BlobRange.BlobId;
+ if (!BlobsToExport.count(blobId)) {
+ BlobsToExport.emplace(blobId, tierName);
+
+ NKikimrTxColumnShard::TEvictMetadata meta;
+ meta.SetTierName(tierName);
+ Self->BlobManager->ExportOneToOne(blobId, meta, blobManagerDb);
+ }
+ }
+ }
+ }
const auto& portionsToDrop = changes->PortionsToDrop;
THashSet<TUnifiedBlobId> blobsToDrop;
@@ -104,14 +149,33 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
}
// Note: RAW_BYTES_ERASED and BYTES_ERASED counters are not in sync for evicted data
- const auto& evictedRecords = changes->EvictedRecords;
- for (const auto& rec : evictedRecords) {
- blobsToDrop.insert(rec.BlobRange.BlobId);
+ THashSet<TUnifiedBlobId> blobsToEvict;
+ for (const auto& rec : changes->EvictedRecords) {
+ blobsToEvict.insert(rec.BlobRange.BlobId);
}
- Self->IncCounter(COUNTER_BLOBS_ERASED, blobsToDrop.size());
for (const auto& blobId : blobsToDrop) {
+ if (Self->BlobManager->DropOneToOne(blobId, blobManagerDb)) {
+ TEvictMetadata meta;
+ auto evict = Self->BlobManager->GetDropped(blobId, meta);
+ Y_VERIFY(evict.State != EEvictState::UNKNOWN);
+ bool deleted = ui8(evict.State) >= ui8(EEvictState::EXTERN); // !EVICTING and !SELF_CACHED
+ TierBlobsToForget[meta.GetTierName()].emplace_back(std::move(evict));
+ if (deleted) {
+ continue;
+ }
+ }
Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
+ Self->IncCounter(COUNTER_BLOBS_ERASED);
+ Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize());
+ }
+ for (const auto& blobId : blobsToEvict) {
+ if (BlobsToExport.count(blobId)) {
+ // DS to S3 eviction. Keep source blob in DS till EEvictState::EXTERN state.
+ continue;
+ }
+ Self->BlobManager->DeleteBlob(blobId, blobManagerDb);
+ Self->IncCounter(COUNTER_BLOBS_ERASED);
Self->IncCounter(COUNTER_BYTES_ERASED, blobId.BlobSize());
}
@@ -134,6 +198,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
<< ") cannot write index blobs at tablet " << Self->TabletID());
}
+ if (BlobsToExport.size()) {
+ ExportNo = ++Self->LastExportNo;
+
+ NIceDb::TNiceDb db(txc.DB);
+ Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
+ }
+
if (changes->IsInsert()) {
Self->ActiveIndexing = false;
@@ -180,6 +251,67 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
} else {
Self->EnqueueBackgroundActivities();
}
+
+ if (ExportNo) {
+ Y_VERIFY(BlobsToExport.size());
+ THashMap<TString, THashMap<TUnifiedBlobId, TString>> tierBlobs;
+ for (auto& [blobId, tierName] : BlobsToExport) {
+ tierBlobs[tierName].emplace(blobId, TString());
+ }
+ ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, std::move(tierBlobs)));
+ }
+
+ for (auto [tierName, blobs] : TierBlobsToForget) {
+ if (!Self->S3Actors.count(tierName)) {
+ TString tier(tierName);
+ LOG_S_ERROR("No S3 actor for tier '" << tier << "' (on forget) at tablet " << Self->TabletID());
+ continue;
+ }
+ auto& s3 = Self->S3Actors[tierName];
+ if (!s3) {
+ TString tier(tierName);
+ LOG_S_ERROR("Not started S3 actor for tier '" << tier << "' (on forget) at tablet " << Self->TabletID());
+ continue;
+ }
+
+ auto forget = std::make_unique<TEvPrivate::TEvForget>();
+ forget->Evicted = std::move(blobs);
+ ctx.Send(s3, forget.release());
+ }
+}
+
+
+/// Handles a write-index event.
+/// A compaction event without blobs is treated as already written (PutStatus = OK).
+/// While PutStatus is UNKNOWN the blobs still have to be written: when some channel is
+/// yellow-stop the event is rejected through TTxWriteIndex with TRYLATER, otherwise it
+/// is released to a newly registered write actor. For any other PutStatus the result
+/// is applied through TTxWriteIndex after the yellow channel sets are taken from the event.
+void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx) {
+    auto& blobs = ev->Get()->Blobs;
+    const bool isCompaction = ev->Get()->GranuleCompaction;
+    if (isCompaction && blobs.empty()) {
+        ev->Get()->PutStatus = NKikimrProto::OK;
+    }
+
+    if (ev->Get()->PutStatus != NKikimrProto::UNKNOWN) {
+        if (ev->Get()->PutStatus == NKikimrProto::OK) {
+            LOG_S_DEBUG("WriteIndex (records) at tablet " << TabletID());
+        } else {
+            LOG_S_INFO("WriteIndex error at tablet " << TabletID());
+        }
+
+        OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
+        Execute(new TTxWriteIndex(this, ev), ctx);
+        return;
+    }
+
+    if (IsAnyChannelYellowStop()) {
+        LOG_S_ERROR("WriteIndex (out of disk space) at tablet " << TabletID());
+
+        IncCounter(COUNTER_OUT_OF_SPACE);
+        ev->Get()->PutStatus = NKikimrProto::TRYLATER;
+        Execute(new TTxWriteIndex(this, ev), ctx);
+        return;
+    }
+
+    LOG_S_DEBUG("WriteIndex (" << blobs.size() << " blobs) at tablet " << TabletID());
+
+    Y_VERIFY(!blobs.empty());
+    // Ownership of the event goes to the write actor; `ev` must not be used after this call.
+    ctx.Register(CreateWriteActor(TabletID(), NOlap::TIndexInfo("dummy", 0), ctx.SelfID,
+        BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index b1b866aa760..abd00790d9a 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -843,15 +843,57 @@ NOlap::TIndexInfo TColumnShard::ConvertSchema(const NKikimrSchemeOp::TColumnTabl
}
for (auto& tierConfig : schema.GetStorageTiers()) {
- NOlap::TStorageTier tier;
- tier.Name = tierConfig.GetName();
+ auto& tierName = tierConfig.GetName();
+ TierConfigs[tierName] = TTierConfig{tierConfig};
+
+ NOlap::TStorageTier tier{ .Name = tierName };
if (tierConfig.HasCompression()) {
tier.Compression = ConvertCompression(tierConfig.GetCompression());
}
+ if (TierConfigs[tierName].NeedExport()) {
+ S3Actors[tierName] = {}; // delayed actor creation
+ }
indexInfo.AddStorageTier(std::move(tier));
}
return indexInfo;
}
+/// Registers an S3 actor for every entry of S3Actors that has none and sends it the tier's S3 settings.
+/// With init == false, entries that already hold an actor are skipped; with init == true every entry
+/// is required (Y_VERIFY) to be empty. Returns the number of actors started, which is always 0 when
+/// KIKIMR_DISABLE_S3_OPS is defined.
+ui32 TColumnShard::InitS3Actors(const TActorContext& ctx, bool init) {
+    ui32 started = 0;
+#ifndef KIKIMR_DISABLE_S3_OPS
+    for (auto& entry : S3Actors) {
+        const auto& tierName = entry.first;
+        auto& actor = entry.second;
+
+        const bool alreadyRunning = !init && actor;
+        if (alreadyRunning) {
+            continue;
+        }
+
+        Y_VERIFY(!actor);
+        Y_VERIFY(TierConfigs.count(tierName));
+        auto& tierConfig = TierConfigs[tierName];
+        Y_VERIFY(tierConfig.NeedExport());
+
+        // The settings are sent right after registration.
+        actor = ctx.Register(CreateS3Actor(TabletID(), ctx.SelfID, tierName));
+        ctx.Send(actor, new TEvPrivate::TEvS3Settings(tierConfig.S3Settings()));
+        ++started;
+    }
+#else
+    Y_UNUSED(ctx);
+    Y_UNUSED(init);
+#endif
+    return started;
+}
+
+/// Sends a poison pill to every started S3 actor and clears its id, keeping the map entries
+/// themselves. Does nothing when KIKIMR_DISABLE_S3_OPS is defined.
+void TColumnShard::StopS3Actors(const TActorContext& ctx) {
+#ifndef KIKIMR_DISABLE_S3_OPS
+    for (auto& entry : S3Actors) {
+        auto& actor = entry.second;
+        if (!actor) {
+            continue;
+        }
+        ctx.Send(actor, new TEvents::TEvPoisonPill);
+        actor = {};
+    }
+#else
+    Y_UNUSED(ctx);
+#endif
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index b067efa3d72..b0959555536 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -17,6 +17,28 @@ namespace NKikimr::NColumnShard {
extern bool gAllowLogBatchingDefaultValue;
+IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent);
+IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent);
+IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent);
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+ const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
+ TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max());
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+ const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
+ TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max());
+IActor* CreateReadActor(ui64 tabletId,
+ const TActorId& dstActor,
+ std::unique_ptr<TEvColumnShard::TEvReadResult>&& event,
+ NOlap::TReadMetadata::TConstPtr readMetadata,
+ const TInstant& deadline,
+ const TActorId& columnShardActorId,
+ ui64 requestCookie);
+IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
+IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
+#ifndef KIKIMR_DISABLE_S3_OPS
+IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName);
+#endif
+
struct TSettings {
TControlWrapper BlobWriteGrouppingEnabled;
TControlWrapper CacheDataAfterIndexing;
@@ -57,6 +79,8 @@ class TColumnShard
friend class TTxRead;
friend class TTxScan;
friend class TTxWriteIndex;
+ friend class TTxExport;
+ friend class TTxForget;
friend class TTxRunGC;
friend class TTxProcessGCResult;
friend class TTxReadBlobRanges;
@@ -85,6 +109,8 @@ class TColumnShard
void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx);
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
ITransaction* CreateTxInitSchema();
@@ -178,6 +204,8 @@ protected:
HFunc(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
HFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
HFunc(TEvPrivate::TEvWriteIndex, Handle);
+ HFunc(TEvPrivate::TEvExport, Handle);
+ HFunc(TEvPrivate::TEvForget, Handle);
HFunc(TEvPrivate::TEvScanStats, Handle);
HFunc(TEvPrivate::TEvReadFinished, Handle);
HFunc(TEvPrivate::TEvPeriodicWakeup, Handle);
@@ -283,6 +311,21 @@ private:
}
};
+ /// Wrapper around a storage tier's proto config with helpers for its object-storage (S3) part.
+ struct TTierConfig {
+ using TTierProto = NKikimrSchemeOp::TStorageTierConfig;
+ using TS3SettingsProto = NKikimrSchemeOp::TS3Settings;
+
+ TTierProto Proto;
+
+ /// True when the tier config has the ObjectStorage field set.
+ bool NeedExport() const {
+ return Proto.HasObjectStorage();
+ }
+
+ /// The tier's ObjectStorage settings; callers in this file check NeedExport() first.
+ const TS3SettingsProto& S3Settings() const {
+ return Proto.GetObjectStorage();
+ }
+ };
+
struct TLongTxWriteInfo {
ui64 WriteId;
NLongTxService::TLongTxId LongTxId;
@@ -296,6 +339,7 @@ private:
ui64 LastPlannedStep = 0;
ui64 LastPlannedTxId = 0;
ui64 LastCompactedGranule = 0;
+ ui64 LastExportNo = 0;
TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
bool MediatorTimeCastRegistered = false;
@@ -309,11 +353,13 @@ private:
TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices.
TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation.
TActorId EvictionActor;
+ THashMap<TString, TActorId> S3Actors;
std::unique_ptr<TTabletCountersBase> TabletCountersPtr;
TTabletCountersBase* TabletCounters;
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
std::unique_ptr<NOlap::TInsertTable> InsertTable;
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
+ THashMap<TString, TTierConfig> TierConfigs;
TTtl Ttl;
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
@@ -379,6 +425,8 @@ private:
void SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& schemaVersions);
NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
+ ui32 InitS3Actors(const TActorContext& ctx, bool init);
+ void StopS3Actors(const TActorContext& ctx);
std::unique_ptr<TEvPrivate::TEvIndexing> SetupIndexation();
std::unique_ptr<TEvPrivate::TEvCompaction> SetupCompaction();
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 9c178796aed..229fc91c4c8 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -44,9 +44,9 @@ struct Schema : NIceDb::Schema {
LastPlannedTxId = 5,
LastSchemaSeqNoGeneration = 6,
LastSchemaSeqNoRound = 7,
-
LastGcBarrierGen = 8,
LastGcBarrierStep = 9,
+ LastExportNumber = 10,
};
enum class EInsertTableIds : ui8 {
@@ -168,6 +168,20 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<BlobId, Data>;
};
+ // Local table (id 13) with one row per evicted blob, keyed by the blob id.
+ // NOTE(review): presumably the persistent state behind export/forget of evicted blobs - confirm against blob_manager_db.
+ struct OneToOneEvictedBlobs : Table<13> {
+ struct BlobId : Column<1, NScheme::NTypeIds::String> {};
+ struct Size : Column<2, NScheme::NTypeIds::Uint32> {}; // extracted from BlobId for better introspection
+ struct State : Column<3, NScheme::NTypeIds::Byte> {}; // evicting -> (self) cached <-> exported
+ struct Dropped : Column<4, NScheme::NTypeIds::Bool> {};
+ struct Metadata : Column<5, NScheme::NTypeIds::String> {}; // NKikimrTxColumnShard.TEvictMetadata
+ struct ExternBlobId : Column<6, NScheme::NTypeIds::String> {};
+ // Column ids 7 and 8 are taken by the commented-out columns below.
+ //struct Format : Column<7, NScheme::NTypeIds::Byte> {};
+ //struct CachedBlobId : Column<8, NScheme::NTypeIds::String> {}; // TODO
+
+ using TKey = TableKey<BlobId>;
+ using TColumns = TableColumns<BlobId, Size, State, Dropped, Metadata, ExternBlobId>;
+ };
+
// Index tables
// InsertTable - common for all indices
@@ -241,7 +255,8 @@ struct Schema : NIceDb::Schema {
IndexGranules,
IndexColumns,
IndexCounters,
- SmallBlobs
+ SmallBlobs,
+ OneToOneEvictedBlobs
>;
//
diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h
index a5662a6876e..a3ecac75a39 100644
--- a/ydb/core/tx/columnshard/columnshard_txs.h
+++ b/ydb/core/tx/columnshard/columnshard_txs.h
@@ -10,10 +10,6 @@
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/tx_processing.h>
-namespace arrow {
- class Schema;
-}
-
namespace NKikimr::NColumnShard {
struct TEvPrivate {
@@ -24,6 +20,9 @@ struct TEvPrivate {
EvReadFinished,
EvPeriodicWakeup,
EvEviction,
+ EvS3Settings,
+ EvExport,
+ EvForget,
EvEnd
};
@@ -85,6 +84,47 @@ struct TEvPrivate {
}
};
+ /// Local event carrying a copy of a tier's S3 settings.
+ struct TEvS3Settings : public TEventLocal<TEvS3Settings, EvS3Settings> {
+ NKikimrSchemeOp::TS3Settings Settings;
+
+ // The settings proto is copied into the event.
+ explicit TEvS3Settings(const NKikimrSchemeOp::TS3Settings& settings)
+ : Settings(settings)
+ {}
+ };
+
+ /// Local event describing one export, identified by a non-zero ExportNo.
+ /// The same event type is used both as the request and as the result (Status / ErrorStr).
+ struct TEvExport : public TEventLocal<TEvExport, EvExport> {
+ using TBlobDataMap = THashMap<TUnifiedBlobId, TString>;
+
+ NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; // UNKNOWN until a result is set
+ ui64 ExportNo{};
+ TActorId DstActor; // actor the event is addressed to next
+ THashMap<TString, TBlobDataMap> TierBlobs; // tier name -> blobs (filled by the first constructor only)
+ TBlobDataMap Blobs; // blob id -> blob data
+ THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs; // source blob id -> exported blob id
+ TString ErrorStr;
+
+ // Request with blobs grouped by tier name; DstActor and Blobs stay empty.
+ explicit TEvExport(ui64 exportNo, THashMap<TString, TBlobDataMap>&& tierBlobs)
+ : ExportNo(exportNo)
+ , TierBlobs(std::move(tierBlobs))
+ {
+ Y_VERIFY(ExportNo);
+ }
+
+ // Request for a single destination actor with a flat blob map; TierBlobs stays empty.
+ TEvExport(ui64 exportNo, TActorId dstActor, TBlobDataMap&& blobs)
+ : ExportNo(exportNo)
+ , DstActor(dstActor)
+ , Blobs(std::move(blobs))
+ {
+ Y_VERIFY(ExportNo);
+ }
+ };
+
+ /// Local event listing evicted blobs to forget; also used to return the result (Status / ErrorStr).
+ struct TEvForget : public TEventLocal<TEvForget, EvForget> {
+ NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; // UNKNOWN until a result is set
+ std::vector<NOlap::TEvictedBlob> Evicted;
+ TString ErrorStr;
+ };
+
struct TEvScanStats : public TEventLocal<TEvScanStats, EvScanStats> {
TEvScanStats(ui64 rows, ui64 bytes) : Rows(rows), Bytes(bytes) {}
ui64 Rows;
@@ -155,58 +195,6 @@ public:
TTxType GetTxType() const override { return TXTYPE_UPDATE_SCHEMA; }
};
-/// Propose deterministic tx
-class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
-public:
- TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev)
- : TBase(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_PROPOSE; }
-
-private:
- TEvColumnShard::TEvProposeTransaction::TPtr Ev;
- std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result;
-};
-
-/// Plan deterministic txs
-class TTxPlanStep : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
-public:
- TTxPlanStep(TColumnShard* self, TEvTxProcessing::TEvPlanStep::TPtr& ev)
- : TBase(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_PLANSTEP; }
-
-private:
- TEvTxProcessing::TEvPlanStep::TPtr Ev;
- THashMap<TActorId, TVector<ui64>> TxAcks;
- std::unique_ptr<TEvTxProcessing::TEvPlanStepAccepted> Result;
-};
-
-/// Write portion of data in OLAP transaction
-class TTxWrite : public TTransactionBase<TColumnShard> {
-public:
- TTxWrite(TColumnShard* self, TEvColumnShard::TEvWrite::TPtr& ev)
- : TBase(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_WRITE; }
-
-private:
- TEvColumnShard::TEvWrite::TPtr Ev;
- std::unique_ptr<TEvColumnShard::TEvWriteResult> Result;
-};
-
/// Read portion of data in OLAP transaction
class TTxReadBase : public TTransactionBase<TColumnShard> {
protected:
@@ -234,79 +222,4 @@ protected:
TString ErrorDescription;
};
-class TTxRead : public TTxReadBase {
-public:
- TTxRead(TColumnShard* self, TEvColumnShard::TEvRead::TPtr& ev)
- : TTxReadBase(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_READ; }
-
-private:
- TEvColumnShard::TEvRead::TPtr Ev;
- std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
-};
-
-class TTxScan : public TTxReadBase {
-public:
- using TReadMetadataPtr = NOlap::TReadMetadataBase::TConstPtr;
-
- TTxScan(TColumnShard* self, TEvColumnShard::TEvScan::TPtr& ev)
- : TTxReadBase(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_START_SCAN; }
-
-private:
- NOlap::TReadMetadataBase::TConstPtr CreateReadMetadata(const TActorContext& ctx, TReadDescription& read,
- bool isIndexStats, bool isReverse, ui64 limit);
-
-private:
- TEvColumnShard::TEvScan::TPtr Ev;
- TVector<TReadMetadataPtr> ReadMetadataRanges;
-};
-
-
-class TTxReadBlobRanges : public TTransactionBase<TColumnShard> {
-public:
- TTxReadBlobRanges(TColumnShard* self, TEvColumnShard::TEvReadBlobRanges::TPtr& ev)
- : TTransactionBase<TColumnShard>(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_READ_BLOB_RANGES; }
-
-private:
- TEvColumnShard::TEvReadBlobRanges::TPtr Ev;
- std::unique_ptr<TEvColumnShard::TEvReadBlobRangesResult> Result;
-};
-
-
-/// Common transaction for WriteIndex and GranuleCompaction.
-/// For WriteIndex it writes new portion from InsertTable into index.
-/// For GranuleCompaction it writes new portion of indexed data and mark old data with "switching" snapshot.
-class TTxWriteIndex : public TTransactionBase<TColumnShard> {
-public:
- TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr& ev)
- : TBase(self)
- , Ev(ev)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; }
-
-private:
- TEvPrivate::TEvWriteIndex::TPtr Ev;
-};
-
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index fea0855cee2..114f6a87c58 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -15,6 +15,7 @@ void TTester::Setup(TTestActorRuntime& runtime) {
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_INFO);
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NLog::PRI_DEBUG);
ui32 domainId = 0;
ui32 planResolution = 500;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 4920b5c694a..8576efa7a10 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -30,8 +30,8 @@ struct TTestSchema {
TString Codec;
std::optional<int> CompressionLevel;
std::optional<ui32> EvictAfterSeconds;
- //ui64 EvictAfterBytes; // TODO
TString TtlColumn;
+ std::optional<NKikimrSchemeOp::TS3Settings> S3;
TStorageTier(const TString& name = {})
: Name(name)
@@ -180,6 +180,9 @@ struct TTestSchema {
if (tier.CompressionLevel) {
t->MutableCompression()->SetCompressionLevel(*tier.CompressionLevel);
}
+ if (tier.S3) {
+ t->MutableObjectStorage()->CopyFrom(*tier.S3);
+ }
}
}
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
new file mode 100644
index 00000000000..3963279ffc8
--- /dev/null
+++ b/ydb/core/tx/columnshard/export_actor.cpp
@@ -0,0 +1,106 @@
+#include "columnshard_impl.h"
+#include "blob_cache.h"
+
+#include <ydb/core/blobstorage/dsproxy/blobstorage_backoff.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NColumnShard {
+
+/// Helper actor for a single export: reads every blob listed in the TEvExport from the blob cache,
+/// stores the data in the event and forwards the event to the actor named in its DstActor field.
+/// Before forwarding, DstActor is replaced with the parent so the receiver knows where to reply.
+class TExportActor : public TActorBootstrapped<TExportActor> {
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::TX_COLUMNSHARD_EXPORT_ACTOR;
+    }
+
+    /// @param tabletId  tablet id, used in log messages
+    /// @param parent    actor id written into the event's DstActor before it is forwarded
+    /// @param ev        export request; must not be null
+    TExportActor(ui64 tabletId, const TActorId& parent, TAutoPtr<TEvPrivate::TEvExport> ev)
+        : TabletId(tabletId)
+        , Parent(parent)
+        , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
+        , Event(ev.Release())
+    {
+        Y_VERIFY(Event);
+    }
+
+    /// Consumes one blob cache answer. A failed read marks the whole export as failed but still
+    /// counts as an answer, so the actor finishes once every requested blob has been answered.
+    void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
+        LOG_S_DEBUG("TEvReadBlobRangeResult (waiting " << BlobsToRead.size()
+            << ") at tablet " << TabletId << " (export)");
+
+        auto& event = *ev->Get();
+        const TUnifiedBlobId& blobId = event.BlobRange.BlobId;
+        Y_VERIFY(Event);
+
+        if (event.Status != NKikimrProto::EReplyStatus::OK) {
+            LOG_S_ERROR("TEvReadBlobRangeResult cannot get blob " << blobId << " status " << event.Status
+                << " at tablet " << TabletId << " (export)");
+
+            BlobsToRead.erase(blobId);
+            Event->Status = event.Status;
+            if (Event->Status == NKikimrProto::UNKNOWN) {
+                Event->Status = NKikimrProto::ERROR;
+            }
+            // No early return here: if this was the last outstanding read, returning would leave
+            // the actor alive forever and the export would never be reported.
+        } else {
+            TString blobData = event.Data;
+            Y_VERIFY(blobData.size() == blobId.BlobSize());
+
+            // Answers for blobs that were not requested (or were already answered) are ignored.
+            if (!BlobsToRead.erase(blobId)) {
+                return;
+            }
+
+            Event->Blobs[blobId] = blobData;
+        }
+
+        if (BlobsToRead.empty()) {
+            SendResultAndDie(ctx);
+        }
+    }
+
+    void Bootstrap(const TActorContext& ctx) {
+        LOG_S_DEBUG("Exporting " << Event->Blobs.size() << " blobs at tablet " << TabletId);
+
+        SendReads();
+        if (BlobsToRead.empty()) {
+            // Nothing was requested, so no answer will ever arrive: finish right away.
+            SendResultAndDie(ctx);
+            return;
+        }
+        Become(&TThis::StateWait);
+    }
+
+    STFUNC(StateWait) {
+        switch (ev->GetTypeRewrite()) {
+            HFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+            default:
+                break;
+        }
+    }
+
+private:
+    ui64 TabletId;
+    TActorId Parent;
+    TActorId BlobCacheActorId;
+    std::unique_ptr<TEvPrivate::TEvExport> Event;
+    THashSet<TUnifiedBlobId> BlobsToRead; // blobs requested from the cache and not answered yet
+
+    /// Requests the full range of every blob listed in the event.
+    void SendReads() {
+        for (auto& [blobId, _] : Event->Blobs) {
+            BlobsToRead.emplace(blobId);
+            SendReadRequest(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()));
+        }
+    }
+
+    void SendReadRequest(const NBlobCache::TBlobRange& blobRange) {
+        Y_VERIFY(!blobRange.Offset);
+        Y_VERIFY(blobRange.Size);
+        Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false));
+    }
+
+    /// Forwards the event to its current DstActor, storing Parent in DstActor first, then dies.
+    void SendResultAndDie(const TActorContext& ctx) {
+        auto s3Actor = Event->DstActor;
+        Event->DstActor = Parent;
+        ctx.Send(s3Actor, Event.release());
+        Die(ctx);
+    }
+};
+
+/// Factory for TExportActor; dstActor is passed as the actor's parent and ownership of ev goes to the actor.
+IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) {
+ return new TExportActor(tabletId, dstActor, ev);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 5155051a9a5..4117e9f57fb 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -17,7 +17,6 @@ public:
{}
void Handle(TEvPrivate::TEvIndexing::TPtr& ev, const TActorContext& ctx) {
- Y_UNUSED(ev);
LOG_S_DEBUG("TEvIndexing at tablet " << TabletId << " (index)");
auto& event = *ev->Get();
@@ -31,7 +30,7 @@ public:
auto& blobId = blobsToIndex[i].BlobId;
auto res = BlobsToRead.emplace(blobId, i);
Y_VERIFY(res.second, "Duplicate blob in DataToIndex: %s", blobId.ToStringNew().c_str());
- SendReadRequest(ctx, blobId);
+ SendReadRequest(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()));
}
if (BlobsToRead.empty()) {
@@ -99,10 +98,9 @@ private:
std::unique_ptr<TEvPrivate::TEvWriteIndex> TxEvent;
THashMap<TUnifiedBlobId, ui32> BlobsToRead;
- void SendReadRequest(const TActorContext&, const TUnifiedBlobId& blobId) {
- Y_VERIFY(blobId.BlobSize());
- Send(BlobCacheActorId,
- new NBlobCache::TEvBlobCache::TEvReadBlobRange(NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize()), false));
+ void SendReadRequest(const NBlobCache::TBlobRange& blobRange) {
+ Y_VERIFY(blobRange.Size);
+ Send(BlobCacheActorId, new NBlobCache::TEvBlobCache::TEvReadBlobRange(blobRange, false));
}
void Index(const TActorContext& ctx) {
diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/columnshard/s3_actor.cpp
new file mode 100644
index 00000000000..8d61847214f
--- /dev/null
+++ b/ydb/core/tx/columnshard/s3_actor.cpp
@@ -0,0 +1,385 @@
+#ifndef KIKIMR_DISABLE_S3_OPS
+
+#include "defs.h"
+#include "columnshard_impl.h"
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/datashard/s3_common.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+
+namespace NKikimr::NColumnShard {
+
+using NWrappers::TEvS3Wrapper;
+
+namespace {
+
+/// State of one export in flight: the event to hand back plus the S3 keys still awaiting a put response.
+struct TS3Export {
+    std::unique_ptr<TEvPrivate::TEvExport> Event;
+    THashSet<TString> KeysToWrite;
+
+    TS3Export() = default;
+
+    explicit TS3Export(TAutoPtr<TEvPrivate::TEvExport> ev)
+        : Event(ev.Release())
+    {}
+
+    /// Blob id -> data map of the owned event.
+    THashMap<TUnifiedBlobId, TString>& Blobs() {
+        return Event->Blobs;
+    }
+
+    /// Stores the S3 destination id for srcBlob in the event's SrcToDstBlobs map and returns a copy of it.
+    TUnifiedBlobId AddExported(const TString& bucket, const TUnifiedBlobId& srcBlob) {
+        auto& dstBlob = Event->SrcToDstBlobs[srcBlob];
+        dstBlob = TUnifiedBlobId(srcBlob, TUnifiedBlobId::S3_BLOB, bucket);
+        return dstBlob;
+    }
+};
+
+/// State of one forget request in flight: the event to hand back plus the S3 keys still awaiting deletion.
+struct TS3Forget {
+ std::unique_ptr<TEvPrivate::TEvForget> Event;
+ THashSet<TString> KeysToDelete;
+
+ TS3Forget() = default;
+
+ // Takes ownership of the event.
+ explicit TS3Forget(TAutoPtr<TEvPrivate::TEvForget> ev)
+ : Event(ev.Release())
+ {}
+};
+
+// S3 objects need InitAPI() called first. TS3User calls it in ctor.
+// TS3User is a base class, so it is constructed before the Config and Credentials members.
+struct TAwsContext : private NWrappers::TS3User {
+ Aws::Client::ClientConfiguration Config;
+ Aws::Auth::AWSCredentials Credentials;
+ TActorId Client; // S3Wrapper should be created after API owner too
+
+ // Rebuilds the client configuration and credentials from the tier's S3 settings.
+ void SetConfig(const NKikimrSchemeOp::TS3Settings& settings) {
+ Config = NDataShard::ConfigFromSettings(settings);
+ Credentials = NDataShard::CredentialsFromSettings(settings);
+ }
+
+ // Creates (but does not register) an S3 wrapper actor for the current config and credentials.
+ IActor* CreateS3Wrapper() const {
+ return NWrappers::CreateS3Wrapper(Credentials, Config);
+ }
+};
+
+}
+
+
+/// Per-tier actor that talks to S3 on behalf of a column shard tablet: it uploads the blobs of
+/// TEvExport requests and deletes the blobs of TEvForget requests through an S3 wrapper actor,
+/// then sends the same event back to the shard with Status (and ErrorStr on failure) filled in.
+/// Every accepted request gets exactly one reply, including requests that schedule no S3 operation.
+class TS3Actor : public TActorBootstrapped<TS3Actor> {
+public:
+    static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+        return NKikimrServices::TActivity::TX_COLUMNSHARD_S3_ACTOR;
+    }
+
+    /// @param tabletId  tablet id, used in log messages
+    /// @param parent    shard actor that receives the results
+    /// @param tierName  name of the storage tier served by this actor
+    TS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName)
+        : TabletId(tabletId)
+        , ShardActor(parent)
+        , TierName(tierName)
+    {}
+
+    void Bootstrap() {
+        LOG_S_DEBUG("[S3] Starting actor for tier '" << TierName << "' at tablet " << TabletId);
+        Become(&TThis::StateWait);
+    }
+
+    /// Applies new S3 settings and (re)creates the S3 wrapper client.
+    /// Settings without an endpoint or a bucket are rejected and leave the current state untouched.
+    void Handle(TEvPrivate::TEvS3Settings::TPtr& ev) {
+        auto& msg = *ev->Get();
+        auto& endpoint = msg.Settings.GetEndpoint();
+        auto& bucket = msg.Settings.GetBucket();
+
+        LOG_S_DEBUG("[S3] Update settings for tier '" << TierName << "' endpoint '" << endpoint
+            << "' bucket '" << bucket << "' at tablet " << TabletId);
+
+        if (endpoint.empty()) {
+            LOG_S_ERROR("[S3] No endpoint in settings for tier '" << TierName << "' at tablet " << TabletId);
+            return;
+        }
+        if (bucket.empty()) {
+            LOG_S_ERROR("[S3] No bucket in settings for tier '" << TierName << "' at tablet " << TabletId);
+            return;
+        }
+
+        // The bucket is stored only after validation, so a rejected update cannot pair the
+        // previous client with a new (or empty) bucket name.
+        Bucket = bucket;
+        S3Ctx.SetConfig(msg.Settings);
+        if (S3Ctx.Client) {
+            Send(S3Ctx.Client, new TEvents::TEvPoisonPill);
+            S3Ctx.Client = {};
+        }
+        S3Ctx.Client = this->RegisterWithSameMailbox(S3Ctx.CreateS3Wrapper());
+    }
+
+    /// Starts uploading every blob of the export. The shard is answered when all puts succeed,
+    /// on the first failed put, or immediately when nothing can be uploaded.
+    void Handle(TEvPrivate::TEvExport::TPtr& ev) {
+        ui64 exportNo = ev->Get()->ExportNo;
+        Y_VERIFY(!Exports.count(exportNo));
+
+        TS3Export pending(ev->Release());
+
+        if (!Initialized()) {
+            // Without a client the put requests would be sent to an empty actor id and the
+            // shard would wait for the result forever.
+            LOG_S_ERROR("[S3] Export to not initialized tier '" << TierName << "' at tablet " << TabletId);
+            ReplyToShard(pending.Event, NKikimrProto::ERROR, "S3 client is not initialized");
+            return;
+        }
+        if (pending.Blobs().empty()) {
+            // No put request means no put response, so answer right away.
+            ReplyToShard(pending.Event, NKikimrProto::OK);
+            return;
+        }
+
+        auto& ex = Exports[exportNo];
+        ex = std::move(pending);
+
+        for (auto& [blobId, blob] : ex.Blobs()) {
+            TString key = ex.AddExported(Bucket, blobId).GetS3Key();
+            Y_VERIFY(!ExportingKeys.count(key)); // TODO
+
+            ex.KeysToWrite.emplace(key);
+            ExportingKeys[key] = exportNo;
+            SendPutObject(key, std::move(blob));
+        }
+    }
+
+    /// Starts deleting the exported copy of every evicted blob in the request. Blobs without a valid
+    /// external id are skipped. The shard is answered when all deletes succeed, on the first failed
+    /// delete, or immediately when no delete was scheduled.
+    void Handle(TEvPrivate::TEvForget::TPtr& ev) {
+        TS3Forget pending(ev->Release());
+
+        if (!Initialized()) {
+            // Same reasoning as for exports: never leave the shard without an answer.
+            LOG_S_ERROR("[S3] Forget in not initialized tier '" << TierName << "' at tablet " << TabletId);
+            ReplyToShard(pending.Event, NKikimrProto::ERROR, "S3 client is not initialized");
+            return;
+        }
+
+        ui64 forgetNo = ++ForgetNo;
+        auto& forget = Forgets[forgetNo];
+        forget = std::move(pending);
+
+        for (auto& evict : forget.Event->Evicted) {
+            if (!evict.ExternBlob.IsValid()) {
+                LOG_S_INFO("[S3] Forget not exported '" << evict.Blob.ToStringNew() << "' at tablet " << TabletId);
+                continue; // TODO
+            }
+
+            TString key = evict.ExternBlob.GetS3Key();
+            Y_VERIFY(!ForgettingKeys.count(key)); // TODO
+
+            forget.KeysToDelete.emplace(key);
+            ForgettingKeys[key] = forgetNo;
+            SendDeleteObject(key);
+        }
+
+        if (forget.KeysToDelete.empty()) {
+            // Every blob was skipped (or the list was empty): no delete response will ever arrive,
+            // so answer now instead of keeping the request in Forgets forever.
+            ReplyToShard(forget.Event, NKikimrProto::OK);
+            Forgets.erase(forgetNo);
+        }
+    }
+
+    // TODO: clean written blobs in failed export
+    /// Accounts one put response against its export and replies to the shard on the first error
+    /// or when the last key of the export has been written.
+    void Handle(TEvS3Wrapper::TEvPutObjectResponse::TPtr& ev) {
+        Y_VERIFY(Initialized());
+
+        auto& msg = *ev->Get();
+        const auto& resultOutcome = msg.Result;
+
+        TString errStr;
+        if (!resultOutcome.IsSuccess()) {
+            errStr = LogError("PutObjectResponse", resultOutcome.GetError(), !!msg.Key);
+        }
+
+        Y_VERIFY(msg.Key); // FIXME
+        TString key = *msg.Key;
+
+        LOG_S_DEBUG("[S3] PutObjectResponse '" << key << "' at tablet " << TabletId);
+
+        if (!ExportingKeys.count(key)) {
+            LOG_S_DEBUG("[S3] PutObjectResponse for unknown key '" << key << "' at tablet " << TabletId);
+            return;
+        }
+
+        ui64 exportNo = ExportingKeys[key];
+        ExportingKeys.erase(key);
+
+        // The export may be gone already if an earlier key of it failed.
+        if (!Exports.count(exportNo)) {
+            LOG_S_DEBUG("[S3] PutObjectResponse for unknown export with key '" << key << "' at tablet " << TabletId);
+            return;
+        }
+
+        auto& ex = Exports[exportNo];
+        ex.KeysToWrite.erase(key);
+        Y_VERIFY(ex.Event->DstActor == ShardActor);
+
+        if (!errStr.empty()) {
+            ReplyToShard(ex.Event, NKikimrProto::ERROR, errStr);
+            Exports.erase(exportNo);
+        } else if (ex.KeysToWrite.empty()) {
+            ReplyToShard(ex.Event, NKikimrProto::OK);
+            Exports.erase(exportNo);
+        }
+    }
+
+    /// Accounts one delete response against its forget request and replies to the shard on the
+    /// first error or when the last key of the request has been deleted.
+    void Handle(TEvS3Wrapper::TEvDeleteObjectResponse::TPtr& ev) {
+        Y_VERIFY(Initialized());
+
+        auto& msg = *ev->Get();
+        const auto& resultOutcome = msg.Result;
+
+        TString errStr;
+        if (!resultOutcome.IsSuccess()) {
+            errStr = LogError("DeleteObjectResponse", resultOutcome.GetError(), !!msg.Key);
+        }
+
+        Y_VERIFY(msg.Key); // FIXME
+        TString key = *msg.Key;
+
+        LOG_S_DEBUG("[S3] DeleteObjectResponse '" << key << "' at tablet " << TabletId);
+
+        if (!ForgettingKeys.count(key)) {
+            LOG_S_DEBUG("[S3] DeleteObjectResponse for unknown key '" << key << "' at tablet " << TabletId);
+            return;
+        }
+
+        ui64 forgetNo = ForgettingKeys[key];
+        ForgettingKeys.erase(key);
+
+        // The request may be gone already if an earlier key of it failed.
+        if (!Forgets.count(forgetNo)) {
+            LOG_S_DEBUG("[S3] DeleteObjectResponse for unknown forget with key '" << key << "' at tablet " << TabletId);
+            return;
+        }
+
+        auto& forget = Forgets[forgetNo];
+        forget.KeysToDelete.erase(key);
+
+        if (!errStr.empty()) {
+            ReplyToShard(forget.Event, NKikimrProto::ERROR, errStr);
+            Forgets.erase(forgetNo);
+        } else if (forget.KeysToDelete.empty()) {
+            ReplyToShard(forget.Event, NKikimrProto::OK);
+            Forgets.erase(forgetNo);
+        }
+    }
+#if 0
+    void Handle(TEvS3Wrapper::TEvHeadObjectResponse::TPtr& ev) {
+        Y_VERIFY(Initialized());
+
+        auto& msg = *ev->Get();
+        const auto& key = msg.Key;
+        const auto& resultOutcome = msg.Result;
+
+        TString errStr;
+        if (!resultOutcome.IsSuccess()) {
+            errStr = LogError("HeadObjectResponse", resultOutcome.GetError(), !!key);
+        }
+
+        if (!errStr.empty()) {
+            //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr));
+        }
+
+        Y_VERIFY(key);
+        ui64 contentLength = resultOutcome.GetResult().GetContentLength();
+        LOG_S_DEBUG("HeadObjectResponse '" << *key << "', size: " << contentLength << " at tablet " << TabletId);
+
+        //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}));
+    }
+
+    void Handle(TEvS3Wrapper::TEvGetObjectResponse::TPtr& ev) {
+        Y_VERIFY(Initialized());
+
+        auto& msg = *ev->Get();
+        const auto& key = msg.Key;
+        const auto& data = msg.Body;
+        const auto& resultOutcome = msg.Result;
+
+        TString errStr;
+        if (!resultOutcome.IsSuccess()) {
+            errStr = LogError("GetObjectResponse", resultOutcome.GetError(), !!key);
+        }
+
+        if (!errStr.empty()) {
+            //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, {}, errStr));
+        }
+
+        // TODO: CheckETag
+
+        Y_VERIFY(key);
+        LOG_S_DEBUG("GetObjectResponse '" << *key << "', size: " << data.size() << " at tablet " << TabletId);
+
+        //Send(ShardActor, new TEvPrivate::TEvGetResponse(*key, data));
+    }
+#endif
+
+private:
+    ui64 TabletId;
+    TActorId ShardActor;
+    TAwsContext S3Ctx;
+    TString TierName;
+    TString Bucket;
+    ui64 ForgetNo{}; // counter used to number forget requests
+    THashMap<ui64, TS3Export> Exports; // export number -> export in flight
+    THashMap<ui64, TS3Forget> Forgets; // forget number -> forget request in flight
+    THashMap<TString, ui64> ExportingKeys; // S3 key -> export number
+    THashMap<TString, ui64> ForgettingKeys; // S3 key -> forget number
+
+    STATEFN(StateWait) {
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvPrivate::TEvS3Settings, Handle);
+            hFunc(TEvPrivate::TEvExport, Handle);
+            hFunc(TEvPrivate::TEvForget, Handle);
+            cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+            hFunc(TEvS3Wrapper::TEvPutObjectResponse, Handle);
+            hFunc(TEvS3Wrapper::TEvDeleteObjectResponse, Handle);
+#if 0
+            hFunc(TEvS3Wrapper::TEvHeadObjectResponse, Handle);
+            hFunc(TEvS3Wrapper::TEvGetObjectResponse, Handle);
+#endif
+            default:
+                break;
+        }
+    }
+
+    /// True once an S3 wrapper client has been created from valid settings.
+    bool Initialized() const {
+        return (bool)S3Ctx.Client;
+    }
+
+    /// Stops the S3 wrapper client (if any) before the actor itself goes away.
+    void PassAway() override {
+        if (S3Ctx.Client) {
+            Send(S3Ctx.Client, new TEvents::TEvPoisonPill());
+            S3Ctx.Client = {};
+        }
+        TActor::PassAway();
+    }
+
+    /// Sets the final status (and the error text, when given) on an export/forget event and
+    /// hands the event over to the shard actor. The pointer is empty afterwards.
+    template <typename TEvent>
+    void ReplyToShard(std::unique_ptr<TEvent>& event, NKikimrProto::EReplyStatus status, const TString& errStr = {}) {
+        event->Status = status;
+        if (!errStr.empty()) {
+            event->ErrorStr = errStr;
+        }
+        Send(ShardActor, event.release());
+    }
+
+    void SendPutObject(const TString& key, TString&& data) const {
+        auto request = Aws::S3::Model::PutObjectRequest()
+            .WithBucket(Bucket)
+            .WithKey(key)
+            .WithStorageClass(Aws::S3::Model::StorageClass::STANDARD_IA);
+#if 0
+        Aws::Map<Aws::String, Aws::String> metadata;
+        metadata.emplace("Content-Type", "application/x-compressed");
+        request.SetMetadata(std::move(metadata));
+#endif
+        LOG_S_DEBUG("[S3] PutObjectRequest key '" << key << "' at tablet " << TabletId);
+        Send(S3Ctx.Client, new TEvS3Wrapper::TEvPutObjectRequest(request, std::move(data)));
+    }
+
+    void SendHeadObject(const TString& key) const {
+        auto request = Aws::S3::Model::HeadObjectRequest()
+            .WithBucket(Bucket)
+            .WithKey(key);
+
+        LOG_S_DEBUG("[S3] HeadObjectRequest key '" << key << "' at tablet " << TabletId);
+        Send(S3Ctx.Client, new TEvS3Wrapper::TEvHeadObjectRequest(request));
+    }
+
+    void SendGetObject(const TString& key) {
+        auto request = Aws::S3::Model::GetObjectRequest()
+            .WithBucket(Bucket)
+            .WithKey(key);
+            //.WithRange(TStringBuilder() << "bytes=" << range.first << "-" << range.second); // TODO
+
+        LOG_S_DEBUG("[S3] GetObjectRequest key '" << key << "' at tablet " << TabletId);
+        Send(S3Ctx.Client, new TEvS3Wrapper::TEvGetObjectRequest(request));
+    }
+
+    void SendDeleteObject(const TString& key) const {
+        auto request = Aws::S3::Model::DeleteObjectRequest()
+            .WithBucket(Bucket)
+            .WithKey(key);
+
+        LOG_S_DEBUG("[S3] DeleteObjectRequest key '" << key << "' at tablet " << TabletId);
+        Send(S3Ctx.Client, new TEvS3Wrapper::TEvDeleteObjectRequest(request));
+    }
+
+    /// Builds, logs and returns the error text for a failed S3 response.
+    /// The result is never empty, because callers use emptiness to tell success from failure.
+    TString LogError(const TString& responseType, const Aws::S3::S3Error& error, bool hasKey) const {
+        TString errStr = TString(error.GetExceptionName()) + " " + error.GetMessage();
+        // The concatenation always contains the separator, so "no error text" has to be
+        // detected on the parts rather than on the joined string.
+        const bool noErrorText = error.GetExceptionName().empty() && error.GetMessage().empty();
+        if (noErrorText && !hasKey) {
+            errStr = responseType + " with no key";
+        }
+
+        LOG_S_NOTICE("[S3] Error in " << responseType << " at tablet " << TabletId << ": " << errStr);
+        return errStr;
+    }
+};
+
+/// Factory for TS3Actor; parent is the shard actor that receives export/forget results.
+IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName) {
+ return new TS3Actor(tabletId, parent, tierName);
+}
+
+}
+
+#endif
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index 1511c3219b1..4af7892db2f 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -1,8 +1,10 @@
#include "columnshard_ut_common.h"
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
namespace NKikimr {
using namespace NTxUT;
+using NWrappers::NTestHelpers::TS3Mock;
namespace {
@@ -317,7 +319,11 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
}
TriggerTTL(runtime, sender, {++planStep, ++txId}, {});
-
+#if 0
+ if (i) {
+ sleep(1); // TODO: wait export
+ }
+#endif
// Read
--planStep;
@@ -354,15 +360,10 @@ TestTiers(bool reboots, const std::vector<TString>& blobs, const std::vector<TTe
return resColumns;
}
-void TestTiersT1(bool reboots) {
+void TestTwoTiers(const TTestSchema::TTableSpecials& spec, bool compressed, bool reboots) {
std::vector<ui64> ts = {1600000000, 1620000000};
ui64 nowSec = TInstant::Now().Seconds();
- TTestSchema::TTableSpecials spec;
- spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0"));
- spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1"));
- spec.Tiers.back().SetCodec("zstd");
-
std::vector<TTestSchema::TTableSpecials> alters(4, spec);
ui64 allowBoth = nowSec - ts[0] + 600;
@@ -402,7 +403,56 @@ void TestTiersT1(bool reboots) {
UNIT_ASSERT_EQUAL(columns[2].first->length(), portionSize);
Cerr << "read bytes: " << columns[0].second << ", " << columns[1].second << ", " << columns[2].second << "\n";
- UNIT_ASSERT_GT(columns[0].second, columns[1].second);
+ if (compressed) {
+ UNIT_ASSERT_GT(columns[0].second, columns[1].second);
+ } else {
+ UNIT_ASSERT_EQUAL(columns[0].second, columns[1].second);
+ }
+}
+
+// Runs the two-tier scenario with tiers "tier0" and "tier1", neither of which gets S3 settings;
+// "tier1" uses the zstd codec, so TestTwoTiers is called with compressed == true.
+void TestTwoHotTiers(bool reboot) {
+    TTestSchema::TTableSpecials spec;
+
+    spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0"));
+
+    spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1"));
+    auto& zstdTier = spec.Tiers.back();
+    zstdTier.SetCodec("zstd");
+
+    const bool expectCompressed = true;
+    TestTwoTiers(spec, expectCompressed, reboot);
+}
+
+void TestHotAndColdTiers(bool reboot) { // Runs TestTwoTiers with tier0 left without S3 settings and tier1 given S3 settings that target a local TS3Mock.
+#if 1 // NOTE(review): the #else branch further down uses connString and bucket, which exist only while this region is enabled
+ TString bucket = "ydb"; // bucket name passed to the tier's S3 settings below
+ TPortManager portManager;
+ ui16 port = portManager.GetPort(); // port the mock S3 server is configured with
+ TString connString = Sprintf("localhost:%hu", port);
+ Cerr << "S3 at " << connString << "\n";
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port)); // function-scope object, so it stays alive until this function returns
+ UNIT_ASSERT(s3Mock.Start()); // fail the test immediately if the mock does not start
+#endif
+
+ TTestSchema::TTableSpecials spec;
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier0"));
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("tier1"));
+ spec.Tiers.back().S3 = NKikimrSchemeOp::TS3Settings(); // only tier1 receives S3 settings
+ auto& s3 = *spec.Tiers.back().S3;
+
+ s3.SetScheme(NKikimrSchemeOp::TS3Settings::HTTP);
+ s3.SetVerifySSL(false);
+#if 0 // compiled out: alternative settings for a remote endpoint reached through a local proxy; the key setters are unfinished placeholders
+ s3.SetEndpoint("storage.cloud-preprod.yandex.net");
+ s3.SetBucket("ch-s3");
+ s3.SetAccessKey(); <--
+ s3.SetSecretKey(); <--
+ s3.SetProxyHost("localhost");
+ s3.SetProxyPort(8080);
+ s3.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP);
+#else
+ s3.SetEndpoint(connString); // point the tier at the mock started above
+ s3.SetBucket(bucket);
+#endif
+
+ TestTwoTiers(spec, false, reboot); // second argument is `compressed`: no codec is set on tier1 here
}
void TestDrop(bool reboots) {
@@ -536,15 +586,24 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
TestTtl(true, false, specs);
}
- Y_UNIT_TEST(Tiers) {
- TestTiersT1(false);
+ Y_UNIT_TEST(HotTiers) {
+ TestTwoHotTiers(false);
}
- Y_UNIT_TEST(RebootTiers) {
+ Y_UNIT_TEST(RebootHotTiers) {
NColumnShard::gAllowLogBatchingDefaultValue = false;
- TestTiersT1(true);
+ TestTwoHotTiers(true);
}
+ Y_UNIT_TEST(ColdTiers) { // S3-backed second tier, run with reboot == false
+ TestHotAndColdTiers(false);
+ }
+#if 0 // the reboot variant of the S3-tier test is compiled out
+ Y_UNIT_TEST(RebootColdTiers) {
+ NColumnShard::gAllowLogBatchingDefaultValue = false; // same setting the RebootHotTiers test applies before running
+ TestHotAndColdTiers(true);
+ }
+#endif
Y_UNIT_TEST(Drop) {
TestDrop(false);
}
diff --git a/ydb/core/tx/datashard/s3_common.h b/ydb/core/tx/datashard/s3_common.h
index a9f7c570f9c..8e4b4c14cb9 100644
--- a/ydb/core/tx/datashard/s3_common.h
+++ b/ydb/core/tx/datashard/s3_common.h
@@ -34,6 +34,26 @@ inline Aws::Client::ClientConfiguration ConfigFromSettings(const NKikimrSchemeOp
Y_FAIL("Unknown scheme");
}
+ if (settings.HasVerifySSL()) {
+ config.verifySSL = settings.GetVerifySSL();
+ }
+
+ if (settings.HasProxyHost()) {
+ config.proxyHost = settings.GetProxyHost();
+ config.proxyPort = settings.GetProxyPort();
+
+ switch (settings.GetProxyScheme()) {
+ case NKikimrSchemeOp::TS3Settings::HTTP:
+ config.proxyScheme = Aws::Http::Scheme::HTTP;
+ break;
+ case NKikimrSchemeOp::TS3Settings::HTTPS:
+ config.proxyScheme = Aws::Http::Scheme::HTTPS;
+ break;
+ default:
+ break;
+ }
+ }
+
return config;
}
diff --git a/ydb/core/wrappers/s3_out.cpp b/ydb/core/wrappers/s3_out.cpp
index 0bc2745a6d0..6918e4eac79 100644
--- a/ydb/core/wrappers/s3_out.cpp
+++ b/ydb/core/wrappers/s3_out.cpp
@@ -102,6 +102,19 @@ void Out(IOutputStream& out, const PutObjectOutcome& outcome) {
OutOutcome(out, outcome);
}
+// Writes a DeleteObjectRequest to `out` through OutRequest, using the Bucket and Key accessors.
+void Out(IOutputStream& out, const DeleteObjectRequest& request) {
+    OutRequest(out, request, {&Bucket<DeleteObjectRequest>, &Key<DeleteObjectRequest>});
+}
+
+// Writes a DeleteObjectResult to `out` under the name "DeleteObjectResult" with an empty field list.
+void Out(IOutputStream& out, const DeleteObjectResult& result) {
+    OutResult(out, result, "DeleteObjectResult", {});
+}
+
+// Writes a DeleteObjectOutcome to `out` through the shared OutOutcome helper.
+void Out(IOutputStream& out, const DeleteObjectOutcome& outcome) {
+    OutOutcome(out, outcome);
+}
+
void Out(IOutputStream& out, const CreateMultipartUploadRequest& request) {
using T = CreateMultipartUploadRequest;
OutRequest(out, request, {&Bucket<T>, &Key<T>});
diff --git a/ydb/core/wrappers/s3_out.h b/ydb/core/wrappers/s3_out.h
index ac6143592f8..040c435be22 100644
--- a/ydb/core/wrappers/s3_out.h
+++ b/ydb/core/wrappers/s3_out.h
@@ -6,6 +6,7 @@
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/GetObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/PutObjectRequest.h>
+#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h>
@@ -26,6 +27,10 @@ void Out(IOutputStream& out, const Aws::S3::Model::PutObjectRequest& request);
void Out(IOutputStream& out, const Aws::S3::Model::PutObjectResult& result);
void Out(IOutputStream& out, const Aws::S3::Model::PutObjectOutcome& outcome);
+void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectRequest& request);
+void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectResult& result);
+void Out(IOutputStream& out, const Aws::S3::Model::DeleteObjectOutcome& outcome);
+
void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadRequest& request);
void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadResult& result);
void Out(IOutputStream& out, const Aws::S3::Model::CreateMultipartUploadOutcome& outcome);
@@ -87,6 +92,18 @@ Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::PutObjectOutcome, out, value) {
NKikimr::NWrappers::Out(out, value);
}
+Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectRequest, out, value) { // stream output for DeleteObjectRequest: forwards to NKikimr::NWrappers::Out
+ NKikimr::NWrappers::Out(out, value);
+}
+
+Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectResult, out, value) { // stream output for DeleteObjectResult: forwards to NKikimr::NWrappers::Out
+ NKikimr::NWrappers::Out(out, value);
+}
+
+Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::DeleteObjectOutcome, out, value) { // stream output for DeleteObjectOutcome: forwards to NKikimr::NWrappers::Out
+ NKikimr::NWrappers::Out(out, value);
+}
+
Y_DECLARE_OUT_SPEC(inline, Aws::S3::Model::CreateMultipartUploadRequest, out, value) {
NKikimr::NWrappers::Out(out, value);
}
diff --git a/ydb/core/wrappers/s3_wrapper.cpp b/ydb/core/wrappers/s3_wrapper.cpp
index b0fe378e218..5d5a21cc033 100644
--- a/ydb/core/wrappers/s3_wrapper.cpp
+++ b/ydb/core/wrappers/s3_wrapper.cpp
@@ -112,9 +112,17 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User {
return ev->Get()->Request;
}
- void Reply(const typename TEvResponse::TOutcome& outcome) const {
+ void Reply(const typename TEvRequest::TRequest& request,
+ const typename TEvResponse::TOutcome& outcome) const
+ {
Y_VERIFY(!std::exchange(Replied, true), "Double-reply");
- Send(MakeResponse(outcome).Release());
+
+ std::optional<TString> key;
+ if (request.KeyHasBeenSet()) {
+ key = request.GetKey();
+ }
+
+ Send(MakeResponse(key, outcome).Release());
}
protected:
@@ -126,8 +134,10 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User {
Send(Sender, ev);
}
- virtual THolder<IEventBase> MakeResponse(const typename TEvResponse::TOutcome& outcome) const {
- return MakeHolder<TEvResponse>(outcome);
+ virtual THolder<IEventBase> MakeResponse(const std::optional<TString>& key,
+ const typename TEvResponse::TOutcome& outcome) const
+ {
+ return MakeHolder<TEvResponse>(key, outcome);
}
private:
@@ -192,11 +202,13 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User {
}
protected:
- THolder<IEventBase> MakeResponse(const typename TEvResponse::TOutcome& outcome) const override {
+ THolder<IEventBase> MakeResponse(const std::optional<TString>& key,
+ const typename TEvResponse::TOutcome& outcome) const override
+ {
if (outcome.IsSuccess()) {
- return MakeHolder<TEvResponse>(outcome, std::move(Buffer));
+ return MakeHolder<TEvResponse>(key, outcome, std::move(Buffer));
} else {
- return MakeHolder<TEvResponse>(outcome);
+ return MakeHolder<TEvResponse>(key, outcome);
}
}
@@ -256,7 +268,7 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User {
auto ctx = std::make_shared<TCtx>(TlsActivationContext->ActorSystem(), ev->Sender);
auto callback = [](
const S3Client*,
- const typename TEvRequest::TRequest&,
+ const typename TEvRequest::TRequest& request,
const typename TEvResponse::TOutcome& outcome,
const std::shared_ptr<const AsyncCallerContext>& context) {
const auto* ctx = static_cast<const TCtx*>(context.get());
@@ -264,7 +276,7 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User {
LOG_NOTICE_S(*ctx->GetActorSystem(), NKikimrServices::S3_WRAPPER, "Response"
<< ": uuid# " << ctx->GetUUID()
<< ", response# " << outcome);
- ctx->Reply(outcome);
+ ctx->Reply(request, outcome);
};
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::S3_WRAPPER, "Request"
@@ -288,6 +300,11 @@ class TS3Wrapper: public TActor<TS3Wrapper>, private TS3User {
ev, &S3Client::PutObjectAsync);
}
+ // Handles a DeleteObject request event by dispatching it through Call to S3Client::DeleteObjectAsync.
+ void Handle(TEvS3Wrapper::TEvDeleteObjectRequest::TPtr& ev) {
+     using TDeleteRequest = TEvS3Wrapper::TEvDeleteObjectRequest;
+     using TDeleteResponse = TEvS3Wrapper::TEvDeleteObjectResponse;
+
+     Call<TDeleteRequest, TDeleteResponse>(ev, &S3Client::DeleteObjectAsync);
+ }
+
void Handle(TEvS3Wrapper::TEvCreateMultipartUploadRequest::TPtr& ev) {
Call<TEvS3Wrapper::TEvCreateMultipartUploadRequest, TEvS3Wrapper::TEvCreateMultipartUploadResponse>(
ev, &S3Client::CreateMultipartUploadAsync);
@@ -330,6 +347,7 @@ public:
hFunc(TEvS3Wrapper::TEvGetObjectRequest, Handle);
hFunc(TEvS3Wrapper::TEvHeadObjectRequest, Handle);
hFunc(TEvS3Wrapper::TEvPutObjectRequest, Handle);
+ hFunc(TEvS3Wrapper::TEvDeleteObjectRequest, Handle);
hFunc(TEvS3Wrapper::TEvCreateMultipartUploadRequest, Handle);
hFunc(TEvS3Wrapper::TEvUploadPartRequest, Handle);
hFunc(TEvS3Wrapper::TEvCompleteMultipartUploadRequest, Handle);
diff --git a/ydb/core/wrappers/s3_wrapper.h b/ydb/core/wrappers/s3_wrapper.h
index 1197606202b..c623113e9a0 100644
--- a/ydb/core/wrappers/s3_wrapper.h
+++ b/ydb/core/wrappers/s3_wrapper.h
@@ -10,6 +10,7 @@
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/GetObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/HeadObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/PutObjectRequest.h>
+#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/DeleteObjectRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/model/UploadPartRequest.h>
#include <contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3/include/aws/s3/S3Client.h>
@@ -57,10 +58,12 @@ struct TEvS3Wrapper {
using TOutcome = Aws::Utils::Outcome<T, Aws::S3::S3Error>;
using TResult = Aws::Utils::Outcome<U, Aws::S3::S3Error>;
+ std::optional<TString> Key;
TResult Result;
- explicit TGenericResponse(const TOutcome& outcome)
- : Result(TDerived::ResultFromOutcome(outcome))
+ explicit TGenericResponse(const std::optional<TString>& key, const TOutcome& outcome)
+ : Key(key)
+ , Result(TDerived::ResultFromOutcome(outcome))
{
}
@@ -77,13 +80,14 @@ struct TEvS3Wrapper {
TString Body;
- explicit TResponseWithBody(const typename TGeneric::TOutcome& outcome)
- : TGeneric(outcome)
+ explicit TResponseWithBody(const std::optional<TString>& key, const typename TGeneric::TOutcome& outcome)
+ : TGeneric(key, outcome)
{
}
- explicit TResponseWithBody(const typename TGeneric::TOutcome& outcome, TString&& body)
- : TGeneric(outcome)
+ explicit TResponseWithBody(const std::optional<TString>& key, const typename TGeneric::TOutcome& outcome,
+ TString&& body)
+ : TGeneric(key, outcome)
, Body(std::move(body))
{
}
@@ -101,6 +105,7 @@ struct TEvS3Wrapper {
EV_REQUEST_RESPONSE(GetObject),
EV_REQUEST_RESPONSE(HeadObject),
EV_REQUEST_RESPONSE(PutObject),
+ EV_REQUEST_RESPONSE(DeleteObject),
EV_REQUEST_RESPONSE(CreateMultipartUpload),
EV_REQUEST_RESPONSE(UploadPart),
EV_REQUEST_RESPONSE(CompleteMultipartUpload),
@@ -156,6 +161,7 @@ struct TEvS3Wrapper {
DEFINE_GENERIC_RESPONSE(UploadPart);
DEFINE_GENERIC_REQUEST_RESPONSE(HeadObject);
+ DEFINE_GENERIC_REQUEST_RESPONSE(DeleteObject);
DEFINE_GENERIC_REQUEST_RESPONSE(CreateMultipartUpload);
DEFINE_GENERIC_REQUEST_RESPONSE(CompleteMultipartUpload);
DEFINE_GENERIC_REQUEST_RESPONSE(AbortMultipartUpload);