aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2023-09-17 14:47:53 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2023-09-17 15:07:38 +0300
commit9fd9a180a25ac9213be5e56198f2289de0d0d977 (patch)
tree1076b8b09adce1c462c9e8103566c820f3678e06
parent22213ecabb27010fac4380872286d536bf734c17 (diff)
downloadydb-9fd9a180a25ac9213be5e56198f2289de0d0d977.tar.gz
fix endless computing
Исправлено зависание в случае, когда TS3ReadActor получает на вход пустую или не существующую директорию.
-rw-r--r--ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp64
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp4
2 files changed, 68 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index 23b2ffaad8..d5ed18218b 100644
--- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -1096,6 +1096,70 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1);
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2);
}
+
+ Y_UNIT_TEST(ExecuteScriptWithEmptyCustomPartitioning) {
+ using namespace fmt::literals;
+ const TString bucket = "test_bucket1";
+ const TString object = "year=2021/test_object";
+
+ CreateBucketWithObject(bucket, object, "");
+
+ auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make());
+
+ auto tc = kikimr->GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ const TString query = fmt::format(R"(
+ CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH (
+ SOURCE_TYPE="ObjectStorage",
+ LOCATION="{location}",
+ AUTH_METHOD="NONE"
+ );)",
+ "location"_a = GetBucketLocation(bucket)
+ );
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto db = kikimr->GetQueryClient();
+ const TString sql = R"(
+ $projection = @@ {
+ "projection.enabled" : "true",
+ "storage.location.template" : "/${date}",
+ "projection.date.type" : "date",
+ "projection.date.min" : "2022-11-02",
+ "projection.date.max" : "2023-12-02",
+ "projection.date.interval" : "1",
+ "projection.date.format" : "/year=%Y",
+ "projection.date.unit" : "YEARS"
+ } @@;
+
+ SELECT *
+ FROM `/Root/external_data_source`.`/`
+ WITH (
+ FORMAT="raw",
+
+ SCHEMA=(
+ `data` String NOT NULL,
+ `date` Date NOT NULL
+ ),
+
+ partitioned_by=(`date`),
+ projection=$projection
+ )
+ )";
+
+ auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+
+ NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 0);
+ }
}
} // namespace NKqp
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 42a144aaf4..fbee12bfcf 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -818,6 +818,10 @@ private:
std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.begin()),
std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.end()));
while (TryStartDownload()) {}
+
+ if (LastFileWasProcessed()) {
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+ }
}
void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) {
IsObjectQueueEmpty = true;