diff options
author | hor911 <hor911@ydb.tech> | 2022-09-02 10:29:39 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-09-02 10:29:39 +0300 |
commit | 40d4e4842f449c176faca9725f933f24180a277e (patch) | |
tree | 4a41ceaae12df4633a8d969045eb99f1b9e48dac | |
parent | 541ed5eac5b51c78bb18304e014a6c38f626a9d0 (diff) | |
download | ydb-40d4e4842f449c176faca9725f933f24180a277e.tar.gz |
TxId
3 files changed, 30 insertions, 21 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp index c03a74944f..e28e9d5791 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp @@ -8,7 +8,7 @@ namespace NYql::NDq { void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>&) { factory.RegisterSink<NS3::TSink>("S3Sink", [credentialsFactory, gateway](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { - return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory); + return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.TxId, args.SecureParams, args.Callback, credentialsFactory); }); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index ce91c8e545..47eae564c3 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -25,16 +25,16 @@ #include <library/cpp/xml/document/xml-document.h> -#define LOG_E(stream) \ - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream) -#define LOG_W(stream) \ - LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream) -#define LOG_I(stream) \ - LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream) -#define LOG_D(stream) \ - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream) -#define LOG_T(stream) \ - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, stream) +#define LOG_E(name, stream) \ + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) +#define LOG_W(name, stream) \ + LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) +#define LOG_I(name, stream) \ + LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) +#define LOG_D(name, stream) \ + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) +#define LOG_T(name, stream) \ + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) namespace NYql::NDq { @@ -90,10 +90,12 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> { public: TS3FileWriteActor( + const TTxId& txId, IHTTPGateway::TPtr gateway, NYdb::TCredentialsProviderPtr credProvider, const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression) - : Gateway(std::move(gateway)) + : TxId(txId) + , Gateway(std::move(gateway)) , CredProvider(std::move(credProvider)) , ActorSystem(TActivationContext::ActorSystem()) , Key(key), Url(url), SizeLimit(sizeLimit), Parts(MakeCompressorQueue(compression)) @@ -103,7 +105,7 @@ public: void Bootstrap(const TActorId& parentId) { ParentId = parentId; - LOG_D("TS3FileWriteActor: BootStrapped" << SelfId() << " by " << ParentId); + LOG_D("TS3FileWriteActor", "Bootstrapped by " << ParentId); if (Parts->IsSealed() && 1U == Parts->Size()) { const auto size = Parts->Volume(); InFlight += size; @@ -119,9 +121,9 @@ public: void PassAway() override { if (InFlight || !Parts->Empty()) { - LOG_W("TS3FileWriteActor: PassAway " << SelfId() << " NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size()); + LOG_W("TS3FileWriteActor", "PassAway but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size()); } else { - LOG_D("TS3FileWriteActor: PassAway " << SelfId()); + LOG_D("TS3FileWriteActor", "PassAway"); } TActorBootstrapped<TS3FileWriteActor>::PassAway(); } @@ -303,6 +305,7 @@ private: size_t InFlight = 0ULL; size_t SentSize = 0ULL; + const TTxId TxId; const IHTTPGateway::TPtr Gateway; const NYdb::TCredentialsProviderPtr CredProvider; @@ -322,6 +325,7 @@ private: class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput { public: TS3WriteActor(ui64 outputIndex, + const TTxId& txId, IHTTPGateway::TPtr gateway, NYdb::TCredentialsProviderPtr credProvider, IRandomProvider* randomProvider, @@ -336,6 +340,7 @@ public: , CredProvider(std::move(credProvider)) , RandomProvider(randomProvider) , OutputIndex(outputIndex) + , TxId(txId) , Callbacks(callbacks) , Url(url) , Path(path) @@ -351,7 +356,7 @@ public: } void Bootstrap() { - LOG_D("TS3WriteActor: BootStrapped" << SelfId()); + LOG_D("TS3WriteActor", "BootStrapped"); Become(&TS3WriteActor::StateFunc); } @@ -396,7 +401,7 @@ private: const auto& key = MakeKey(v); const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>()); if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) { - auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), MaxFileSize, Compression); + auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), MaxFileSize, Compression); ins.first->second.emplace_back(fileWrite.get()); RegisterWithSameMailbox(fileWrite.release()); } @@ -410,7 +415,7 @@ private: } void Handle(TEvPrivate::TEvUploadError::TPtr& result) { - LOG_W("TS3WriteActor: TEvUploadError " << SelfId() << " " << result->Get()->Error.ToOneLineString()); + LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Error.ToOneLineString()); Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); } @@ -440,9 +445,9 @@ private: FileWriteActors.clear(); if (fileWriterCount) { - LOG_W("TS3WriteActor: PassAway " << SelfId() << " with " << fileWriterCount << " NOT finished FileWriter(s)"); + LOG_W("TS3WriteActor", "PassAway with " << fileWriterCount << " NOT finished FileWriter(s)"); } else { - LOG_D("TS3WriteActor: PassAway " << SelfId()); + LOG_D("TS3WriteActor", "PassAway"); } TActorBootstrapped<TS3WriteActor>::PassAway(); @@ -454,6 +459,7 @@ private: TIntrusivePtr<IRandomProvider> DefaultRandomProvider; const ui64 OutputIndex; + const TTxId TxId; IDqComputeActorAsyncOutput::ICallbacks *const Callbacks; const TString Url; @@ -476,6 +482,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( IHTTPGateway::TPtr gateway, NS3::TSink&& params, ui64 outputIndex, + const TTxId& txId, const THashMap<TString, TString>& secureParams, IDqComputeActorAsyncOutput::ICallbacks* callbacks, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) @@ -484,6 +491,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); const auto actor = new TS3WriteActor( outputIndex, + txId, std::move(gateway), credentialsProviderFactory->CreateProvider(), randomProvider, params.GetUrl(), diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h index 61d96b4f46..cfd88e3a2f 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h @@ -15,7 +15,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( IRandomProvider*, IHTTPGateway::TPtr gateway, NS3::TSink&& params, - ui64 inputIndex, + ui64 outputIndex, + const TTxId& txId, const THashMap<TString, TString>& secureParams, IDqComputeActorAsyncOutput::ICallbacks* callbacks, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); |