diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-04-21 16:09:23 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-21 13:09:23 +0000 |
commit | 7aa367add8eb8304aa00b2291090ed93587f818a (patch) | |
tree | eed55cdc6fe26ddac6d73ba020a227bb12ecbf2e | |
parent | ef35ffef726ed786e6145e3172dae71aeaddceac (diff) | |
download | ydb-7aa367add8eb8304aa00b2291090ed93587f818a.tar.gz |
Fix empty upsert with sink (#17482)
-rw-r--r-- | ydb/core/kqp/runtime/kqp_write_actor.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_query_ut.cpp | 54 |
2 files changed, 66 insertions, 3 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 9c29d190d2a..f28d0e3e6df 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1747,7 +1747,6 @@ public: void Handle(TEvBufferWrite::TPtr& ev) { Counters->ForwardActorWritesLatencyHistogram->Collect((TInstant::Now() - ev->Get()->SendTime).MicroSeconds()); - TWriteToken token; if (!ev->Get()->Token) { AFL_ENSURE(ev->Get()->Settings); @@ -2967,6 +2966,12 @@ private: void Handle(TEvBufferWriteResult::TPtr& result) { CA_LOG_D("TKqpForwardWriteActor recieve EvBufferWriteResult from " << BufferActorId); + + WriteToken = result->Get()->Token; + OnFlushed(); + } + + void OnFlushed() { InFlight = false; EgressStats.Bytes += DataSize; @@ -2975,8 +2980,6 @@ private: EgressStats.Resume(); Counters->ForwardActorWritesSizeHistogram->Collect(DataSize); - - WriteToken = result->Get()->Token; DataSize = 0; if (Closed) { @@ -3032,6 +3035,12 @@ private: ev->SendTime = TInstant::Now(); + if (ev->Data->IsEmpty() && ev->Close && WriteToken.IsEmpty()) { + // Nothing was written + OnFlushed(); + return; + } + CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId); AFL_ENSURE(Send(BufferActorId, ev.release())); } diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 1f2d3be087c..4b0212c9a79 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -2580,6 +2580,60 @@ Y_UNIT_TEST_SUITE(KqpQuery) { } } + Y_UNIT_TEST_TWIN(UpdateThenDelete, UseSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(true); + + TKikimrRunner kikimr(settings); + auto client = kikimr.GetTableClient(); + + { + const TString query = R"( + DECLARE $data AS List<Struct< + Key: String, + Value: String + >>; + + UPSERT INTO KeyValue2 SELECT * FROM AS_TABLE($data); + + DELETE FROM KeyValue2 ON SELECT * FROM KeyValue2 AS a LEFT ONLY JOIN AS_TABLE($data) AS b USING (Key); + )"; + + TTypeBuilder builder; + builder + .BeginStruct() + .AddMember("Key", TTypeBuilder().Primitive(NYdb::EPrimitiveType::String).Build()) + .AddMember("Value", TTypeBuilder().Primitive(NYdb::EPrimitiveType::String).Build()) + .EndStruct(); + + auto params = client.GetParamsBuilder() + .AddParam("$data") + .EmptyList(builder.Build()) + .Build() + .Build(); + + auto session = client.CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx(), std::move(params)).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + } + { + const TString query = R"( + SELECT * FROM KeyValue2; + )"; + + auto session = client.CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl; + UNIT_ASSERT_VALUES_EQUAL(0, result.GetResultSet(0).RowsCount()); + } + } + } } // namespace NKqp |