about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Ilnaz Nizametdinov <i.nizametdinov@gmail.com> 2022-04-13 12:35:07 +0300
committer: Ilnaz Nizametdinov <i.nizametdinov@gmail.com> 2022-04-13 12:35:07 +0300
commit: 2f14a0656c686c407bb481a79060d4f4f57e3dac (patch)
tree: e68136d67287fad2448b75d651844576bdddbd09
parent: 7a67b298c60dfee4b9d9bb4109a7862b6e37659d (diff)
download: ydb-2f14a0656c686c407bb481a79060d4f4f57e3dac.tar.gz
Check RejectProbability, store data along with state KIKIMR-14693 KIKIMR-14695
ref:66b425d4dfa58bd225eb6b7fdb43737792b1cd54
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 14
-rw-r--r-- ydb/core/tx/datashard/datashard.h 126
-rw-r--r-- ydb/core/tx/datashard/datashard__s3_download_txs.cpp 8
-rw-r--r-- ydb/core/tx/datashard/datashard_common_upload.cpp 21
-rw-r--r-- ydb/core/tx/datashard/datashard_common_upload.h 5
-rw-r--r-- ydb/core/tx/datashard/datashard_direct_upload.h 7
-rw-r--r-- ydb/core/tx/datashard/datashard_s3_download.h 29
-rw-r--r-- ydb/core/tx/datashard/datashard_s3_downloads.cpp 24
-rw-r--r-- ydb/core/tx/datashard/datashard_s3_downloads.h 14
-rw-r--r-- ydb/core/tx/datashard/datashard_txs.h 9
-rw-r--r-- ydb/core/tx/datashard/datashard_unsafe_upload.cpp 9
-rw-r--r-- ydb/core/tx/datashard/import_s3.cpp 104
-rw-r--r-- ydb/core/tx/schemeshard/ut_restore.cpp 60
13 files changed, 280 insertions, 150 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 55921144e41..0500e32d533 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2944,6 +2944,20 @@ void TDataShard::Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TA
void TDataShard::Handle(TEvDataShard::TEvUnsafeUploadRowsRequest::TPtr& ev, const TActorContext& ctx)
{
+ const float rejectProbabilty = Executor()->GetRejectProbability();
+ if (rejectProbabilty > 0) {
+ const float rnd = AppData(ctx)->RandomProvider->GenRandReal2();
+ if (rnd < rejectProbabilty) {
+ auto response = MakeHolder<TEvDataShard::TEvUnsafeUploadRowsResponse>(
+ TabletID(), NKikimrTxDataShard::TError::WRONG_SHARD_STATE);
+ response->Record.SetErrorDescription("Reject due to given RejectProbability");
+ ctx.Send(ev->Sender, std::move(response));
+ IncCounter(COUNTER_BULK_UPSERT_OVERLOADED);
+
+ return;
+ }
+ }
+
Execute(new TTxUnsafeUploadRows(this, ev), ctx);
}
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 5d39415ac39..06f357c8e97 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -1,5 +1,6 @@
#pragma once
+#include "datashard_s3_download.h"
#include "datashard_s3_upload.h"
#include <ydb/core/tx/tx.h>
@@ -857,24 +858,6 @@ struct TEvDataShard {
}
};
- struct TEvUnsafeUploadRowsRequest : public TEventPBWithArena<TEvUnsafeUploadRowsRequest,
- NKikimrTxDataShard::TEvUploadRowsRequest,
- TEvDataShard::EvUnsafeUploadRowsRequest,
- 16200, 32500> {
- TEvUnsafeUploadRowsRequest() = default;
- };
-
- struct TEvUnsafeUploadRowsResponse : public TEventPB<TEvUnsafeUploadRowsResponse,
- NKikimrTxDataShard::TEvUploadRowsResponse,
- TEvDataShard::EvUnsafeUploadRowsResponse> {
- TEvUnsafeUploadRowsResponse() = default;
-
- explicit TEvUnsafeUploadRowsResponse(ui64 tabletId, ui32 status = NKikimrTxDataShard::TError::OK) {
- Record.SetTabletID(tabletId);
- Record.SetStatus(status);
- }
- };
-
// In most cases this event is local, thus users must
// use Keys, Ranges and Program struct members instead of corresponding
// protobuf members. In case of remote event these struct members will
@@ -1223,12 +1206,10 @@ struct TEvDataShard {
struct TEvGetS3DownloadInfo
: public TEventLocal<TEvGetS3DownloadInfo, TEvDataShard::EvGetS3DownloadInfo>
{
- TActorId ReplyTo;
ui64 TxId;
- explicit TEvGetS3DownloadInfo(const TActorId& replyTo, ui64 txId)
- : ReplyTo(replyTo)
- , TxId(txId)
+ explicit TEvGetS3DownloadInfo(ui64 txId)
+ : TxId(txId)
{
}
};
@@ -1236,38 +1217,20 @@ struct TEvDataShard {
struct TEvStoreS3DownloadInfo
: public TEventLocal<TEvStoreS3DownloadInfo, TEvDataShard::EvStoreS3DownloadInfo>
{
- TActorId ReplyTo;
ui64 TxId;
- TString DataETag;
- ui64 ProcessedBytes;
- ui64 WrittenBytes;
- ui64 WrittenRows;
-
- TEvStoreS3DownloadInfo() = default;
+ NDataShard::TS3Download Info;
- explicit TEvStoreS3DownloadInfo(
- const TActorId& replyTo,
- ui64 txId,
- const TString& dataETag,
- ui64 processedBytes,
- ui64 writtenBytes,
- ui64 writtenRows)
- : ReplyTo(replyTo)
- , TxId(txId)
- , DataETag(dataETag)
- , ProcessedBytes(processedBytes)
- , WrittenBytes(writtenBytes)
- , WrittenRows(writtenRows)
+ explicit TEvStoreS3DownloadInfo(ui64 txId, const NDataShard::TS3Download& info)
+ : TxId(txId)
+ , Info(info)
{
+ Y_VERIFY(Info.DataETag);
}
TString ToString() const override {
return TStringBuilder() << ToStringHeader() << " {"
<< " TxId: " << TxId
- << " DataETag: " << DataETag
- << " ProcessedBytes: " << ProcessedBytes
- << " WrittenBytes: " << WrittenBytes
- << " WrittenRows: " << WrittenRows
+ << " Info: " << Info
<< " }";
}
};
@@ -1275,33 +1238,66 @@ struct TEvDataShard {
struct TEvS3DownloadInfo
: public TEventLocal<TEvS3DownloadInfo, TEvDataShard::EvS3DownloadInfo>
{
- struct TInfo {
- TMaybe<TString> DataETag;
- ui64 ProcessedBytes = 0;
- ui64 WrittenBytes = 0;
- ui64 WrittenRows = 0;
-
- TString ToString() const {
- return TStringBuilder() << "{"
- << " DataETag: " << DataETag
- << " ProcessedBytes: " << ProcessedBytes
- << " WrittenBytes: " << WrittenBytes
- << " WrittenRows: " << WrittenRows
- << " }";
- }
- };
-
- TInfo Info;
+ NDataShard::TS3Download Info;
TEvS3DownloadInfo() = default;
- explicit TEvS3DownloadInfo(const TInfo& info)
+ explicit TEvS3DownloadInfo(const NDataShard::TS3Download& info)
: Info(info)
{
}
TString ToString() const override {
- return TStringBuilder() << ToStringHeader() << " " << Info.ToString();
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Info: " << Info
+ << " }";
+ }
+ };
+
+ struct TEvUnsafeUploadRowsRequest
+ : public TEventLocal<TEvUnsafeUploadRowsRequest, TEvDataShard::EvUnsafeUploadRowsRequest>
+ {
+ ui64 TxId;
+ std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest> RecordHolder;
+ const NKikimrTxDataShard::TEvUploadRowsRequest& Record;
+ NDataShard::TS3Download Info;
+
+ explicit TEvUnsafeUploadRowsRequest(
+ ui64 txId,
+ const std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest>& record,
+ const NDataShard::TS3Download& info)
+ : TxId(txId)
+ , RecordHolder(record)
+ , Record(*RecordHolder)
+ , Info(info)
+ {
+ Y_VERIFY(Info.DataETag);
+ }
+
+ TString ToString() const override {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " TxId: " << TxId
+ << " Info: " << Info
+ << " }";
+ }
+ };
+
+ struct TEvUnsafeUploadRowsResponse
+ : public TEventLocal<TEvUnsafeUploadRowsResponse, TEvDataShard::EvUnsafeUploadRowsResponse>
+ {
+ NKikimrTxDataShard::TEvUploadRowsResponse Record;
+ NDataShard::TS3Download Info;
+
+ explicit TEvUnsafeUploadRowsResponse(ui64 tabletId, ui32 status = NKikimrTxDataShard::TError::OK) {
+ Record.SetTabletID(tabletId);
+ Record.SetStatus(status);
+ }
+
+ TString ToString() const override {
+ return TStringBuilder() << ToStringHeader() << " {"
+ << " Record: " << Record.ShortDebugString()
+ << " Info: " << Info
+ << " }";
}
};
diff --git a/ydb/core/tx/datashard/datashard__s3_download_txs.cpp b/ydb/core/tx/datashard/datashard__s3_download_txs.cpp
index e726171a8ef..fef011fa224 100644
--- a/ydb/core/tx/datashard/datashard__s3_download_txs.cpp
+++ b/ydb/core/tx/datashard/datashard__s3_download_txs.cpp
@@ -28,7 +28,7 @@ bool TDataShard::TTxGetS3DownloadInfo::Execute(TTransactionContext& txc, const T
void TDataShard::TTxGetS3DownloadInfo::Complete(const TActorContext& ctx) {
Y_VERIFY(Reply);
- ctx.Send(Ev->Get()->ReplyTo, Reply.Release(), 0, Ev->Cookie);
+ ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie);
}
/// Store
@@ -40,12 +40,10 @@ TDataShard::TTxStoreS3DownloadInfo::TTxStoreS3DownloadInfo(
{ }
bool TDataShard::TTxStoreS3DownloadInfo::Execute(TTransactionContext& txc, const TActorContext&) {
- const auto& msg = *Ev->Get();
-
txc.DB.NoMoreReadsForTx();
NIceDb::TNiceDb db(txc.DB);
- const auto& info = Self->S3Downloads.Store(db, msg);
+ const auto& info = Self->S3Downloads.Store(db, Ev->Get()->TxId, Ev->Get()->Info);
Reply.Reset(new TEvDataShard::TEvS3DownloadInfo(info));
return true;
@@ -53,7 +51,7 @@ bool TDataShard::TTxStoreS3DownloadInfo::Execute(TTransactionContext& txc, const
void TDataShard::TTxStoreS3DownloadInfo::Complete(const TActorContext& ctx) {
Y_VERIFY(Reply);
- ctx.Send(Ev->Get()->ReplyTo, Reply.Release(), 0, Ev->Cookie);
+ ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie);
}
} // namespace NDataShard
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 4128209133d..a3cbd5a476c 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -6,9 +6,9 @@ namespace NDataShard {
template <typename TEvRequest, typename TEvResponse>
TCommonUploadOps<TEvRequest, TEvResponse>::TCommonUploadOps(typename TEvRequest::TPtr& ev, bool breakLocks, bool collectChanges)
- : BreakLocks(breakLocks)
+ : Ev(ev)
+ , BreakLocks(breakLocks)
, CollectChanges(collectChanges)
- , Ev(ev)
{
}
@@ -17,7 +17,6 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
const TRowVersion& readVersion, const TRowVersion& writeVersion)
{
const auto& record = Ev->Get()->Record;
-
Result = MakeHolder<TEvResponse>(self->TabletID());
TInstant deadline = TInstant::MilliSeconds(record.GetCancelDeadlineMs());
@@ -224,16 +223,14 @@ void TCommonUploadOps<TEvRequest, TEvResponse>::GetResult(TDataShard* self, TAct
}
template <typename TEvRequest, typename TEvResponse>
-void TCommonUploadOps<TEvRequest, TEvResponse>::SendResult(TDataShard* self, const TActorContext& ctx) {
- Y_VERIFY(Result);
-
- if (Result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) {
- self->IncCounter(COUNTER_BULK_UPSERT_SUCCESS);
- } else {
- self->IncCounter(COUNTER_BULK_UPSERT_ERROR);
- }
+const TEvRequest* TCommonUploadOps<TEvRequest, TEvResponse>::GetRequest() const {
+ return Ev->Get();
+}
- ctx.Send(Ev->Sender, std::move(Result));
+template <typename TEvRequest, typename TEvResponse>
+TEvResponse* TCommonUploadOps<TEvRequest, TEvResponse>::GetResult() {
+ Y_VERIFY(Result);
+ return Result.Get();
}
template <typename TEvRequest, typename TEvResponse>
diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h
index 928558de2d9..052937aabaf 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.h
+++ b/ydb/core/tx/datashard/datashard_common_upload.h
@@ -11,10 +11,10 @@ template <typename TEvRequest, typename TEvResponse>
class TCommonUploadOps {
using IChangeCollector = NMiniKQL::IChangeCollector;
+ typename TEvRequest::TPtr Ev;
const bool BreakLocks;
const bool CollectChanges;
- typename TEvRequest::TPtr Ev;
THolder<TEvResponse> Result;
THolder<IChangeCollector> ChangeCollector;
@@ -24,7 +24,8 @@ public:
protected:
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion);
void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie);
- void SendResult(TDataShard* self, const TActorContext& ctx);
+ const TEvRequest* GetRequest() const;
+ TEvResponse* GetResult();
TVector<IChangeCollector::TChange> GetCollectedChanges() const;
private:
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h
index a26226bd79a..b6bc1845b34 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.h
+++ b/ydb/core/tx/datashard/datashard_direct_upload.h
@@ -6,10 +6,11 @@
namespace NKikimr {
namespace NDataShard {
-class TDirectTxUpload : public IDirectTx,
- public TCommonUploadOps<
+class TDirectTxUpload : public IDirectTx
+ , public TCommonUploadOps<
TEvDataShard::TEvUploadRowsRequest,
- TEvDataShard::TEvUploadRowsResponse> {
+ TEvDataShard::TEvUploadRowsResponse>
+{
public:
explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev);
diff --git a/ydb/core/tx/datashard/datashard_s3_download.h b/ydb/core/tx/datashard/datashard_s3_download.h
new file mode 100644
index 00000000000..eeb9fb213c0
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_s3_download.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <util/generic/maybe.h>
+
+namespace NKikimr {
+namespace NDataShard {
+
+struct TS3Download {
+ TMaybe<TString> DataETag;
+ ui64 ProcessedBytes = 0;
+ ui64 WrittenBytes = 0;
+ ui64 WrittenRows = 0;
+
+ void Out(IOutputStream& out) const {
+ out << "{"
+ << " DataETag: " << DataETag
+ << " ProcessedBytes: " << ProcessedBytes
+ << " WrittenBytes: " << WrittenBytes
+ << " WrittenRows: " << WrittenRows
+ << " }";
+ }
+};
+
+} // namespace NDataShard
+} // namespace NKikimr
+
+Y_DECLARE_OUT_SPEC(inline, NKikimr::NDataShard::TS3Download, out, value) {
+ value.Out(out);
+}
diff --git a/ydb/core/tx/datashard/datashard_s3_downloads.cpp b/ydb/core/tx/datashard/datashard_s3_downloads.cpp
index f1571885c50..4718bad3cac 100644
--- a/ydb/core/tx/datashard/datashard_s3_downloads.cpp
+++ b/ydb/core/tx/datashard/datashard_s3_downloads.cpp
@@ -40,25 +40,23 @@ void TS3DownloadsManager::Reset() {
Downloads.clear();
}
-const TS3DownloadsManager::TInfo* TS3DownloadsManager::Find(ui64 txId) const {
+const TS3Download* TS3DownloadsManager::Find(ui64 txId) const {
return Downloads.FindPtr(txId);
}
-const TS3DownloadsManager::TInfo& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, const TEvDataShard::TEvStoreS3DownloadInfo& msg) {
- auto& info = Downloads[msg.TxId];
+const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, const TS3Download& newInfo) {
+ auto& info = Downloads[txId];
- Y_VERIFY(info.DataETag.GetOrElse(msg.DataETag) == msg.DataETag);
- info.DataETag = msg.DataETag;
- info.ProcessedBytes = msg.ProcessedBytes;
- info.WrittenBytes = msg.WrittenBytes;
- info.WrittenRows = msg.WrittenRows;
+ Y_VERIFY(newInfo.DataETag);
+ Y_VERIFY(info.DataETag.GetOrElse(*newInfo.DataETag) == *newInfo.DataETag);
+ info = newInfo;
using Schema = TDataShard::Schema;
- db.Table<Schema::S3Downloads>().Key(msg.TxId).Update(
- NIceDb::TUpdate<Schema::S3Downloads::DataETag>(msg.DataETag),
- NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(msg.ProcessedBytes),
- NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(msg.WrittenBytes),
- NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(msg.WrittenRows));
+ db.Table<Schema::S3Downloads>().Key(txId).Update(
+ NIceDb::TUpdate<Schema::S3Downloads::DataETag>(*newInfo.DataETag),
+ NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(newInfo.ProcessedBytes),
+ NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(newInfo.WrittenBytes),
+ NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows));
return info;
}
diff --git a/ydb/core/tx/datashard/datashard_s3_downloads.h b/ydb/core/tx/datashard/datashard_s3_downloads.h
index d7d11ad2f21..1af348fc2c2 100644
--- a/ydb/core/tx/datashard/datashard_s3_downloads.h
+++ b/ydb/core/tx/datashard/datashard_s3_downloads.h
@@ -1,6 +1,6 @@
#pragma once
-#include "datashard.h"
+#include "datashard_s3_download.h"
#include <ydb/core/tablet_flat/flat_cxx_database.h>
@@ -10,19 +10,17 @@ namespace NKikimr {
namespace NDataShard {
class TS3DownloadsManager {
- using TInfo = TEvDataShard::TEvS3DownloadInfo::TInfo;
-
public:
bool Load(NIceDb::TNiceDb& db);
void Reset();
- const TInfo* Find(ui64 txId) const;
- const TInfo& Store(NIceDb::TNiceDb& db, const TEvDataShard::TEvStoreS3DownloadInfo& msg);
+ const TS3Download* Find(ui64 txId) const;
+ const TS3Download& Store(NIceDb::TNiceDb& db, ui64 txId, const TS3Download& newInfo);
private:
- THashMap<ui64, TInfo> Downloads;
+ THashMap<ui64, TS3Download> Downloads;
}; // TS3DownloadsManager
-} // namespace NDataShard
-} // namespace NKikimr
+} // namespace NDataShard
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index a3168bd3ef2..a73e24b1991 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -273,10 +273,11 @@ private:
THolder<TEvDataShard::TEvS3DownloadInfo> Reply;
};
-class TDataShard::TTxUnsafeUploadRows: public NTabletFlatExecutor::TTransactionBase<TDataShard>,
- public TCommonUploadOps<
- TEvDataShard::TEvUnsafeUploadRowsRequest,
- TEvDataShard::TEvUnsafeUploadRowsResponse> {
+class TDataShard::TTxUnsafeUploadRows: public NTabletFlatExecutor::TTransactionBase<TDataShard>
+ , public TCommonUploadOps<
+ TEvDataShard::TEvUnsafeUploadRowsRequest,
+ TEvDataShard::TEvUnsafeUploadRowsResponse>
+{
public:
TTxUnsafeUploadRows(TDataShard* ds, TEvDataShard::TEvUnsafeUploadRowsRequest::TPtr& ev);
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
index cb085ad4805..1aec8f60214 100644
--- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
@@ -11,8 +11,15 @@ TDataShard::TTxUnsafeUploadRows::TTxUnsafeUploadRows(TDataShard* ds, TEvDataShar
bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TActorContext&) {
auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
- if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion))
+ if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion)) {
return false;
+ }
+
+ auto* result = GetResult();
+ if (result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) {
+ NIceDb::TNiceDb db(txc.DB);
+ result->Info = Self->S3Downloads.Store(db, GetRequest()->TxId, GetRequest()->Info);
+ }
if (Self->IsMvccEnabled()) {
// Note: we always wait for completion, so we can ignore the result
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index 9ea7fbab5c0..ecabfe7bb8a 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -86,15 +86,15 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
class TUploadRowsRequestBuilder {
public:
void New(const TTableInfo& tableInfo, const NKikimrSchemeOp::TTableDescription& scheme) {
- Request.Reset(new TEvDataShard::TEvUnsafeUploadRowsRequest());
- Request->Record.SetTableId(tableInfo.GetId());
+ Record = std::make_shared<NKikimrTxDataShard::TEvUploadRowsRequest>();
+ Record->SetTableId(tableInfo.GetId());
TVector<TString> columnNames;
for (const auto& column : scheme.GetColumns()) {
columnNames.push_back(column.GetName());
}
- auto& rowScheme = *Request->Record.MutableRowScheme();
+ auto& rowScheme = *Record->MutableRowScheme();
for (ui32 id : tableInfo.GetKeyColumnIds()) {
rowScheme.AddKeyColumnIds(id);
}
@@ -104,27 +104,27 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
}
void AddRow(const TVector<TCell>& keys, const TVector<TCell>& values) {
- Y_VERIFY(Request);
- auto& row = *Request->Record.AddRows();
+ Y_VERIFY(Record);
+ auto& row = *Record->AddRows();
row.SetKeyColumns(TSerializedCellVec::Serialize(keys));
row.SetValueColumns(TSerializedCellVec::Serialize(values));
}
- const NKikimrTxDataShard::TEvUploadRowsRequest& GetRecord() const {
- Y_VERIFY(Request);
- return Request->Record;
- }
-
- THolder<TEvDataShard::TEvUnsafeUploadRowsRequest> Build() {
- Y_VERIFY(Request);
- return std::move(Request);
+ const std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest>& GetRecord() {
+ Y_VERIFY(Record);
+ return Record;
}
private:
- THolder<TEvDataShard::TEvUnsafeUploadRowsRequest> Request;
+ std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest> Record;
}; // TUploadRowsRequestBuilder
+ enum class EWakeupTag: ui64 {
+ Restart,
+ RetryUpload,
+ };
+
void AllocateResource() {
IMPORT_LOG_D("AllocateResource"
<< ": self# " << SelfId());
@@ -205,7 +205,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
ETag = result.GetResult().GetETag();
ContentLength = result.GetResult().GetContentLength();
- Send(DataShard, new TEvDataShard::TEvGetS3DownloadInfo(SelfId(), TxId));
+ Send(DataShard, new TEvDataShard::TEvGetS3DownloadInfo(TxId));
}
void Handle(TEvDataShard::TEvS3DownloadInfo::TPtr& ev) {
@@ -213,15 +213,22 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
IMPORT_LOG_D("Handle TEvDataShard::TEvS3DownloadInfo"
<< ": self# " << SelfId()
- << ", info# " << info.ToString());
+ << ", info# " << info);
if (!info.DataETag) {
- Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(SelfId(),
- TxId, ETag, ProcessedBytes, WrittenBytes, WrittenRows));
+ Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, {
+ ETag, ProcessedBytes, WrittenBytes, WrittenRows
+ }));
return;
}
- if (!CheckETag(*info.DataETag, ETag, TStringBuf("DownloadInfo"))) {
+ ProcessDownloadInfo(info, TStringBuf("DownloadInfo"));
+ }
+
+ void ProcessDownloadInfo(const TS3Download& info, const TStringBuf marker) {
+ Y_VERIFY(info.DataETag);
+
+ if (!CheckETag(*info.DataETag, ETag, marker)) {
return;
}
@@ -283,14 +290,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
}
const auto& record = RequestBuilder.GetRecord();
- WrittenRows += record.RowsSize();
+ WrittenRows += record->RowsSize();
- IMPORT_LOG_D("Upload rows"
- << ": self# " << SelfId()
- << ", count# " << record.RowsSize()
- << ", size# " << record.ByteSizeLong());
-
- Send(DataShard, RequestBuilder.Build().Release());
+ UploadRows();
return false;
}
@@ -320,27 +322,42 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
return true;
}
+ void UploadRows() {
+ const auto& record = RequestBuilder.GetRecord();
+
+ IMPORT_LOG_D("Upload rows"
+ << ": self# " << SelfId()
+ << ", count# " << record->RowsSize()
+ << ", size# " << record->ByteSizeLong());
+
+ Send(DataShard, new TEvDataShard::TEvUnsafeUploadRowsRequest(TxId, record, {
+ ETag, ProcessedBytes, WrittenBytes, WrittenRows
+ }));
+ }
+
void Handle(TEvDataShard::TEvUnsafeUploadRowsResponse::TPtr& ev) {
const auto& record = ev->Get()->Record;
+ const auto& info = ev->Get()->Info;
IMPORT_LOG_D("Handle TEvDataShard::TEvUnsafeUploadRowsResponse"
<< ": self# " << SelfId()
- << ", record# " << record.ShortDebugString());
+ << ", record# " << record.ShortDebugString()
+ << ", info# " << info);
switch (record.GetStatus()) {
case NKikimrTxDataShard::TError::OK:
- break;
+ return ProcessDownloadInfo(info, TStringBuf("UploadResponse"));
+
+ case NKikimrTxDataShard::TError::WRONG_SHARD_STATE: // OVERLOADED
+ return RetryUpload();
case NKikimrTxDataShard::TError::SCHEME_ERROR:
case NKikimrTxDataShard::TError::BAD_ARGUMENT:
return Finish(false, record.GetErrorDescription());
default:
- return RetryOrFinish(record.GetErrorDescription());
+ return RestartOrFinish(record.GetErrorDescription());
};
-
- Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(SelfId(),
- TxId, ETag, ProcessedBytes, WrittenBytes, WrittenRows));
}
template <typename TResult>
@@ -352,7 +369,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
IMPORT_LOG_E("Error at '" << marker << "'"
<< ": self# " << SelfId()
<< ", error# " << result);
- RetryOrFinish(result.GetError().GetMessage().c_str());
+ RestartOrFinish(result.GetError().GetMessage().c_str());
return false;
}
@@ -417,17 +434,30 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader>, private TS3User {
return true;
}
- void RetryOrFinish(const TString& error) {
+ void RetryUpload() {
+ Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(static_cast<ui64>(EWakeupTag::RetryUpload)));
+ }
+
+ void RestartOrFinish(const TString& error) {
if (Attempt++ < Retries) {
Delay = Min(Delay * Attempt, MaxDelay);
const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds());
- Schedule(Delay + random, new TEvents::TEvWakeup());
+ Schedule(Delay + random, new TEvents::TEvWakeup(static_cast<ui64>(EWakeupTag::Restart)));
} else {
Finish(false, error);
}
}
+ void Handle(TEvents::TEvWakeup::TPtr& ev) {
+ switch (static_cast<EWakeupTag>(ev->Get()->Tag)) {
+ case EWakeupTag::Restart:
+ return Restart();
+ case EWakeupTag::RetryUpload:
+ return UploadRows();
+ }
+ }
+
void Finish(bool success = true, const TString& error = TString()) {
IMPORT_LOG_I("Finish"
<< ": self# " << SelfId()
@@ -505,8 +535,8 @@ public:
hFunc(TEvDataShard::TEvS3DownloadInfo, Handle);
hFunc(TEvDataShard::TEvUnsafeUploadRowsResponse, Handle);
- cFunc(TEvents::TEvWakeup::EventType, Restart);
- cFunc(TEvents::TEvPoisonPill::EventType, NotifyDied);
+ hFunc(TEvents::TEvWakeup, Handle);
+ sFunc(TEvents::TEvPoisonPill, NotifyDied);
}
}
diff --git a/ydb/core/tx/schemeshard/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore.cpp
index 49777286ed6..5573748eb88 100644
--- a/ydb/core/tx/schemeshard/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore.cpp
@@ -1,5 +1,6 @@
#include <contrib/libs/double-conversion/double-conversion/ieee.h>
+#include <ydb/core/base/localdb.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/tablet/resource_broker.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
@@ -536,6 +537,65 @@ Y_UNIT_TEST_SUITE(TRestoreTests) {
UNIT_ASSERT_VALUES_EQUAL(result->GetRowsProcessed(), 2);
}
+ Y_UNIT_TEST(ShouldHandleOverloadedShard) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ // prepare table schema with special policy
+ NKikimrSchemeOp::TTableDescription desc;
+ desc.SetName("Table");
+ desc.AddKeyColumnNames("key");
+ {
+ auto& column = *desc.AddColumns();
+ column.SetName("key");
+ column.SetType("Uint32");
+ }
+ {
+ auto& column = *desc.AddColumns();
+ column.SetName("value");
+ column.SetType("Utf8");
+ }
+
+ auto policy = NLocalDb::CreateDefaultUserTablePolicy();
+ policy->InMemForceSizeToSnapshot = 1;
+ policy->Serialize(*desc.MutablePartitionConfig()->MutableCompactionPolicy());
+
+ // serialize schema
+ TString scheme;
+ UNIT_ASSERT(google::protobuf::TextFormat::PrintToString(desc, &scheme));
+ TestCreateTable(runtime, ++txId, "/MyRoot", scheme);
+ env.TestWaitNotification(runtime, txId);
+
+ ui32 total = 0;
+ ui32 overloads = 0;
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvDataShard::EvUnsafeUploadRowsResponse) {
+ ++total;
+ const auto& record = ev->Get<TEvDataShard::TEvUnsafeUploadRowsResponse>()->Record;
+ overloads += ui32(record.GetStatus() == NKikimrTxDataShard::TError::WRONG_SHARD_STATE);
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ TPortManager portManager;
+ THolder<TS3Mock> s3Mock;
+
+ const auto data = GenerateTestData("", 1000);
+ const ui32 batchSize = 32;
+ RestoreNoWait(runtime, ++txId, portManager.GetPort(), s3Mock, {data}, batchSize);
+
+ env.TestWaitNotification(runtime, txId);
+ UNIT_ASSERT(overloads > 0);
+
+ const ui32 expected = data.Csv.size() / batchSize + ui32(bool(data.Csv.size() % batchSize));
+ UNIT_ASSERT_VALUES_EQUAL(expected, total - overloads);
+
+ auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", {"key", "Uint32", "0"});
+ NKqp::CompareYson(data.YsonStr, content);
+ }
+
Y_UNIT_TEST(ShouldFailOnFileWithoutNewLines) {
TTestBasicRuntime runtime;