summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <[email protected]>2023-01-10 12:29:11 +0300
committereivanov89 <[email protected]>2023-01-10 12:29:11 +0300
commitd649bd6c35082346d29ac814532d1151a9d496d7 (patch)
tree3deda28447bdd8b5556bb75adb060bcc7ea0a4b4
parent1026b06de8cc942b34df890d3c387fa435283169 (diff)
reconnect if pipe fails before first connection
-rw-r--r--ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp23
-rw-r--r--ydb/core/load_test/ycsb/test_load_read_iterator.cpp39
2 files changed, 51 insertions, 11 deletions
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
index 6790bfab87c..c10da88adea 100644
--- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
+++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp
@@ -133,6 +133,8 @@ class TUpsertActor : public TActorBootstrapped<TUpsertActor> {
TString ConfingString;
TActorId Pipe;
+ bool WasConnected = false;
+ ui64 ReconnectLimit = 10;
TRequestsVector Requests;
size_t CurrentRequest = 0;
@@ -185,6 +187,12 @@ public:
private:
void Connect(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Connect called");
+ --ReconnectLimit;
+ if (ReconnectLimit == 0) {
+ TStringStream ss;
+ ss << "Failed to set pipe to " << Target.GetTabletId();
+ return StopWithError(ctx, ss.Str());
+ }
Pipe = Register(NTabletPipe::CreateClient(SelfId(), Target.GetTabletId()));
}
@@ -195,20 +203,25 @@ private:
<< " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
- TStringStream ss;
- ss << "Failed to connect to " << Target.GetTabletId() << ", status: " << msg->Status;
- StopWithError(ctx, ss.Str());
- return;
+ Pipe = {};
+ return Connect(ctx);
}
StartTs = TInstant::Now();
+ WasConnected = true;
SendRows(ctx);
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id
<< " TUpsertActor Handle TEvClientDestroyed called");
- StopWithError(ctx, "broken pipe");
+
+ // sanity check
+ if (!WasConnected) {
+ return Connect(ctx);
+ }
+
+ return StopWithError(ctx, "broken pipe");
}
void SendRows(const TActorContext &ctx) {
diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
index 09790de8cfb..4502f6314ca 100644
--- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
+++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp
@@ -96,6 +96,8 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> {
const TSubLoadId Id;
TActorId Pipe;
+ bool WasConnected = false;
+ ui64 ReconnectLimit = 10;
TInstant StartTs; // actor started to send requests
@@ -146,6 +148,12 @@ private:
void Connect(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
<< " Connect to# " << TabletId << " called");
+ --ReconnectLimit;
+ if (ReconnectLimit == 0) {
+ TStringStream ss;
+ ss << "Failed to set pipe to " << TabletId;
+ return StopWithError(ctx, ss.Str());
+ }
Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId));
}
@@ -156,18 +164,24 @@ private:
<< " Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
- TStringStream ss;
- ss << "Failed to connect to " << TabletId << ", status: " << msg->Status;
- return StopWithError(ctx, ss.Str());
+ Pipe = {};
+ return Connect(ctx);
}
StartTs = TInstant::Now();
+ WasConnected = true;
SendRead(ctx);
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id
<< " Handle TEvClientDestroyed called");
+
+ // sanity check
+ if (!WasConnected) {
+ return Connect(ctx);
+ }
+
return StopWithError(ctx, "broken pipe");
}
@@ -263,6 +277,8 @@ class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> {
const ui64 SampleKeyCount;
TActorId Pipe;
+ bool WasConnected = false;
+ ui64 ReconnectLimit = 10;
TInstant StartTs;
size_t Oks = 0;
@@ -296,6 +312,12 @@ private:
void Connect(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
<< " Connect to# " << TabletId << " called");
+ --ReconnectLimit;
+ if (ReconnectLimit == 0) {
+ TStringStream ss;
+ ss << "Failed to set pipe to " << TabletId;
+ return StopWithError(ctx, ss.Str());
+ }
Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId));
}
@@ -306,18 +328,23 @@ private:
<< " Handle TEvClientConnected called, Status# " << msg->Status);
if (msg->Status != NKikimrProto::OK) {
- TStringStream ss;
- ss << "Failed to connect to " << TabletId << ", status: " << msg->Status;
- return StopWithError(ctx, ss.Str());
+ return Connect(ctx);
}
StartTs = TInstant::Now();
+ WasConnected = true;
NTabletPipe::SendData(ctx, Pipe, Request.release());
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id
<< " Handle TEvClientDestroyed called");
+
+ // sanity check
+ if (!WasConnected) {
+ return Connect(ctx);
+ }
+
return StopWithError(ctx, "broken pipe");
}