diff options
| author | hor911 <[email protected]> | 2022-09-14 12:12:11 +0300 |
|---|---|---|
| committer | hor911 <[email protected]> | 2022-09-14 12:12:11 +0300 |
| commit | f2536f7a87466cc95830c84a398d89bd3bce6312 (patch) | |
| tree | 6114d309ef5a60a051c9220b37964efbd8ee2191 | |
| parent | dc37dfe86ca1c20394aad4d3d76dfd9405c0b8fa (diff) | |
NextRetryDelay workaround + last bytes logs on retry
| -rw-r--r-- | library/cpp/actors/util/thread_load_log.h | 25 |
| -rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 50 |
2 files changed, 59 insertions, 16 deletions
diff --git a/library/cpp/actors/util/thread_load_log.h b/library/cpp/actors/util/thread_load_log.h index 26b32492d2c..b4b34d47bb5 100644 --- a/library/cpp/actors/util/thread_load_log.h +++ b/library/cpp/actors/util/thread_load_log.h @@ -4,6 +4,7 @@ #include <util/system/types.h> +#include <type_traits> #include <algorithm> #include <atomic> #include <limits> @@ -19,8 +20,8 @@ private: static constexpr ui64 TIME_SLOT_PART_COUNT = TIME_SLOT_MAX_VALUE + 1; static constexpr auto TIME_SLOT_PART_LENGTH_NS = TIME_SLOT_LENGTH_NS / TIME_SLOT_PART_COUNT; - template <int MAX_VALUE, typename T> - static void AtomicAddBound(std::atomic<T>& val, int inc) { + template <typename T> + static void AtomicAddBound(std::atomic<T>& val, i64 inc) { if (inc == 0) { return; } @@ -29,27 +30,29 @@ private: auto oldVal = newVal; do { - if (oldVal > MAX_VALUE) { + static constexpr auto MAX_VALUE = std::numeric_limits<T>::max(); + + if (oldVal >= MAX_VALUE) { return; } - newVal = std::min<int>(MAX_VALUE, oldVal + inc); + newVal = std::min<i64>(MAX_VALUE, static_cast<i64>(oldVal) + inc); } while (!val.compare_exchange_weak(oldVal, newVal)); } template <typename T> - static void AtomicSubBound(std::atomic<T>& val, int sub) { + static void AtomicSubBound(std::atomic<T>& val, i64 sub) { if (sub == 0) { return; } auto newVal = val.load(); - auto oldVal = val.load(); + auto oldVal = newVal; do { if (oldVal == 0) { return; } - newVal = std::max<int>(0, static_cast<int>(oldVal) - sub); + newVal = std::max<i64>(0, static_cast<i64>(oldVal) - sub); } while (!val.compare_exchange_weak(oldVal, newVal)); } @@ -86,6 +89,8 @@ public: std::atomic<bool> LastRegisteredPeriodIsBusy = false; explicit TThreadLoad(ui64 timeNs = 0) { + static_assert(std::is_unsigned<TimeSlotType>::value); + LastTimeNs = timeNs; for (size_t i = 0; i < TIME_SLOT_COUNT; ++i) { TimeSlots[i] = 0; @@ -146,7 +151,7 @@ public: if (firstSlotNumber == lastSlotNumber) { ui32 slotLengthNs = timeNs - lastTimeNs; ui32 slotPartsCount = (slotLengthNs 
+ TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; - AtomicAddBound<TIME_SLOT_MAX_VALUE>(TimeSlots[firstSlotIndex], slotPartsCount); + AtomicAddBound(TimeSlots[firstSlotIndex], slotPartsCount); if (ModifyLastTime) { LastTimeNs = timeNs; @@ -160,13 +165,13 @@ public: ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS; // process first time slot - AtomicAddBound<TIME_SLOT_MAX_VALUE>(TimeSlots[firstSlotIndex], firstSlotPartsCount); + AtomicAddBound(TimeSlots[firstSlotIndex], firstSlotPartsCount); // process complete time slots UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, TIME_SLOT_MAX_VALUE); // process last time slot - AtomicAddBound<TIME_SLOT_MAX_VALUE>(TimeSlots[lastSlotIndex], lastSlotPartsCount); + AtomicAddBound(TimeSlots[lastSlotIndex], lastSlotPartsCount); if (ModifyLastTime) { LastTimeNs = timeNs; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index f2f357c3390..2773bcfc881 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -378,18 +378,20 @@ public: return true; case TEvPrivate::TEvReadFinished::EventType: InputFinished = true; - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); + LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); return false; case TEvPrivate::TEvReadError::EventType: InputFinished = true; Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error); - LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); + LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString() << ", Url: " << 
RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); return false; case TEvPrivate::TEvDataPart::EventType: if (200L == HttpResponseCode || 206L == HttpResponseCode) { value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); RetryStuff->Offset += value.size(); RetryStuff->SizeLimit -= value.size(); + LastOffset = RetryStuff->Offset; + LastData = value; LOG_CORO_T("TS3ReadCoroImpl", "TEvDataPart, size: " << value.size() << ", Url: " << RetryStuff->Url << ", Offset (updated): " << RetryStuff->Offset); Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); } else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) { @@ -398,7 +400,7 @@ public: else if (!ErrorText.EndsWith(TruncatedSuffix)) ErrorText.append(TruncatedSuffix); value.clear(); - LOG_CORO_W("TS3ReadCoroImpl", "TEvDataPart, ERROR: " << ErrorText << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset); + LOG_CORO_W("TS3ReadCoroImpl", "TEvDataPart, ERROR: " << ErrorText << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); } return true; default: @@ -500,9 +502,41 @@ private: } Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(message)})); } + + TString GetLastDataAsText() { + + if (LastData.empty()) { + return "[]"; + } + + auto begin = const_cast<char*>(LastData.data()); + auto end = begin + LastData.size(); + + TStringBuilder result; + + result << "["; + + if (LastData.size() > 32) { + begin += LastData.size() - 32; + result << "..."; + } + + while (begin < end) { + char c = *begin++; + if (c >= 32 && c <= 126) { + result << c; + } else { + result << "\\" << Hex(static_cast<ui8>(c)); + } + } + + result << "]"; + + return result; + } private: const ui64 InputIndex; - const TTxId& TxId; + const TTxId TxId; 
const TRetryStuff::TPtr RetryStuff; const TReadSpec::TPtr ReadSpec; const TString Format, RowType, Compression; @@ -514,6 +548,9 @@ private: long HttpResponseCode = 0L; TString ErrorText; TIssues Issues; + + std::size_t LastOffset = 0; + TString LastData; }; class TS3ReadCoroActor : public TActorCoro { @@ -539,9 +576,10 @@ private: } } - if (!retryStuff->IsCancelled() && retryStuff->NextRetryDelay && retryStuff->SizeLimit > 0ULL) { + auto nextRetryDelay = retryStuff->NextRetryDelay; + if (!retryStuff->IsCancelled() && nextRetryDelay && retryStuff->SizeLimit > 0ULL) { LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << self << ", TxId: " << retryStuff->TxId << ". " << "Retry Download, Url: " << retryStuff->Url << ", Offset: " << retryStuff->Offset); - actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, actorSystem, self, parent)))); + actorSystem->Schedule(*nextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, actorSystem, self, parent)))); } else { actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished)); } |
