diff options
author | hcpp <hcpp@ydb.tech> | 2022-09-15 17:39:27 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2022-09-15 17:39:27 +0300 |
commit | f59b0432fa711edd8731e587548b58d728aeaf39 (patch) | |
tree | 0d8a9d2692f85d31ee2a22f63fc345d27a968f05 | |
parent | dd18089985e8371123ee3caf62709243e5e0e1e3 (diff) | |
download | ydb-f59b0432fa711edd8731e587548b58d728aeaf39.tar.gz |
s3 read actor has been fixed on finalizing
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 7ee9dd2af8e..a9ef106a6d6 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -72,15 +72,15 @@ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) #define LOG_CORO_E(name, stream) \ - LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream) + LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) #define LOG_CORO_W(name, stream) \ - LOG_WARN_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream) + LOG_WARN_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) #define LOG_CORO_I(name, stream) \ - LOG_INFO_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream) + LOG_INFO_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) #define LOG_CORO_D(name, stream) \ - LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream) + LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) #define LOG_CORO_T(name, stream) \ - LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream) + LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) namespace NYql::NDq { @@ -189,7 +189,7 @@ public: {} void Bootstrap() { - LOG_D("TS3ReadActor", __func__ << ", InputIndex: " << InputIndex); + LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex); Become(&TS3ReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; @@ -280,7 +280,7 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor - LOG_D("TS3ReadActor", __func__); + LOG_D("TS3ReadActor", "PassAway"); ContainerCache.Clear(); TActorBootstrapped<TS3ReadActor>::PassAway(); } @@ -466,7 +466,7 @@ public: } private: void WaitFinish() { - LOG_CORO_D("TS3ReadCoroImpl", __func__); + LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish: " << Path); if (InputFinished) return; @@ -488,7 +488,7 @@ private: void Run() final try { - LOG_CORO_D("TS3ReadCoroImpl", __func__ << ", Path: " << Path); + LOG_CORO_D("TS3ReadCoroImpl", "Run" << ", Path: " << Path); NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; @@ -642,7 +642,7 @@ public: {} void Bootstrap() { - LOG_D("TS3StreamReadActor", __func__); + LOG_D("TS3StreamReadActor", "Bootstrap"); Become(&TS3StreamReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; @@ -710,7 +710,7 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor - LOG_D("TS3StreamReadActor", __func__); + LOG_D("TS3StreamReadActor", "PassAway"); for (auto stuff: RetryStuffForFile) { stuff->Cancel(); } @@ -744,6 +744,14 @@ private: void HandleReadFinished() { Y_VERIFY(Count); --Count; + /* + If an empty range is being downloaded on the last file, + then we need to pass the information to Compute Actor that + the download of all data is finished in this place + */ + if (Blocks.empty() && Count == 0) { + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + } } const IHTTPGateway::TPtr Gateway; |