aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2022-09-15 17:39:27 +0300
committerhcpp <hcpp@ydb.tech>2022-09-15 17:39:27 +0300
commitf59b0432fa711edd8731e587548b58d728aeaf39 (patch)
tree0d8a9d2692f85d31ee2a22f63fc345d27a968f05
parentdd18089985e8371123ee3caf62709243e5e0e1e3 (diff)
downloadydb-f59b0432fa711edd8731e587548b58d728aeaf39.tar.gz
s3 read actor has been fixed on finalizing
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp30
1 files changed, 19 insertions, 11 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 7ee9dd2af8e..a9ef106a6d6 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -72,15 +72,15 @@
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
#define LOG_CORO_E(name, stream) \
- LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream)
+ LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
#define LOG_CORO_W(name, stream) \
- LOG_WARN_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream)
+ LOG_WARN_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
#define LOG_CORO_I(name, stream) \
- LOG_INFO_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream)
+ LOG_INFO_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
#define LOG_CORO_D(name, stream) \
- LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream)
+ LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
#define LOG_CORO_T(name, stream) \
- LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", TxId: " << TxId << ". " << stream)
+ LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
namespace NYql::NDq {
@@ -189,7 +189,7 @@ public:
{}
void Bootstrap() {
- LOG_D("TS3ReadActor", __func__ << ", InputIndex: " << InputIndex);
+ LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex);
Become(&TS3ReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
@@ -280,7 +280,7 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
- LOG_D("TS3ReadActor", __func__);
+ LOG_D("TS3ReadActor", "PassAway");
ContainerCache.Clear();
TActorBootstrapped<TS3ReadActor>::PassAway();
}
@@ -466,7 +466,7 @@ public:
}
private:
void WaitFinish() {
- LOG_CORO_D("TS3ReadCoroImpl", __func__);
+ LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish: " << Path);
if (InputFinished)
return;
@@ -488,7 +488,7 @@ private:
void Run() final try {
- LOG_CORO_D("TS3ReadCoroImpl", __func__ << ", Path: " << Path);
+ LOG_CORO_D("TS3ReadCoroImpl", "Run" << ", Path: " << Path);
NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
@@ -642,7 +642,7 @@ public:
{}
void Bootstrap() {
- LOG_D("TS3StreamReadActor", __func__);
+ LOG_D("TS3StreamReadActor", "Bootstrap");
Become(&TS3StreamReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
@@ -710,7 +710,7 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
- LOG_D("TS3StreamReadActor", __func__);
+ LOG_D("TS3StreamReadActor", "PassAway");
for (auto stuff: RetryStuffForFile) {
stuff->Cancel();
}
@@ -744,6 +744,14 @@ private:
void HandleReadFinished() {
Y_VERIFY(Count);
--Count;
+ /*
+ If an empty range is being downloaded on the last file,
+ then we need to pass the information to Compute Actor that
+ the download of all data is finished in this place
+ */
+ if (Blocks.empty() && Count == 0) {
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ }
}
const IHTTPGateway::TPtr Gateway;