diff options
| author | eivanov89 <[email protected]> | 2023-01-10 12:29:11 +0300 |
|---|---|---|
| committer | eivanov89 <[email protected]> | 2023-01-10 12:29:11 +0300 |
| commit | d649bd6c35082346d29ac814532d1151a9d496d7 (patch) | |
| tree | 3deda28447bdd8b5556bb75adb060bcc7ea0a4b4 | |
| parent | 1026b06de8cc942b34df890d3c387fa435283169 (diff) | |
reconnect if pipe fails before first connection
| -rw-r--r-- | ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp | 23 | ||||
| -rw-r--r-- | ydb/core/load_test/ycsb/test_load_read_iterator.cpp | 39 |
2 files changed, 51 insertions, 11 deletions
diff --git a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp index 6790bfab87c..c10da88adea 100644 --- a/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp +++ b/ydb/core/load_test/ycsb/bulk_mkql_upsert.cpp @@ -133,6 +133,8 @@ class TUpsertActor : public TActorBootstrapped<TUpsertActor> { TString ConfingString; TActorId Pipe; + bool WasConnected = false; + ui64 ReconnectLimit = 10; TRequestsVector Requests; size_t CurrentRequest = 0; @@ -185,6 +187,12 @@ public: private: void Connect(const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Connect called"); + --ReconnectLimit; + if (ReconnectLimit == 0) { + TStringStream ss; + ss << "Failed to set pipe to " << Target.GetTabletId(); + return StopWithError(ctx, ss.Str()); + } Pipe = Register(NTabletPipe::CreateClient(SelfId(), Target.GetTabletId())); } @@ -195,20 +203,25 @@ private: << " TUpsertActor Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { - TStringStream ss; - ss << "Failed to connect to " << Target.GetTabletId() << ", status: " << msg->Status; - StopWithError(ctx, ss.Str()); - return; + Pipe = {}; + return Connect(ctx); } StartTs = TInstant::Now(); + WasConnected = true; SendRows(ctx); } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "Id# " << Id << " TUpsertActor Handle TEvClientDestroyed called"); - StopWithError(ctx, "broken pipe"); + + // sanity check + if (!WasConnected) { + return Connect(ctx); + } + + return StopWithError(ctx, "broken pipe"); } void SendRows(const TActorContext &ctx) { diff --git a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp index 09790de8cfb..4502f6314ca 100644 --- a/ydb/core/load_test/ycsb/test_load_read_iterator.cpp +++ b/ydb/core/load_test/ycsb/test_load_read_iterator.cpp @@ -96,6 +96,8 @@ class TReadIteratorPoints : public TActorBootstrapped<TReadIteratorPoints> { const TSubLoadId Id; TActorId Pipe; + bool WasConnected = false; + ui64 ReconnectLimit = 10; TInstant StartTs; // actor started to send requests @@ -146,6 +148,12 @@ private: void Connect(const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id << " Connect to# " << TabletId << " called"); + --ReconnectLimit; + if (ReconnectLimit == 0) { + TStringStream ss; + ss << "Failed to set pipe to " << TabletId; + return StopWithError(ctx, ss.Str()); + } Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); } @@ -156,18 +164,24 @@ private: << " Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { - TStringStream ss; - ss << "Failed to connect to " << TabletId << ", status: " << msg->Status; - return StopWithError(ctx, ss.Str()); + Pipe = {}; + return Connect(ctx); } StartTs = TInstant::Now(); + WasConnected = true; SendRead(ctx); } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TReadIteratorPoints# " << Id << " Handle TEvClientDestroyed called"); + + // sanity check + if (!WasConnected) { + return Connect(ctx); + } + return StopWithError(ctx, "broken pipe"); } @@ -263,6 +277,8 @@ class TReadIteratorScan : public TActorBootstrapped<TReadIteratorScan> { const ui64 SampleKeyCount; TActorId Pipe; + bool WasConnected = false; + ui64 ReconnectLimit = 10; TInstant StartTs; size_t Oks = 0; @@ -296,6 +312,12 @@ private: void Connect(const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id << " Connect to# " << TabletId << " called"); + --ReconnectLimit; + if (ReconnectLimit == 0) { + TStringStream ss; + ss << "Failed to set pipe to " << TabletId; + return StopWithError(ctx, ss.Str()); + } Pipe = Register(NTabletPipe::CreateClient(SelfId(), TabletId)); } @@ -306,18 +328,23 @@ private: << " Handle TEvClientConnected called, Status# " << msg->Status); if (msg->Status != NKikimrProto::OK) { - TStringStream ss; - ss << "Failed to connect to " << TabletId << ", status: " << msg->Status; - return StopWithError(ctx, ss.Str()); + return Connect(ctx); } StartTs = TInstant::Now(); + WasConnected = true; NTabletPipe::SendData(ctx, Pipe, Request.release()); } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "ReadIteratorScan# " << Id << " Handle TEvClientDestroyed called"); + + // sanity check + if (!WasConnected) { + return Connect(ctx); + } + return StopWithError(ctx, "broken pipe"); } |
