aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-09-02 10:29:39 +0300
committerhor911 <hor911@ydb.tech>2022-09-02 10:29:39 +0300
commit40d4e4842f449c176faca9725f933f24180a277e (patch)
tree4a41ceaae12df4633a8d969045eb99f1b9e48dac
parent541ed5eac5b51c78bb18304e014a6c38f626a9d0 (diff)
downloadydb-40d4e4842f449c176faca9725f933f24180a277e.tar.gz
TxId
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp46
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h3
3 files changed, 30 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
index c03a74944f..e28e9d5791 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
@@ -8,7 +8,7 @@ namespace NYql::NDq {
void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>&) {
factory.RegisterSink<NS3::TSink>("S3Sink",
[credentialsFactory, gateway](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) {
- return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory);
+ return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.TxId, args.SecureParams, args.Callback, credentialsFactory);
});
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index ce91c8e545..47eae564c3 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -25,16 +25,16 @@
#include <library/cpp/xml/document/xml-document.h>
-#define LOG_E(stream) \
- LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
-#define LOG_W(stream) \
- LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
-#define LOG_I(stream) \
- LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
-#define LOG_D(stream) \
- LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
-#define LOG_T(stream) \
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream)
+#define LOG_E(name, stream) \
+ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
+#define LOG_W(name, stream) \
+ LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
+#define LOG_I(name, stream) \
+ LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
+#define LOG_D(name, stream) \
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
+#define LOG_T(name, stream) \
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
namespace NYql::NDq {
@@ -90,10 +90,12 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
public:
TS3FileWriteActor(
+ const TTxId& txId,
IHTTPGateway::TPtr gateway,
NYdb::TCredentialsProviderPtr credProvider,
const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression)
- : Gateway(std::move(gateway))
+ : TxId(txId)
+ , Gateway(std::move(gateway))
, CredProvider(std::move(credProvider))
, ActorSystem(TActivationContext::ActorSystem())
, Key(key), Url(url), SizeLimit(sizeLimit), Parts(MakeCompressorQueue(compression))
@@ -103,7 +105,7 @@ public:
void Bootstrap(const TActorId& parentId) {
ParentId = parentId;
- LOG_D("TS3FileWriteActor: BootStrapped" << SelfId() << " by " << ParentId);
+ LOG_D("TS3FileWriteActor", "Bootstrapped by " << ParentId);
if (Parts->IsSealed() && 1U == Parts->Size()) {
const auto size = Parts->Volume();
InFlight += size;
@@ -119,9 +121,9 @@ public:
void PassAway() override {
if (InFlight || !Parts->Empty()) {
- LOG_W("TS3FileWriteActor: PassAway " << SelfId() << " NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size());
+ LOG_W("TS3FileWriteActor", "PassAway but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size());
} else {
- LOG_D("TS3FileWriteActor: PassAway " << SelfId());
+ LOG_D("TS3FileWriteActor", "PassAway");
}
TActorBootstrapped<TS3FileWriteActor>::PassAway();
}
@@ -303,6 +305,7 @@ private:
size_t InFlight = 0ULL;
size_t SentSize = 0ULL;
+ const TTxId TxId;
const IHTTPGateway::TPtr Gateway;
const NYdb::TCredentialsProviderPtr CredProvider;
@@ -322,6 +325,7 @@ private:
class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput {
public:
TS3WriteActor(ui64 outputIndex,
+ const TTxId& txId,
IHTTPGateway::TPtr gateway,
NYdb::TCredentialsProviderPtr credProvider,
IRandomProvider* randomProvider,
@@ -336,6 +340,7 @@ public:
, CredProvider(std::move(credProvider))
, RandomProvider(randomProvider)
, OutputIndex(outputIndex)
+ , TxId(txId)
, Callbacks(callbacks)
, Url(url)
, Path(path)
@@ -351,7 +356,7 @@ public:
}
void Bootstrap() {
- LOG_D("TS3WriteActor: BootStrapped" << SelfId());
+ LOG_D("TS3WriteActor", "BootStrapped");
Become(&TS3WriteActor::StateFunc);
}
@@ -396,7 +401,7 @@ private:
const auto& key = MakeKey(v);
const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) {
- auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), MaxFileSize, Compression);
+ auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), MaxFileSize, Compression);
ins.first->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}
@@ -410,7 +415,7 @@ private:
}
void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
- LOG_W("TS3WriteActor: TEvUploadError " << SelfId() << " " << result->Get()->Error.ToOneLineString());
+ LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Error.ToOneLineString());
Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR);
}
@@ -440,9 +445,9 @@ private:
FileWriteActors.clear();
if (fileWriterCount) {
- LOG_W("TS3WriteActor: PassAway " << SelfId() << " with " << fileWriterCount << " NOT finished FileWriter(s)");
+ LOG_W("TS3WriteActor", "PassAway with " << fileWriterCount << " NOT finished FileWriter(s)");
} else {
- LOG_D("TS3WriteActor: PassAway " << SelfId());
+ LOG_D("TS3WriteActor", "PassAway");
}
TActorBootstrapped<TS3WriteActor>::PassAway();
@@ -454,6 +459,7 @@ private:
TIntrusivePtr<IRandomProvider> DefaultRandomProvider;
const ui64 OutputIndex;
+ const TTxId TxId;
IDqComputeActorAsyncOutput::ICallbacks *const Callbacks;
const TString Url;
@@ -476,6 +482,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
IHTTPGateway::TPtr gateway,
NS3::TSink&& params,
ui64 outputIndex,
+ const TTxId& txId,
const THashMap<TString, TString>& secureParams,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory)
@@ -484,6 +491,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
const auto actor = new TS3WriteActor(
outputIndex,
+ txId,
std::move(gateway),
credentialsProviderFactory->CreateProvider(),
randomProvider, params.GetUrl(),
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
index 61d96b4f46..cfd88e3a2f 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
@@ -15,7 +15,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
IRandomProvider*,
IHTTPGateway::TPtr gateway,
NS3::TSink&& params,
- ui64 inputIndex,
+ ui64 outputIndex,
+ const TTxId& txId,
const THashMap<TString, TString>& secureParams,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);