summary refs log tree commit diff stats
diff options
context:
space:
mode:
author hor911 <[email protected]> 2022-09-14 12:12:11 +0300
committer hor911 <[email protected]> 2022-09-14 12:12:11 +0300
commit f2536f7a87466cc95830c84a398d89bd3bce6312 (patch)
tree 6114d309ef5a60a051c9220b37964efbd8ee2191
parent dc37dfe86ca1c20394aad4d3d76dfd9405c0b8fa (diff)
NextRetryDelay workaround + last bytes logs on retry
-rw-r--r-- library/cpp/actors/util/thread_load_log.h 25
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 50
2 files changed, 59 insertions, 16 deletions
diff --git a/library/cpp/actors/util/thread_load_log.h b/library/cpp/actors/util/thread_load_log.h
index 26b32492d2c..b4b34d47bb5 100644
--- a/library/cpp/actors/util/thread_load_log.h
+++ b/library/cpp/actors/util/thread_load_log.h
@@ -4,6 +4,7 @@
#include <util/system/types.h>
+#include <type_traits>
#include <algorithm>
#include <atomic>
#include <limits>
@@ -19,8 +20,8 @@ private:
static constexpr ui64 TIME_SLOT_PART_COUNT = TIME_SLOT_MAX_VALUE + 1;
static constexpr auto TIME_SLOT_PART_LENGTH_NS = TIME_SLOT_LENGTH_NS / TIME_SLOT_PART_COUNT;
- template <int MAX_VALUE, typename T>
- static void AtomicAddBound(std::atomic<T>& val, int inc) {
+ template <typename T>
+ static void AtomicAddBound(std::atomic<T>& val, i64 inc) {
if (inc == 0) {
return;
}
@@ -29,27 +30,29 @@ private:
auto oldVal = newVal;
do {
- if (oldVal > MAX_VALUE) {
+ static constexpr auto MAX_VALUE = std::numeric_limits<T>::max();
+
+ if (oldVal >= MAX_VALUE) {
return;
}
- newVal = std::min<int>(MAX_VALUE, oldVal + inc);
+ newVal = std::min<i64>(MAX_VALUE, static_cast<i64>(oldVal) + inc);
} while (!val.compare_exchange_weak(oldVal, newVal));
}
template <typename T>
- static void AtomicSubBound(std::atomic<T>& val, int sub) {
+ static void AtomicSubBound(std::atomic<T>& val, i64 sub) {
if (sub == 0) {
return;
}
auto newVal = val.load();
- auto oldVal = val.load();
+ auto oldVal = newVal;
do {
if (oldVal == 0) {
return;
}
- newVal = std::max<int>(0, static_cast<int>(oldVal) - sub);
+ newVal = std::max<i64>(0, static_cast<i64>(oldVal) - sub);
} while (!val.compare_exchange_weak(oldVal, newVal));
}
@@ -86,6 +89,8 @@ public:
std::atomic<bool> LastRegisteredPeriodIsBusy = false;
explicit TThreadLoad(ui64 timeNs = 0) {
+ static_assert(std::is_unsigned<TimeSlotType>::value);
+
LastTimeNs = timeNs;
for (size_t i = 0; i < TIME_SLOT_COUNT; ++i) {
TimeSlots[i] = 0;
@@ -146,7 +151,7 @@ public:
if (firstSlotNumber == lastSlotNumber) {
ui32 slotLengthNs = timeNs - lastTimeNs;
ui32 slotPartsCount = (slotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS;
- AtomicAddBound<TIME_SLOT_MAX_VALUE>(TimeSlots[firstSlotIndex], slotPartsCount);
+ AtomicAddBound(TimeSlots[firstSlotIndex], slotPartsCount);
if (ModifyLastTime) {
LastTimeNs = timeNs;
@@ -160,13 +165,13 @@ public:
ui32 lastSlotPartsCount = (lastSlotLengthNs + TIME_SLOT_PART_LENGTH_NS - 1) / TIME_SLOT_PART_LENGTH_NS;
// process first time slot
- AtomicAddBound<TIME_SLOT_MAX_VALUE>(TimeSlots[firstSlotIndex], firstSlotPartsCount);
+ AtomicAddBound(TimeSlots[firstSlotIndex], firstSlotPartsCount);
// process complete time slots
UpdateCompleteTimeSlots(firstSlotNumber, lastSlotNumber, TIME_SLOT_MAX_VALUE);
// process last time slot
- AtomicAddBound<TIME_SLOT_MAX_VALUE>(TimeSlots[lastSlotIndex], lastSlotPartsCount);
+ AtomicAddBound(TimeSlots[lastSlotIndex], lastSlotPartsCount);
if (ModifyLastTime) {
LastTimeNs = timeNs;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index f2f357c3390..2773bcfc881 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -378,18 +378,20 @@ public:
return true;
case TEvPrivate::TEvReadFinished::EventType:
InputFinished = true;
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset);
+ LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
return false;
case TEvPrivate::TEvReadError::EventType:
InputFinished = true;
Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error);
- LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset);
+ LOG_CORO_W("TS3ReadCoroImpl", "TEvReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
return false;
case TEvPrivate::TEvDataPart::EventType:
if (200L == HttpResponseCode || 206L == HttpResponseCode) {
value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
RetryStuff->Offset += value.size();
RetryStuff->SizeLimit -= value.size();
+ LastOffset = RetryStuff->Offset;
+ LastData = value;
LOG_CORO_T("TS3ReadCoroImpl", "TEvDataPart, size: " << value.size() << ", Url: " << RetryStuff->Url << ", Offset (updated): " << RetryStuff->Offset);
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
} else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) {
@@ -398,7 +400,7 @@ public:
else if (!ErrorText.EndsWith(TruncatedSuffix))
ErrorText.append(TruncatedSuffix);
value.clear();
- LOG_CORO_W("TS3ReadCoroImpl", "TEvDataPart, ERROR: " << ErrorText << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset);
+ LOG_CORO_W("TS3ReadCoroImpl", "TEvDataPart, ERROR: " << ErrorText << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
}
return true;
default:
@@ -500,9 +502,41 @@ private:
}
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(message)}));
}
+
+ TString GetLastDataAsText() { // Renders the tail of LastData as bracketed text; used by the LOG_CORO_* messages in this diff.
+ // At most the final 32 bytes are shown; bytes outside 32..126 are written as a backslash followed by Hex(byte).
+ if (LastData.empty()) {
+ return "[]"; // an empty LastData is shown as empty brackets
+ }
+
+ auto begin = const_cast<char*>(LastData.data()); // NOTE(review): the pointer is only read through below, so the const_cast looks unnecessary - confirm
+ auto end = begin + LastData.size();
+
+ TStringBuilder result;
+
+ result << "[";
+
+ if (LastData.size() > 32) {
+ begin += LastData.size() - 32; // skip ahead so that only the last 32 bytes remain
+ result << "..."; // marks that leading bytes were omitted
+ }
+
+ while (begin < end) {
+ char c = *begin++;
+ if (c >= 32 && c <= 126) { // values 32..126 (printable ASCII) are appended unchanged
+ result << c;
+ } else {
+ result << "\\" << Hex(static_cast<ui8>(c)); // exact escape text depends on Hex, which is defined outside this view
+ }
+ }
+
+ result << "]";
+
+ return result;
+ }
private:
const ui64 InputIndex;
- const TTxId& TxId;
+ const TTxId TxId;
const TRetryStuff::TPtr RetryStuff;
const TReadSpec::TPtr ReadSpec;
const TString Format, RowType, Compression;
@@ -514,6 +548,9 @@ private:
long HttpResponseCode = 0L;
TString ErrorText;
TIssues Issues;
+
+ std::size_t LastOffset = 0;
+ TString LastData;
};
class TS3ReadCoroActor : public TActorCoro {
@@ -539,9 +576,10 @@ private:
}
}
- if (!retryStuff->IsCancelled() && retryStuff->NextRetryDelay && retryStuff->SizeLimit > 0ULL) {
+ auto nextRetryDelay = retryStuff->NextRetryDelay;
+ if (!retryStuff->IsCancelled() && nextRetryDelay && retryStuff->SizeLimit > 0ULL) {
LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << self << ", TxId: " << retryStuff->TxId << ". " << "Retry Download, Url: " << retryStuff->Url << ", Offset: " << retryStuff->Offset);
- actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, actorSystem, self, parent))));
+ actorSystem->Schedule(*nextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, actorSystem, self, parent))));
} else {
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished));
}