diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-09-17 14:47:53 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-09-17 15:07:38 +0300 |
commit | 9fd9a180a25ac9213be5e56198f2289de0d0d977 (patch) | |
tree | 1076b8b09adce1c462c9e8103566c820f3678e06 | |
parent | 22213ecabb27010fac4380872286d536bf734c17 (diff) | |
download | ydb-9fd9a180a25ac9213be5e56198f2289de0d0d977.tar.gz |
fix endless computing
Исправлено зависание в случае, когда TS3ReadActor получает на вход пустую или не существующую директорию.
-rw-r--r-- | ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp | 64 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 4 |
2 files changed, 68 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index 23b2ffaad8..d5ed18218b 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -1096,6 +1096,70 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("year").GetUint32(), 1); UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser("month").GetUint32(), 2); } + + Y_UNIT_TEST(ExecuteScriptWithEmptyCustomPartitioning) { + using namespace fmt::literals; + const TString bucket = "test_bucket1"; + const TString object = "year=2021/test_object"; + + CreateBucketWithObject(bucket, object, ""); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format(R"( + CREATE EXTERNAL DATA SOURCE `/Root/external_data_source` WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + );)", + "location"_a = GetBucketLocation(bucket) + ); + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto db = kikimr->GetQueryClient(); + const TString sql = R"( + $projection = @@ { + "projection.enabled" : "true", + "storage.location.template" : "/${date}", + "projection.date.type" : "date", + "projection.date.min" : "2022-11-02", + "projection.date.max" : "2023-12-02", + "projection.date.interval" : "1", + "projection.date.format" : "/year=%Y", + "projection.date.unit" : "YEARS" + } @@; + + SELECT * + FROM `/Root/external_data_source`.`/` + WITH ( + FORMAT="raw", + + SCHEMA=( + `data` String NOT NULL, + `date` Date NOT NULL + ), + + partitioned_by=(`date`), + projection=$projection + ) + )"; + + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver()); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 0); + } } } // namespace NKqp diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 42a144aaf4..fbee12bfcf 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -818,6 +818,10 @@ private: std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.begin()), std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.end())); while (TryStartDownload()) {} + + if (LastFileWasProcessed()) { + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + } } void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) { IsObjectQueueEmpty = true; |