aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-09-01 14:39:30 +0300
committerhor911 <hor911@ydb.tech>2022-09-01 14:39:30 +0300
commit68dfc3415bd2669ab722dd9a5a4715ff45362ff2 (patch)
tree62db44ae9d3d037be5ca7b738f6bbc58271d3e7c
parent37045415adbd50120bacf7a5bb00a69d1c93849d (diff)
downloadydb-68dfc3415bd2669ab722dd9a5a4715ff45362ff2.tar.gz
S3 WriteActors Logs
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp36
1 files changed, 35 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 641a7c4f6b5..ce91c8e545f 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -1,15 +1,17 @@
#include "yql_s3_write_actor.h"
#include "yql_s3_retry_policy.h"
-#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/core/protos/services.pb.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/s3/compressors/factory.h>
+#include <ydb/library/yql/utils/yql_panic.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
#include <library/cpp/string_utils/base64/base64.h>
#include <library/cpp/string_utils/quote/quote.h>
@@ -22,6 +24,18 @@
#endif
#include <library/cpp/xml/document/xml-document.h>
+
+#define LOG_E(stream) \
+ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
+#define LOG_W(stream) \
+ LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
+#define LOG_I(stream) \
+ LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
+#define LOG_D(stream) \
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
+#define LOG_T(stream) \
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
+
namespace NYql::NDq {
using namespace NActors;
@@ -89,6 +103,7 @@ public:
void Bootstrap(const TActorId& parentId) {
ParentId = parentId;
+ LOG_D("TS3FileWriteActor: BootStrapped" << SelfId() << " by " << ParentId);
if (Parts->IsSealed() && 1U == Parts->Size()) {
const auto size = Parts->Volume();
InFlight += size;
@@ -102,6 +117,15 @@ public:
static constexpr char ActorName[] = "S3_FILE_WRITE_ACTOR";
+ void PassAway() override {
+ if (InFlight || !Parts->Empty()) {
+ LOG_W("TS3FileWriteActor: PassAway " << SelfId() << " NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size());
+ } else {
+ LOG_D("TS3FileWriteActor: PassAway " << SelfId());
+ }
+ TActorBootstrapped<TS3FileWriteActor>::PassAway();
+ }
+
void SendData(TString&& data) {
Parts->Push(std::move(data));
@@ -327,6 +351,7 @@ public:
}
void Bootstrap() {
+ LOG_D("TS3WriteActor: BootStrapped" << SelfId());
Become(&TS3WriteActor::StateFunc);
}
@@ -385,6 +410,7 @@ private:
}
void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
+ LOG_W("TS3WriteActor: TEvUploadError " << SelfId() << " " << result->Get()->Error.ToOneLineString());
Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR);
}
@@ -404,13 +430,21 @@ private:
// IActor & IDqComputeActorAsyncOutput
void PassAway() override { // Is called from Compute Actor
+ ui32 fileWriterCount = 0;
for (const auto& p : FileWriteActors) {
for (const auto& fileWriter : p.second) {
fileWriter->PassAway();
+ fileWriterCount++;
}
}
FileWriteActors.clear();
+ if (fileWriterCount) {
+ LOG_W("TS3WriteActor: PassAway " << SelfId() << " with " << fileWriterCount << " NOT finished FileWriter(s)");
+ } else {
+ LOG_D("TS3WriteActor: PassAway " << SelfId());
+ }
+
TActorBootstrapped<TS3WriteActor>::PassAway();
}