diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-04-13 12:35:07 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-04-13 12:35:07 +0300 |
commit | 2f14a0656c686c407bb481a79060d4f4f57e3dac (patch) | |
tree | e68136d67287fad2448b75d651844576bdddbd09 | |
parent | 7a67b298c60dfee4b9d9bb4109a7862b6e37659d (diff) | |
download | ydb-2f14a0656c686c407bb481a79060d4f4f57e3dac.tar.gz |
Check RejectProbability, store data along with state KIKIMR-14693 KIKIMR-14695
ref:66b425d4dfa58bd225eb6b7fdb43737792b1cd54
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 126 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__s3_download_txs.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_common_upload.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_common_upload.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_direct_upload.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_s3_download.h | 29 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_s3_downloads.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_s3_downloads.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_txs.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_unsafe_upload.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/import_s3.cpp | 104 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_restore.cpp | 60 |
13 files changed, 280 insertions, 150 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 55921144e41..0500e32d533 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2944,6 +2944,20 @@ void TDataShard::Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TA void TDataShard::Handle(TEvDataShard::TEvUnsafeUploadRowsRequest::TPtr& ev, const TActorContext& ctx) { + const float rejectProbabilty = Executor()->GetRejectProbability(); + if (rejectProbabilty > 0) { + const float rnd = AppData(ctx)->RandomProvider->GenRandReal2(); + if (rnd < rejectProbabilty) { + auto response = MakeHolder<TEvDataShard::TEvUnsafeUploadRowsResponse>( + TabletID(), NKikimrTxDataShard::TError::WRONG_SHARD_STATE); + response->Record.SetErrorDescription("Reject due to given RejectProbability"); + ctx.Send(ev->Sender, std::move(response)); + IncCounter(COUNTER_BULK_UPSERT_OVERLOADED); + + return; + } + } + Execute(new TTxUnsafeUploadRows(this, ev), ctx); } diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 5d39415ac39..06f357c8e97 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -1,5 +1,6 @@ #pragma once +#include "datashard_s3_download.h" #include "datashard_s3_upload.h" #include <ydb/core/tx/tx.h> @@ -857,24 +858,6 @@ struct TEvDataShard { } }; - struct TEvUnsafeUploadRowsRequest : public TEventPBWithArena<TEvUnsafeUploadRowsRequest, - NKikimrTxDataShard::TEvUploadRowsRequest, - TEvDataShard::EvUnsafeUploadRowsRequest, - 16200, 32500> { - TEvUnsafeUploadRowsRequest() = default; - }; - - struct TEvUnsafeUploadRowsResponse : public TEventPB<TEvUnsafeUploadRowsResponse, - NKikimrTxDataShard::TEvUploadRowsResponse, - TEvDataShard::EvUnsafeUploadRowsResponse> { - TEvUnsafeUploadRowsResponse() = default; - - explicit TEvUnsafeUploadRowsResponse(ui64 tabletId, ui32 status = NKikimrTxDataShard::TError::OK) { - Record.SetTabletID(tabletId); - Record.SetStatus(status); - } - }; - // In most cases this event is local, thus users must // use Keys, Ranges and Program struct members instead of corresponding // protobuf members. In case of remote event these struct members will @@ -1223,12 +1206,10 @@ struct TEvDataShard { struct TEvGetS3DownloadInfo : public TEventLocal<TEvGetS3DownloadInfo, TEvDataShard::EvGetS3DownloadInfo> { - TActorId ReplyTo; ui64 TxId; - explicit TEvGetS3DownloadInfo(const TActorId& replyTo, ui64 txId) - : ReplyTo(replyTo) - , TxId(txId) + explicit TEvGetS3DownloadInfo(ui64 txId) + : TxId(txId) { } }; @@ -1236,38 +1217,20 @@ struct TEvDataShard { struct TEvStoreS3DownloadInfo : public TEventLocal<TEvStoreS3DownloadInfo, TEvDataShard::EvStoreS3DownloadInfo> { - TActorId ReplyTo; ui64 TxId; - TString DataETag; - ui64 ProcessedBytes; - ui64 WrittenBytes; - ui64 WrittenRows; - - TEvStoreS3DownloadInfo() = default; + NDataShard::TS3Download Info; - explicit TEvStoreS3DownloadInfo( - const TActorId& replyTo, - ui64 txId, - const TString& dataETag, - ui64 processedBytes, - ui64 writtenBytes, - ui64 writtenRows) - : ReplyTo(replyTo) - , TxId(txId) - , DataETag(dataETag) - , ProcessedBytes(processedBytes) - , WrittenBytes(writtenBytes) - , WrittenRows(writtenRows) + explicit TEvStoreS3DownloadInfo(ui64 txId, const NDataShard::TS3Download& info) + : TxId(txId) + , Info(info) { + Y_VERIFY(Info.DataETag); } TString ToString() const override { return TStringBuilder() << ToStringHeader() << " {" << " TxId: " << TxId - << " DataETag: " << DataETag - << " ProcessedBytes: " << ProcessedBytes - << " WrittenBytes: " << WrittenBytes - << " WrittenRows: " << WrittenRows + << " Info: " << Info << " }"; } }; @@ -1275,33 +1238,66 @@ struct TEvDataShard { struct TEvS3DownloadInfo : public TEventLocal<TEvS3DownloadInfo, TEvDataShard::EvS3DownloadInfo> { - struct TInfo { - TMaybe<TString> DataETag; - ui64 ProcessedBytes = 0; - ui64 WrittenBytes = 0; - ui64 WrittenRows = 0; - - TString ToString() const { - return TStringBuilder() << "{" - << " DataETag: " << DataETag - << " ProcessedBytes: " << ProcessedBytes - << " WrittenBytes: " << WrittenBytes - << " WrittenRows: " << WrittenRows - << " }"; - } - }; - - TInfo Info; + NDataShard::TS3Download Info; TEvS3DownloadInfo() = default; - explicit TEvS3DownloadInfo(const TInfo& info) + explicit TEvS3DownloadInfo(const NDataShard::TS3Download& info) : Info(info) { } TString ToString() const override { - return TStringBuilder() << ToStringHeader() << " " << Info.ToString(); + return TStringBuilder() << ToStringHeader() << " {" + << " Info: " << Info + << " }"; + } + }; + + struct TEvUnsafeUploadRowsRequest + : public TEventLocal<TEvUnsafeUploadRowsRequest, TEvDataShard::EvUnsafeUploadRowsRequest> + { + ui64 TxId; + std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest> RecordHolder; + const NKikimrTxDataShard::TEvUploadRowsRequest& Record; + NDataShard::TS3Download Info; + + explicit TEvUnsafeUploadRowsRequest( + ui64 txId, + const std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest>& record, + const NDataShard::TS3Download& info) + : TxId(txId) + , RecordHolder(record) + , Record(*RecordHolder) + , Info(info) + { + Y_VERIFY(Info.DataETag); + } + + TString ToString() const override { + return TStringBuilder() << ToStringHeader() << " {" + << " TxId: " << TxId + << " Info: " << Info + << " }"; + } + }; + + struct TEvUnsafeUploadRowsResponse + : public TEventLocal<TEvUnsafeUploadRowsResponse, TEvDataShard::EvUnsafeUploadRowsResponse> + { + NKikimrTxDataShard::TEvUploadRowsResponse Record; + NDataShard::TS3Download Info; + + explicit TEvUnsafeUploadRowsResponse(ui64 tabletId, ui32 status = NKikimrTxDataShard::TError::OK) { + Record.SetTabletID(tabletId); + Record.SetStatus(status); + } + + TString ToString() const override { + return TStringBuilder() << ToStringHeader() << " {" + << " Record: " << Record.ShortDebugString() + << " Info: " << Info + << " }"; } }; diff --git a/ydb/core/tx/datashard/datashard__s3_download_txs.cpp b/ydb/core/tx/datashard/datashard__s3_download_txs.cpp index e726171a8ef..fef011fa224 100644 --- a/ydb/core/tx/datashard/datashard__s3_download_txs.cpp +++ b/ydb/core/tx/datashard/datashard__s3_download_txs.cpp @@ -28,7 +28,7 @@ bool TDataShard::TTxGetS3DownloadInfo::Execute(TTransactionContext& txc, const T void TDataShard::TTxGetS3DownloadInfo::Complete(const TActorContext& ctx) { Y_VERIFY(Reply); - ctx.Send(Ev->Get()->ReplyTo, Reply.Release(), 0, Ev->Cookie); + ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie); } /// Store @@ -40,12 +40,10 @@ TDataShard::TTxStoreS3DownloadInfo::TTxStoreS3DownloadInfo( { } bool TDataShard::TTxStoreS3DownloadInfo::Execute(TTransactionContext& txc, const TActorContext&) { - const auto& msg = *Ev->Get(); - txc.DB.NoMoreReadsForTx(); NIceDb::TNiceDb db(txc.DB); - const auto& info = Self->S3Downloads.Store(db, msg); + const auto& info = Self->S3Downloads.Store(db, Ev->Get()->TxId, Ev->Get()->Info); Reply.Reset(new TEvDataShard::TEvS3DownloadInfo(info)); return true; @@ -53,7 +51,7 @@ bool TDataShard::TTxStoreS3DownloadInfo::Execute(TTransactionContext& txc, const void TDataShard::TTxStoreS3DownloadInfo::Complete(const TActorContext& ctx) { Y_VERIFY(Reply); - ctx.Send(Ev->Get()->ReplyTo, Reply.Release(), 0, Ev->Cookie); + ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie); } } // namespace NDataShard diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index 4128209133d..a3cbd5a476c 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -6,9 +6,9 @@ namespace NDataShard { template <typename TEvRequest, typename TEvResponse> TCommonUploadOps<TEvRequest, TEvResponse>::TCommonUploadOps(typename TEvRequest::TPtr& ev, bool breakLocks, bool collectChanges) - : BreakLocks(breakLocks) + : Ev(ev) + , BreakLocks(breakLocks) , CollectChanges(collectChanges) - , Ev(ev) { } @@ -17,7 +17,6 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans const TRowVersion& readVersion, const TRowVersion& writeVersion) { const auto& record = Ev->Get()->Record; - Result = MakeHolder<TEvResponse>(self->TabletID()); TInstant deadline = TInstant::MilliSeconds(record.GetCancelDeadlineMs()); @@ -224,16 +223,14 @@ void TCommonUploadOps<TEvRequest, TEvResponse>::GetResult(TDataShard* self, TAct } template <typename TEvRequest, typename TEvResponse> -void TCommonUploadOps<TEvRequest, TEvResponse>::SendResult(TDataShard* self, const TActorContext& ctx) { - Y_VERIFY(Result); - - if (Result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) { - self->IncCounter(COUNTER_BULK_UPSERT_SUCCESS); - } else { - self->IncCounter(COUNTER_BULK_UPSERT_ERROR); - } +const TEvRequest* TCommonUploadOps<TEvRequest, TEvResponse>::GetRequest() const { + return Ev->Get(); +} - ctx.Send(Ev->Sender, std::move(Result)); +template <typename TEvRequest, typename TEvResponse> +TEvResponse* TCommonUploadOps<TEvRequest, TEvResponse>::GetResult() { + Y_VERIFY(Result); + return Result.Get(); } template <typename TEvRequest, typename TEvResponse> diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h index 928558de2d9..052937aabaf 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.h +++ b/ydb/core/tx/datashard/datashard_common_upload.h @@ -11,10 +11,10 @@ template <typename TEvRequest, typename TEvResponse> class TCommonUploadOps { using IChangeCollector = NMiniKQL::IChangeCollector; + typename TEvRequest::TPtr Ev; const bool BreakLocks; const bool CollectChanges; - typename TEvRequest::TPtr Ev; THolder<TEvResponse> Result; THolder<IChangeCollector> ChangeCollector; @@ -24,7 +24,8 @@ public: protected: bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion); void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie); - void SendResult(TDataShard* self, const TActorContext& ctx); + const TEvRequest* GetRequest() const; + TEvResponse* GetResult(); TVector<IChangeCollector::TChange> GetCollectedChanges() const; private: diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h index a26226bd79a..b6bc1845b34 100644 --- a/ydb/core/tx/datashard/datashard_direct_upload.h +++ b/ydb/core/tx/datashard/datashard_direct_upload.h @@ -6,10 +6,11 @@ namespace NKikimr { namespace NDataShard { -class TDirectTxUpload : public IDirectTx, - public TCommonUploadOps< +class TDirectTxUpload : public IDirectTx + , public TCommonUploadOps< TEvDataShard::TEvUploadRowsRequest, - TEvDataShard::TEvUploadRowsResponse> { + TEvDataShard::TEvUploadRowsResponse> +{ public: explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev); diff --git a/ydb/core/tx/datashard/datashard_s3_download.h b/ydb/core/tx/datashard/datashard_s3_download.h new file mode 100644 index 00000000000..eeb9fb213c0 --- /dev/null +++ b/ydb/core/tx/datashard/datashard_s3_download.h @@ -0,0 +1,29 @@ +#pragma once + +#include <util/generic/maybe.h> + +namespace NKikimr { +namespace NDataShard { + +struct TS3Download { + TMaybe<TString> DataETag; + ui64 ProcessedBytes = 0; + ui64 WrittenBytes = 0; + ui64 WrittenRows = 0; + + void Out(IOutputStream& out) const { + out << "{" + << " DataETag: " << DataETag + << " ProcessedBytes: " << ProcessedBytes + << " WrittenBytes: " << WrittenBytes + << " WrittenRows: " << WrittenRows + << " }"; + } +}; + +} // namespace NDataShard +} // namespace NKikimr + +Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TS3Download, out, value) { + value.Out(out); +} diff --git a/ydb/core/tx/datashard/datashard_s3_downloads.cpp b/ydb/core/tx/datashard/datashard_s3_downloads.cpp index f1571885c50..4718bad3cac 100644 --- a/ydb/core/tx/datashard/datashard_s3_downloads.cpp +++ b/ydb/core/tx/datashard/datashard_s3_downloads.cpp @@ -40,25 +40,23 @@ void TS3DownloadsManager::Reset() { Downloads.clear(); } -const TS3DownloadsManager::TInfo* TS3DownloadsManager::Find(ui64 txId) const { +const TS3Download* TS3DownloadsManager::Find(ui64 txId) const { return Downloads.FindPtr(txId); } -const TS3DownloadsManager::TInfo& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, const TEvDataShard::TEvStoreS3DownloadInfo& msg) { - auto& info = Downloads[msg.TxId]; +const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, const TS3Download& newInfo) { + auto& info = Downloads[txId]; - Y_VERIFY(info.DataETag.GetOrElse(msg.DataETag) == msg.DataETag); - info.DataETag = msg.DataETag; - info.ProcessedBytes = msg.ProcessedBytes; - info.WrittenBytes = msg.WrittenBytes; - info.WrittenRows = msg.WrittenRows; + Y_VERIFY(newInfo.DataETag); + Y_VERIFY(info.DataETag.GetOrElse(*newInfo.DataETag) == *newInfo.DataETag); + info = newInfo; using Schema = TDataShard::Schema; - db.Table<Schema::S3Downloads>().Key(msg.TxId).Update( - NIceDb::TUpdate<Schema::S3Downloads::DataETag>(msg.DataETag), - NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(msg.ProcessedBytes), - NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(msg.WrittenBytes), - NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(msg.WrittenRows)); + db.Table<Schema::S3Downloads>().Key(txId).Update( + NIceDb::TUpdate<Schema::S3Downloads::DataETag>(*newInfo.DataETag), + NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(newInfo.ProcessedBytes), + NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(newInfo.WrittenBytes), + NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows)); return info; } diff --git a/ydb/core/tx/datashard/datashard_s3_downloads.h b/ydb/core/tx/datashard/datashard_s3_downloads.h index d7d11ad2f21..1af348fc2c2 100644 --- a/ydb/core/tx/datashard/datashard_s3_downloads.h +++ b/ydb/core/tx/datashard/datashard_s3_downloads.h @@ -1,6 +1,6 @@ #pragma once -#include "datashard.h" +#include "datashard_s3_download.h" #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -10,19 +10,17 @@ namespace NKikimr { namespace NDataShard { class TS3DownloadsManager { - using TInfo = TEvDataShard::TEvS3DownloadInfo::TInfo; - public: bool Load(NIceDb::TNiceDb& db); void Reset(); - const TInfo* Find(ui64 txId) const; - const TInfo& Store(NIceDb::TNiceDb& db, const TEvDataShard::TEvStoreS3DownloadInfo& msg); + const TS3Download* Find(ui64 txId) const; + const TS3Download& Store(NIceDb::TNiceDb& db, ui64 txId, const TS3Download& newInfo); private: - THashMap<ui64, TInfo> Downloads; + THashMap<ui64, TS3Download> Downloads; }; // TS3DownloadsManager -} // namespace NDataShard -} // namespace NKikimr +} // namespace NDataShard +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index a3168bd3ef2..a73e24b1991 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -273,10 +273,11 @@ private: THolder<TEvDataShard::TEvS3DownloadInfo> Reply; }; -class TDataShard::TTxUnsafeUploadRows: public NTabletFlatExecutor::TTransactionBase<TDataShard>, - public TCommonUploadOps< - TEvDataShard::TEvUnsafeUploadRowsRequest, - TEvDataShard::TEvUnsafeUploadRowsResponse> { +class TDataShard::TTxUnsafeUploadRows: public NTabletFlatExecutor::TTransactionBase<TDataShard> + , public TCommonUploadOps< + TEvDataShard::TEvUnsafeUploadRowsRequest, + TEvDataShard::TEvUnsafeUploadRowsResponse> +{ public: TTxUnsafeUploadRows(TDataShard* ds, TEvDataShard::TEvUnsafeUploadRowsRequest::TPtr& ev); bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp index cb085ad4805..1aec8f60214 100644 --- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp +++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp @@ -11,8 +11,15 @@ TDataShard::TTxUnsafeUploadRows::TTxUnsafeUploadRows(TDataShard* ds, TEvDataShar bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TActorContext&) { auto [readVersion, writeVersion] = Self->GetReadWriteVersions(); - if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion)) + if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion)) { return false; + } + + auto* result = GetResult(); + if (result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) { + NIceDb::TNiceDb db(txc.DB); + result->Info = Self->S3Downloads.Store(db, GetRequest()->TxId, GetRequest()->Info); + } if (Self->IsMvccEnabled()) { // Note: we always wait for completion, so we can ignore the result diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index 9ea7fbab5c0..ecabfe7bb8a 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -86,15 +86,15 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { class TUploadRowsRequestBuilder { public: void New(const TTableInfo& tableInfo, const NKikimrSchemeOp::TTableDescription& scheme) { - Request.Reset(new TEvDataShard::TEvUnsafeUploadRowsRequest()); - Request->Record.SetTableId(tableInfo.GetId()); + Record = std::make_shared<NKikimrTxDataShard::TEvUploadRowsRequest>(); + Record->SetTableId(tableInfo.GetId()); TVector<TString> columnNames; for (const auto& column : scheme.GetColumns()) { columnNames.push_back(column.GetName()); } - auto& rowScheme = *Request->Record.MutableRowScheme(); + auto& rowScheme = *Record->MutableRowScheme(); for (ui32 id : tableInfo.GetKeyColumnIds()) { rowScheme.AddKeyColumnIds(id); } @@ -104,27 +104,27 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { } void AddRow(const TVector<TCell>& keys, const TVector<TCell>& values) { - Y_VERIFY(Request); - auto& row = *Request->Record.AddRows(); + Y_VERIFY(Record); + auto& row = *Record->AddRows(); row.SetKeyColumns(TSerializedCellVec::Serialize(keys)); row.SetValueColumns(TSerializedCellVec::Serialize(values)); } - const NKikimrTxDataShard::TEvUploadRowsRequest& GetRecord() const { - Y_VERIFY(Request); - return Request->Record; - } - - THolder<TEvDataShard::TEvUnsafeUploadRowsRequest> Build() { - Y_VERIFY(Request); - return std::move(Request); + const std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest>& GetRecord() { + Y_VERIFY(Record); + return Record; } private: - THolder<TEvDataShard::TEvUnsafeUploadRowsRequest> Request; + std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest> Record; }; // TUploadRowsRequestBuilder + enum class EWakeupTag: ui64 { + Restart, + RetryUpload, + }; + void AllocateResource() { IMPORT_LOG_D("AllocateResource" << ": self# " << SelfId()); @@ -205,7 +205,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { ETag = result.GetResult().GetETag(); ContentLength = result.GetResult().GetContentLength(); - Send(DataShard, new TEvDataShard::TEvGetS3DownloadInfo(SelfId(), TxId)); + Send(DataShard, new TEvDataShard::TEvGetS3DownloadInfo(TxId)); } void Handle(TEvDataShard::TEvS3DownloadInfo::TPtr& ev) { @@ -213,15 +213,22 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { IMPORT_LOG_D("Handle TEvDataShard::TEvS3DownloadInfo" << ": self# " << SelfId() - << ", info# " << info.ToString()); + << ", info# " << info); if (!info.DataETag) { - Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(SelfId(), - TxId, ETag, ProcessedBytes, WrittenBytes, WrittenRows)); + Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, { + ETag, ProcessedBytes, WrittenBytes, WrittenRows + })); return; } - if (!CheckETag(*info.DataETag, ETag, TStringBuf("DownloadInfo"))) { + ProcessDownloadInfo(info, TStringBuf("DownloadInfo")); + } + + void ProcessDownloadInfo(const TS3Download& info, const TStringBuf marker) { + Y_VERIFY(info.DataETag); + + if (!CheckETag(*info.DataETag, ETag, marker)) { return; } @@ -283,14 +290,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { } const auto& record = RequestBuilder.GetRecord(); - WrittenRows += record.RowsSize(); + WrittenRows += record->RowsSize(); - IMPORT_LOG_D("Upload rows" - << ": self# " << SelfId() - << ", count# " << record.RowsSize() - << ", size# " << record.ByteSizeLong()); - - Send(DataShard, RequestBuilder.Build().Release()); + UploadRows(); return false; } @@ -320,27 +322,42 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { return true; } + void UploadRows() { + const auto& record = RequestBuilder.GetRecord(); + + IMPORT_LOG_D("Upload rows" + << ": self# " << SelfId() + << ", count# " << record->RowsSize() + << ", size# " << record->ByteSizeLong()); + + Send(DataShard, new TEvDataShard::TEvUnsafeUploadRowsRequest(TxId, record, { + ETag, ProcessedBytes, WrittenBytes, WrittenRows + })); + } + void Handle(TEvDataShard::TEvUnsafeUploadRowsResponse::TPtr& ev) { const auto& record = ev->Get()->Record; + const auto& info = ev->Get()->Info; IMPORT_LOG_D("Handle TEvDataShard::TEvUnsafeUploadRowsResponse" << ": self# " << SelfId() - << ", record# " << record.ShortDebugString()); + << ", record# " << record.ShortDebugString() + << ", info# " << info); switch (record.GetStatus()) { case NKikimrTxDataShard::TError::OK: - break; + return ProcessDownloadInfo(info, TStringBuf("UploadResponse")); + + case NKikimrTxDataShard::TError::WRONG_SHARD_STATE: // OVERLOADED + return RetryUpload(); case NKikimrTxDataShard::TError::SCHEME_ERROR: case NKikimrTxDataShard::TError::BAD_ARGUMENT: return Finish(false, record.GetErrorDescription()); default: - return RetryOrFinish(record.GetErrorDescription()); + return RestartOrFinish(record.GetErrorDescription()); }; - - Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(SelfId(), - TxId, ETag, ProcessedBytes, WrittenBytes, WrittenRows)); } template <typename TResult> @@ -352,7 +369,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { IMPORT_LOG_E("Error at '" << marker << "'" << ": self# " << SelfId() << ", error# " << result); - RetryOrFinish(result.GetError().GetMessage().c_str()); + RestartOrFinish(result.GetError().GetMessage().c_str()); return false; } @@ -417,17 +434,30 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User { return true; } - void RetryOrFinish(const TString& error) { + void RetryUpload() { + Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(static_cast<ui64>(EWakeupTag::RetryUpload))); + } + + void RestartOrFinish(const TString& error) { if (Attempt++ < Retries) { Delay = Min(Delay * Attempt, MaxDelay); const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); - Schedule(Delay + random, new TEvents::TEvWakeup()); + Schedule(Delay + random, new TEvents::TEvWakeup(static_cast<ui64>(EWakeupTag::Restart))); } else { Finish(false, error); } } + void Handle(TEvents::TEvWakeup::TPtr& ev) { + switch (static_cast<EWakeupTag>(ev->Get()->Tag)) { + case EWakeupTag::Restart: + return Restart(); + case EWakeupTag::RetryUpload: + return UploadRows(); + } + } + void Finish(bool success = true, const TString& error = TString()) { IMPORT_LOG_I("Finish" << ": self# " << SelfId() @@ -505,8 +535,8 @@ public: hFunc(TEvDataShard::TEvS3DownloadInfo, Handle); hFunc(TEvDataShard::TEvUnsafeUploadRowsResponse, Handle); - cFunc(TEvents::TEvWakeup::EventType, Restart); - cFunc(TEvents::TEvPoisonPill::EventType, NotifyDied); + hFunc(TEvents::TEvWakeup, Handle); + sFunc(TEvents::TEvPoisonPill, NotifyDied); } } diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp index 49777286ed6..5573748eb88 100644 --- a/ydb/core/tx/schemeshard/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore.cpp @@ -1,5 +1,6 @@ #include <contrib/libs/double-conversion/double-conversion/ieee.h> +#include <ydb/core/base/localdb.h> #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/tablet/resource_broker.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> @@ -536,6 +537,65 @@ Y_UNIT_TEST_SUITE(TRestoreTests) { UNIT_ASSERT_VALUES_EQUAL(result->GetRowsProcessed(), 2); } + Y_UNIT_TEST(ShouldHandleOverloadedShard) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + // prepare table schema with special policy + NKikimrSchemeOp::TTableDescription desc; + desc.SetName("Table"); + desc.AddKeyColumnNames("key"); + { + auto& column = *desc.AddColumns(); + column.SetName("key"); + column.SetType("Uint32"); + } + { + auto& column = *desc.AddColumns(); + column.SetName("value"); + column.SetType("Utf8"); + } + + auto policy = NLocalDb::CreateDefaultUserTablePolicy(); + policy->InMemForceSizeToSnapshot = 1; + policy->Serialize(*desc.MutablePartitionConfig()->MutableCompactionPolicy()); + + // serialize schema + TString scheme; + UNIT_ASSERT(google::protobuf::TextFormat::PrintToString(desc, &scheme)); + TestCreateTable(runtime, ++txId, "/MyRoot", scheme); + env.TestWaitNotification(runtime, txId); + + ui32 total = 0; + ui32 overloads = 0; + runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvDataShard::EvUnsafeUploadRowsResponse) { + ++total; + const auto& record = ev->Get<TEvDataShard::TEvUnsafeUploadRowsResponse>()->Record; + overloads += ui32(record.GetStatus() == NKikimrTxDataShard::TError::WRONG_SHARD_STATE); + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + TPortManager portManager; + THolder<TS3Mock> s3Mock; + + const auto data = GenerateTestData("", 1000); + const ui32 batchSize = 32; + RestoreNoWait(runtime, ++txId, portManager.GetPort(), s3Mock, {data}, batchSize); + + env.TestWaitNotification(runtime, txId); + UNIT_ASSERT(overloads > 0); + + const ui32 expected = data.Csv.size() / batchSize + ui32(bool(data.Csv.size() % batchSize)); + UNIT_ASSERT_VALUES_EQUAL(expected, total - overloads); + + auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", {"key", "Uint32", "0"}); + NKqp::CompareYson(data.YsonStr, content); + } + Y_UNIT_TEST(ShouldFailOnFileWithoutNewLines) { TTestBasicRuntime runtime; |