diff options
| author | nsofya <[email protected]> | 2024-01-11 17:31:53 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-11 17:31:53 +0300 |
| commit | 53aa305accd90893de5a617df796bef4c2a74fdb (patch) | |
| tree | 5c477cd67ab996649b83cd9c62c5fe69ab292173 | |
| parent | 10f2783de2d74a54b377d0d6d88618986d3b00eb (diff) | |
Remove TEvColumnShard::TEvRead (#927)
* Remove TLongTxReadRPC
* Remove TEvColumnShard::TEvRead
| -rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 254 | ||||
| -rw-r--r-- | ydb/core/grpc_services/service_longtx.h | 1 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard.h | 61 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard__read.cpp | 165 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 8 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 10 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/read_actor.cpp | 238 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/ya.make | 2 | ||||
| -rw-r--r-- | ydb/public/api/grpc/draft/ydb_long_tx_v1.proto | 1 | ||||
| -rw-r--r-- | ydb/public/api/protos/draft/ydb_long_tx.proto | 21 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp | 20 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/draft/ydb_long_tx.h | 20 | ||||
| -rw-r--r-- | ydb/services/ydb/ydb_long_tx.cpp | 1 | ||||
| -rw-r--r-- | ydb/services/ydb/ydb_long_tx_ut.cpp | 297 |
15 files changed, 3 insertions, 1098 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index eb1533d455a..80e9d7e7682 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -33,8 +33,6 @@ using TEvLongTxRollbackRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Lo Ydb::LongTx::RollbackTransactionResponse>; using TEvLongTxWriteRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::WriteRequest, Ydb::LongTx::WriteResponse>; -using TEvLongTxReadRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::ReadRequest, - Ydb::LongTx::ReadResponse>; } @@ -308,12 +306,12 @@ protected: } else { IndexReady = true; } - + auto shardsSplitter = NEvWrite::IShardsSplitter::BuildSplitter(entry); if (!shardsSplitter) { return ReplyError(Ydb::StatusIds::BAD_REQUEST, "Shard splitter not implemented for table kind"); } - + auto initStatus = shardsSplitter->SplitData(entry, GetDataAccessor()); if (!initStatus.Ok()) { return ReplyError(initStatus.GetStatus(), initStatus.GetErrorMessage()); @@ -630,250 +628,6 @@ TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& repl new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues)); } - -class TLongTxReadRPC : public TActorBootstrapped<TLongTxReadRPC> { - using TBase = TActorBootstrapped<TLongTxReadRPC>; - -private: - static const constexpr ui32 MaxRetriesPerShard = 10; - -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::GRPC_REQ; - } - - explicit TLongTxReadRPC(std::unique_ptr<IRequestOpCtx> request) - : TBase() - , Request(std::move(request)) - , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()))) - , SchemeCache(MakeSchemeCacheID()) - , LeaderPipeCache(MakePipePeNodeCacheID(false)) - , TableId(0) - , OutChunkNumber(0) - { - - } - - void Bootstrap() { - const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request); - - if (const TString& internalToken = Request->GetSerializedToken()) { - UserToken.emplace(internalToken); - } - - TString errMsg; - if (!LongTxId.ParseString(req->tx_id(), &errMsg)) { - return ReplyError(Ydb::StatusIds::BAD_REQUEST, errMsg); - } - - Path = req->path(); - SendNavigateRequest(); - } - - void PassAway() override { - Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0)); - TBase::PassAway(); - } - -private: - void SendNavigateRequest() { - auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - request->DatabaseName = this->DatabaseName; - auto& entry = request->ResultSet.emplace_back(); - entry.Path = ::NKikimr::SplitPath(Path); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; - Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); - Become(&TThis::StateNavigate); - } - - STFUNC(StateNavigate) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - } - } - - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get(); - - if (resp->ErrorCount > 0) { - // TODO: map to a correct error - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query"); - } - - auto& entry = resp->ResultSet[0]; - - if (UserToken && entry.SecurityObject) { - const ui32 access = NACLib::SelectRow; - if (!entry.SecurityObject->CheckAccess(access, *UserToken)) { - Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, TStringBuilder() - << "User has no permission to perform reads from this table" - << " user: " << UserToken->GetUserSID() - << " path: " << Path)); - return ReplyError(Ydb::StatusIds::UNAUTHORIZED); - } - } - - if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an column table"); - } - - Y_ABORT_UNLESS(entry.ColumnTableInfo); - Y_ABORT_UNLESS(entry.ColumnTableInfo->Description.HasSharding()); - const auto& sharding = entry.ColumnTableInfo->Description.GetSharding(); - - TableId = entry.TableId.PathId.LocalPathId; - for (ui64 shardId : sharding.GetColumnShards()) { - ShardChunks[shardId] = {}; - } - for (ui64 shardId : sharding.GetAdditionalColumnShards()) { - ShardChunks[shardId] = {}; - } - - if (ShardChunks.empty()) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "No shards to read"); - } - - SendReadRequests(); - } - -private: - void SendReadRequests() { - for (auto& [shard, chunk] : ShardChunks) { - Y_UNUSED(chunk); - SendRequest(shard); - } - Become(&TThis::StateWork); - } - - void SendRequest(ui64 shard) { - Y_ABORT_UNLESS(shard != 0); - Waits.insert(shard); - SendToTablet(shard, MakeRequest()); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvents::TEvUndelivered, Handle); - hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); - hFunc(TEvColumnShard::TEvReadResult, Handle); - } - } - - void Handle(TEvents::TEvUndelivered::TPtr& ev) { - Y_UNUSED(ev); - ReplyError(Ydb::StatusIds::INTERNAL_ERROR, - "Internal error: node pipe cache is not available, check cluster configuration"); - } - - void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { - ui64 shard = ev->Get()->TabletId; - if (!Waits.contains(shard)) { - return; - } - - if (!ShardRetries.contains(shard)) { - ShardRetries[shard] = 0; - } - - ui32 retries = ++ShardRetries[shard]; - if (retries > MaxRetriesPerShard) { - return ReplyError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %lu", shard)); - } - - SendRequest(shard); - } - - void Handle(TEvColumnShard::TEvReadResult::TPtr& ev) { - const auto& record = Proto(ev->Get()); - ui64 shard = record.GetOrigin(); - ui64 chunk = record.GetBatch(); - bool finished = record.GetFinished(); - - { // Filter duplicates and allow messages reorder - if (!ShardChunks.contains(shard)) { - return ReplyError(Ydb::StatusIds::GENERIC_ERROR, "Response from unexpected shard"); - } - - if (!Waits.contains(shard) || ShardChunks[shard].contains(chunk)) { - return; - } - - if (finished) { - ShardChunkCounts[shard] = chunk + 1; // potential int overflow but pofig - } - - ShardChunks[shard].insert(chunk); - if (ShardChunkCounts.count(shard) && ShardChunkCounts[shard] == ShardChunks[shard].size()) { - Waits.erase(shard); - ShardChunks[shard].clear(); - Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shard)); - } - } - - ui32 status = record.GetStatus(); - if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { - auto result = MakeResult(OutChunkNumber, Waits.empty()); - if (record.HasData()) { - result->mutable_data()->set_data(record.GetData()); - } - ++OutChunkNumber; - return ReplySuccess(*result); - } - - return ReplyError(Ydb::StatusIds::GENERIC_ERROR, ""); - } - - THolder<TEvColumnShard::TEvRead> MakeRequest() const { - return MakeHolder<TEvColumnShard::TEvRead>( - SelfId(), 0, LongTxId.Snapshot.Step, LongTxId.Snapshot.TxId, TableId); - } - - Ydb::LongTx::ReadResult* MakeResult(ui64 outChunk, bool finished) const { - auto result = TEvLongTxReadRequest::AllocateResult<Ydb::LongTx::ReadResult>(Request); - - const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request); - result->set_tx_id(req->tx_id()); - result->set_path(req->path()); - result->set_chunk(outChunk); - result->set_finished(finished); - return result; - } - -private: - void SendToTablet(ui64 tabletId, THolder<IEventBase> event) { - Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), tabletId, true), - IEventHandle::FlagTrackDelivery); - } - - void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) { - if (!message.empty()) { - Request->RaiseIssue(NYql::TIssue(message)); - } - Request->ReplyWithYdbStatus(status); - PassAway(); - } - - void ReplySuccess(const Ydb::LongTx::ReadResult& result) { - Request->SendResult(result, Ydb::StatusIds::SUCCESS); - PassAway(); - } - -private: - std::unique_ptr<IRequestOpCtx> Request; - TString DatabaseName; - TActorId SchemeCache; - TActorId LeaderPipeCache; - std::optional<NACLib::TUserToken> UserToken; - TLongTxId LongTxId; - TString Path; - ui64 TableId; - THashMap<ui64, THashSet<ui32>> ShardChunks; - THashMap<ui64, ui32> ShardChunkCounts; - THashMap<ui64, ui32> ShardRetries; - THashSet<ui64> Waits; - ui64 OutChunkNumber; -}; - // void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) { @@ -892,9 +646,5 @@ void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f.RegisterActor(new TLongTxWriteRPC(std::move(p))); } -void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) { - f.RegisterActor(new TLongTxReadRPC(std::move(p))); -} - } } diff --git a/ydb/core/grpc_services/service_longtx.h b/ydb/core/grpc_services/service_longtx.h index b83667ba628..2d46a639eec 100644 --- a/ydb/core/grpc_services/service_longtx.h +++ b/ydb/core/grpc_services/service_longtx.h @@ -11,7 +11,6 @@ void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& void DoLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); void DoLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); -void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); } } diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 16a7aa44e51..5a5c56105e2 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -258,7 +258,7 @@ ui64 TColumnShard::MemoryUsage() const { CommitsInFlight.size() * sizeof(TCommitMeta) + LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) + LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) + - (WaitingReads.size() + WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) + + (WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) + TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); memory += TablesManager.GetMemoryUsage(); diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index e2b665e4361..5e5e6f10a00 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -234,59 +234,6 @@ struct TEvColumnShard { } }; - struct TEvRead : public TEventPB<TEvRead, NKikimrTxColumnShard::TEvRead, TEvColumnShard::EvRead> { - TEvRead() = default; - - TEvRead(const TActorId& source, ui64 metaShard, ui64 planStep, ui64 txId, ui64 tableId = 0) { - ActorIdToProto(source, Record.MutableSource()); - Record.SetTxInitiator(metaShard); - Record.SetPlanStep(planStep); - Record.SetTxId(txId); - Record.SetTableId(tableId); - } - - TActorId GetSource() const { - return ActorIdFromProto(Record.GetSource()); - } - }; - - struct TEvReadResult : public TEventPB<TEvReadResult, NKikimrTxColumnShard::TEvReadResult, - TEvColumnShard::EvReadResult> { - TEvReadResult() = default; - - TEvReadResult(ui64 origin, ui64 metaShard, ui64 planStep, ui64 txId, ui64 tableId, ui32 batch, - bool finished, ui32 status) { - Record.SetOrigin(origin); - Record.SetTxInitiator(metaShard); - Record.SetPlanStep(planStep); - Record.SetTxId(txId); - Record.SetTableId(tableId); - Record.SetBatch(batch); - Record.SetFinished(finished); - Record.SetStatus(status); - } - - TEvReadResult(const TEvReadResult& ev) { - Record.CopyFrom(ev.Record); - } - - std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const { - const auto& scheme = Record.GetMeta().GetSchema(); - if (scheme.empty() || Record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) { - return nullptr; - } - const auto arrowSchema = NArrow::DeserializeSchema(scheme); - if (Record.GetData().empty()) { - return NArrow::MakeEmptyBatch(arrowSchema); - } - return NArrow::DeserializeBatch(Record.GetData(), arrowSchema); - } - - bool HasMore() const { - return !Record.GetFinished(); - } - }; - using TEvScan = TEvDataShard::TEvKqpScan; }; @@ -306,18 +253,10 @@ inline auto& Proto(TEvColumnShard::TEvWrite* ev) { return ev->Record; } -inline auto& Proto(TEvColumnShard::TEvRead* ev) { - return ev->Record; -} - inline auto& Proto(TEvColumnShard::TEvWriteResult* ev) { return ev->Record; } -inline auto& Proto(TEvColumnShard::TEvReadResult* ev) { - return ev->Record; -} - inline TMessageSeqNo SeqNoFromProto(const NKikimrTxColumnShard::TSchemaSeqNo& proto) { return TMessageSeqNo(proto.GetGeneration(), proto.GetRound()); } diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp deleted file mode 100644 index ae4455099c2..00000000000 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ /dev/null @@ -1,165 +0,0 @@ -#include "engines/reader/description.h" - -#include <ydb/core/tx/columnshard/columnshard_impl.h> -#include <ydb/core/tx/columnshard/columnshard_private_events.h> -#include <ydb/core/tx/columnshard/columnshard_schema.h> -#include <ydb/core/tx/columnshard/columnshard__read_base.h> -#include <ydb/core/tx/columnshard/columnshard__index_scan.h> - -namespace NKikimr::NColumnShard { - -namespace { - -template <typename T, typename U> -std::vector<T> ProtoToVector(const U& cont) { - return std::vector<T>(cont.begin(), cont.end()); -} - -} - -using namespace NTabletFlatExecutor; - -class TTxRead : public TTxReadBase { -public: - TTxRead(TColumnShard* self, TEvColumnShard::TEvRead::TPtr& ev) - : TTxReadBase(self) - , Ev(ev) - , TabletTxNo(++Self->TabletTxCounter) - {} - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; - void Complete(const TActorContext& ctx) override; - TTxType GetTxType() const override { return TXTYPE_READ; } - -private: - TEvColumnShard::TEvRead::TPtr Ev; - const ui32 TabletTxNo; - std::unique_ptr<TEvColumnShard::TEvReadResult> Result; - NOlap::TReadMetadata::TConstPtr ReadMetadata; - - TStringBuilder TxPrefix() const { - return TStringBuilder() << "TxRead[" << ToString(TabletTxNo) << "] "; - } - - TString TxSuffix() const { - return TStringBuilder() << " at tablet " << Self->TabletID(); - } -}; - - -bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) { - Y_ABORT_UNLESS(Ev); - Y_ABORT_UNLESS(Self->TablesManager.HasPrimaryIndex()); - Y_UNUSED(txc); - LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix()); - - txc.DB.NoMoreReadsForTx(); - - auto& record = Proto(Ev->Get()); - ui64 metaShard = record.GetTxInitiator(); - - NOlap::TReadDescription read(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()), false); - read.PathId = record.GetTableId(); - read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); - read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds()); - read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames()); - - const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(read.GetSnapshot()); - if (read.ColumnIds.empty() && read.ColumnNames.empty()) { - auto allColumnNames = indexInfo.ArrowSchema()->field_names(); - read.ColumnNames.assign(allColumnNames.begin(), allColumnNames.end()); - } - - std::shared_ptr<NOlap::TPredicate> fromPredicate; - std::shared_ptr<NOlap::TPredicate> toPredicate; - if (record.HasGreaterPredicate()) { - auto& proto = record.GetGreaterPredicate(); - auto schema = indexInfo.ArrowSchema(ProtoToVector<TString>(proto.GetColumnNames())); - fromPredicate = std::make_shared<NOlap::TPredicate>( - proto.GetInclusive() ? NArrow::EOperation::GreaterEqual : NArrow::EOperation::Greater, proto.GetRow(), schema); - } - if (record.HasLessPredicate()) { - auto& proto = record.GetLessPredicate(); - auto schema = indexInfo.ArrowSchema(ProtoToVector<TString>(proto.GetColumnNames())); - toPredicate = std::make_shared<NOlap::TPredicate>( - proto.GetInclusive() ? NArrow::EOperation::LessEqual : NArrow::EOperation::Less, proto.GetRow(), schema); - } - Y_ABORT_UNLESS(read.PKRangesFilter.Add(fromPredicate, toPredicate, &indexInfo)); - - bool parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read, - TIndexColumnResolver(indexInfo)); - - std::shared_ptr<NOlap::TReadMetadata> metadata; - if (parseResult) { - metadata = PrepareReadMetadata(read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(), - ErrorDescription, false); - } - - ui32 status = NKikimrTxColumnShard::EResultStatus::ERROR; - - if (metadata) { - ReadMetadata = metadata; - status = NKikimrTxColumnShard::EResultStatus::SUCCESS; - } - - Result = std::make_unique<TEvColumnShard::TEvReadResult>( - Self->TabletID(), metaShard, read.GetSnapshot().GetPlanStep(), read.GetSnapshot().GetTxId(), read.PathId, 0, true, status); - - if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { - Self->IncCounter(COUNTER_READ_SUCCESS); - } else { - Self->IncCounter(COUNTER_READ_FAIL); - } - return true; -} - -void TTxRead::Complete(const TActorContext& ctx) { - Y_ABORT_UNLESS(Ev); - Y_ABORT_UNLESS(Result); - - bool noData = !ReadMetadata || ReadMetadata->Empty(); - bool success = (Proto(Result.get()).GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS); - - if (!success) { - LOG_S_DEBUG(TxPrefix() << "complete. Error " << ErrorDescription << " while reading" << TxSuffix()); - ctx.Send(Ev->Get()->GetSource(), Result.release()); - } else if (noData) { - LOG_S_DEBUG(TxPrefix() << "complete. Empty result" << TxSuffix()); - ctx.Send(Ev->Get()->GetSource(), Result.release()); - } else { - LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix() << " Metadata: " << *ReadMetadata); - - const ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest( - std::static_pointer_cast<const NOlap::TReadMetadataBase>(ReadMetadata)); - auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta(); - - Self->IncCounter(COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions); - Self->IncCounter(COUNTER_READ_INDEX_BLOBS, statsDelta.Blobs); - Self->IncCounter(COUNTER_READ_INDEX_ROWS, statsDelta.Rows); - Self->IncCounter(COUNTER_READ_INDEX_BYTES, statsDelta.Bytes); - - TInstant deadline = TInstant::Max(); // TODO - ctx.Register(CreateReadActor(Self->TabletID(), Self->SelfId(), Ev->Get()->GetSource(), - Self->GetStoragesManager(), std::move(Result), ReadMetadata, deadline, Self->SelfId(), requestCookie, Self->ReadCounters)); - } -} - - -void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) { - LastAccessTime = TAppData::TimeProvider->Now(); - - const auto* msg = ev->Get(); - NOlap::TSnapshot readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId()); - NOlap::TSnapshot maxReadVersion = GetMaxReadVersion(); - LOG_S_DEBUG("Read at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion); - - if (maxReadVersion < readVersion) { - WaitingReads.emplace(readVersion, std::move(ev)); - WaitPlanStep(readVersion.GetPlanStep()); - return; - } - - Execute(new TTxRead(this, ev), ctx); -} - -} diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 4098d2b00ca..658b7c67c4a 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -257,14 +257,6 @@ void TColumnShard::SendWaitPlanStep(ui64 step) { void TColumnShard::RescheduleWaitingReads() { ui64 minWaitingStep = Max<ui64>(); NOlap::TSnapshot maxReadVersion = GetMaxReadVersion(); - for (auto it = WaitingReads.begin(); it != WaitingReads.end();) { - if (maxReadVersion < it->first) { - minWaitingStep = Min(minWaitingStep, it->first.GetPlanStep()); - break; - } - TActivationContext::Send(it->second.Release()); - it = WaitingReads.erase(it); - } for (auto it = WaitingScans.begin(); it != WaitingScans.end();) { if (maxReadVersion < it->first) { minWaitingStep = Min(minWaitingStep, it->first.GetPlanStep()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 10d3e177e8e..65148356043 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -55,13 +55,6 @@ class TOperationsManager; extern bool gAllowLogBatchingDefaultValue; IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline); -IActor* CreateReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor, - const TActorId& dstActor, const std::shared_ptr<NOlap::IStoragesManager>& storages, - std::unique_ptr<TEvColumnShard::TEvReadResult>&& event, - NOlap::TReadMetadata::TConstPtr readMetadata, - const TInstant& deadline, - const TActorId& columnShardActorId, - ui64 requestCookie, const TConcreteScanCounters& counters); IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); struct TSettings { @@ -151,7 +144,6 @@ class TColumnShard void Handle(TEvColumnShard::TEvNotifyTxCompletion::TPtr& ev, const TActorContext& ctx); void Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx); void Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx); - void Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx); void Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext& ctx); void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx); @@ -264,7 +256,6 @@ protected: HFunc(TEvColumnShard::TEvScan, Handle); HFunc(TEvTxProcessing::TEvPlanStep, Handle); HFunc(TEvColumnShard::TEvWrite, Handle); - HFunc(TEvColumnShard::TEvRead, Handle); HFunc(TEvPrivate::TEvWriteBlobsResult, Handle); HFunc(TEvMediatorTimecast::TEvRegisterTabletResult, Handle); HFunc(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); @@ -428,7 +419,6 @@ private: THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites; using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>; THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId; - TMultiMap<NOlap::TSnapshot, TEvColumnShard::TEvRead::TPtr> WaitingReads; TMultiMap<NOlap::TSnapshot, TEvColumnShard::TEvScan::TPtr> WaitingScans; TBackgroundController BackgroundController; TSettings Settings; diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp deleted file mode 100644 index 475c2a34267..00000000000 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ /dev/null @@ -1,238 +0,0 @@ -#include <ydb/core/tx/columnshard/columnshard_impl.h> -#include <ydb/core/tx/columnshard/blob_cache.h> -#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> -#include <ydb/core/tx/columnshard/blobs_reader/events.h> -#include <ydb/core/tx/conveyor/usage/events.h> -#include <ydb/library/actors/core/actor_bootstrapped.h> -#include "blobs_reader/actor.h" -#include "engines/reader/read_context.h" -#include "resource_subscriber/actor.h" -#include "blobs_reader/read_coordinator.h" - -namespace NKikimr::NColumnShard { -namespace { - -class TReadActor : public TActorBootstrapped<TReadActor> { -private: - void BuildResult(const TActorContext& ctx) { - auto ready = IndexedData->ExtractReadyResults(Max<i64>()); - LOG_S_TRACE("Ready results with " << ready.size() << " batches at tablet " << TabletId << " (read)"); - if (ready.empty()) { - if (IndexedData->IsFinished()) { - SendResult(ctx, nullptr, true); - } - } else { - size_t next = 1; - for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { - const bool lastOne = IndexedData->IsFinished() && (next == ready.size()); - SendResult(ctx, it->GetResultBatchPtrVerified(), lastOne); - } - } - } -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::TX_COLUMNSHARD_READ_ACTOR; - } - - TReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor, - const std::shared_ptr<NOlap::IStoragesManager>& storages, - const TActorId& dstActor, - std::unique_ptr<TEvColumnShard::TEvReadResult>&& event, - NOlap::TReadMetadata::TConstPtr readMetadata, - const TInstant& deadline, - const TActorId& columnShardActorId, - ui64 requestCookie, const TConcreteScanCounters& counters) - : TabletId(tabletId) - , ReadBlobsActor(readBlobsActor) - , Storages(storages) - , DstActor(dstActor) - , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) - , Result(std::move(event)) - , ReadMetadata(readMetadata) - , Deadline(deadline) - , ColumnShardActorId(columnShardActorId) - , RequestCookie(requestCookie) - , ReturnedBatchNo(0) - , Counters(counters) - {} - - void Handle(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev, const TActorContext& ctx) { - if (ev->Get()->GetErrorMessage()) { - ACFL_DEBUG("event", "TEvTaskProcessedResult")("error", ev->Get()->GetErrorMessage()); - SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::ERROR); - return DieFinished(ctx); - } else { - ACFL_DEBUG("event", "TEvTaskProcessedResult"); - auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()); - Y_DEBUG_ABORT_UNLESS(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult())); - if (!IndexedData->IsFinished()) { - Y_ABORT_UNLESS(t->Apply(*IndexedData)); - } - BuildResult(ctx); - if (IndexedData->IsFinished()) { - DieFinished(ctx); - } - } - } - - void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ev); - LOG_S_INFO("TEvWakeup: read timeout at tablet " << TabletId << " (read)"); - - SendTimeouts(ctx); - DieFinished(ctx); - } - - void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) { - Y_ABORT_UNLESS(status != NKikimrTxColumnShard::EResultStatus::SUCCESS); - SendResult(ctx, {}, true, status); - IndexedData->Abort(); - } - - void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false, - NKikimrTxColumnShard::EResultStatus status = NKikimrTxColumnShard::EResultStatus::SUCCESS) { - auto chunkEvent = std::make_unique<TEvColumnShard::TEvReadResult>(*Result); - auto& proto = Proto(chunkEvent.get()); - - TString data; - if (batch && batch->num_rows()) { - data = NArrow::SerializeBatchNoCompression(batch); - - auto metadata = proto.MutableMeta(); - metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW); - metadata->SetSchema(GetSerializedSchema(batch)); - if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { - Y_ABORT_UNLESS(!data.empty()); - } - } - - proto.SetBatch(ReturnedBatchNo); - proto.SetData(data); - proto.SetStatus(status); - proto.SetFinished(finished); - ++ReturnedBatchNo; - - if (finished) { - auto stats = ReadMetadata->ReadStats; - auto* protoStats = proto.MutableMeta()->MutableReadStats(); - protoStats->SetBeginTimestamp(stats->BeginTimestamp.MicroSeconds()); - protoStats->SetDurationUsec(stats->Duration().MicroSeconds()); - protoStats->SetSelectedIndex(stats->SelectedIndex); - protoStats->SetIndexGranules(stats->IndexGranules); - protoStats->SetIndexPortions(stats->IndexPortions); - protoStats->SetIndexBatches(stats->IndexBatches); - protoStats->SetNotIndexedBatches(stats->CommittedBatches); - protoStats->SetSchemaColumns(stats->SchemaColumns); - protoStats->SetFilterColumns(stats->FilterColumns); - protoStats->SetAdditionalColumns(stats->AdditionalColumns); - protoStats->SetDataFilterBytes(stats->DataFilterBytes); - protoStats->SetDataAdditionalBytes(stats->DataAdditionalBytes); - protoStats->SetCompactedPortionsBytes(stats->CompactedPortionsBytes); - protoStats->SetInsertedPortionsBytes(stats->InsertedPortionsBytes); - protoStats->SetCommittedPortionsBytes(stats->CommittedPortionsBytes); - protoStats->SetSelectedRows(stats->SelectedRows); - } - - if (Deadline != TInstant::Max()) { - TInstant now = TAppData::TimeProvider->Now(); - if (Deadline <= now) { - proto.SetStatus(NKikimrTxColumnShard::EResultStatus::TIMEOUT); - } - } - - ctx.Send(DstActor, chunkEvent.release()); - } - - void DieFinished(const TActorContext& ctx) { - if (IndexedData->IsFinished()) { - LOG_S_DEBUG("Finished read (with " << ReturnedBatchNo << " batches sent) at tablet " << TabletId); - Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie)); - Die(ctx); - } - } - - virtual void PassAway() override { - Send(ResourceSubscribeActorId, new TEvents::TEvPoisonPill); - IActor::PassAway(); - } - - void Bootstrap(const TActorContext& ctx) { - ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId())); - ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId())); - IndexedData = ReadMetadata->BuildReader(std::make_shared<NOlap::TReadContext>(Storages, Counters, true, ReadMetadata, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, NOlap::TComputeShardingPolicy())); - LOG_S_DEBUG("Starting read (" << IndexedData->DebugString(false) << ") at tablet " << TabletId); - - bool earlyExit = false; - if (Deadline != TInstant::Max()) { - TInstant now = TAppData::TimeProvider->Now(); - if (Deadline <= now) { - earlyExit = true; - } else { - const TDuration timeout = Deadline - now; - ctx.Schedule(timeout, new TEvents::TEvWakeup()); - } - } - - if (earlyExit) { - SendTimeouts(ctx); - ctx.Send(SelfId(), new TEvents::TEvPoisonPill()); - } else { - while (IndexedData->ReadNextInterval()) { - } - BuildResult(ctx); - } - - Become(&TThis::StateWait); - } - - void SendTimeouts(const TActorContext& ctx) { - SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::TIMEOUT); - } - - STFUNC(StateWait) { - switch (ev->GetTypeRewrite()) { - HFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, Handle); - HFunc(TEvents::TEvWakeup, Handle); - default: - break; - } - } - -private: - ui64 TabletId; - TActorId ReadBlobsActor; - std::shared_ptr<NOlap::IStoragesManager> Storages; - TActorId DstActor; - TActorId BlobCacheActorId; - std::unique_ptr<TEvColumnShard::TEvReadResult> Result; - TActorId ResourceSubscribeActorId; - TActorId ReadCoordinatorActorId; - NOlap::TReadMetadata::TConstPtr ReadMetadata; - std::shared_ptr<NOlap::IDataReader> IndexedData; - TInstant Deadline; - TActorId ColumnShardActorId; - const ui64 RequestCookie; - ui32 ReturnedBatchNo; - const TConcreteScanCounters Counters; - mutable TString SerializedSchema; - - TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const { - return NArrow::SerializeSchema(*batch->schema()); - } -}; - -} // namespace - -IActor* CreateReadActor(ui64 tabletId, - const NActors::TActorId readBlobsActor, const TActorId& dstActor, const std::shared_ptr<NOlap::IStoragesManager>& storages, - std::unique_ptr<TEvColumnShard::TEvReadResult>&& event, - NOlap::TReadMetadata::TConstPtr readMetadata, - const TInstant& deadline, - const TActorId& columnShardActorId, - ui64 requestCookie, const TConcreteScanCounters& counters) -{ - return new TReadActor(tabletId, readBlobsActor, storages, dstActor, std::move(event), readMetadata, - deadline, columnShardActorId, requestCookie, counters); -} - -} diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 22177da5be3..04443739ee0 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -11,7 +11,6 @@ SRCS( columnshard__progress_tx.cpp columnshard__propose_cancel.cpp columnshard__propose_transaction.cpp - columnshard__read.cpp columnshard__read_base.cpp columnshard__scan.cpp columnshard__index_scan.cpp @@ -26,7 +25,6 @@ SRCS( columnshard_view.cpp counters.cpp defs.cpp - read_actor.cpp write_actor.cpp tables_manager.cpp tx_controller.cpp diff --git a/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto b/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto index 581ec581bdb..7f920b44af3 100644 --- a/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto @@ -10,6 +10,5 @@ service LongTxService { rpc CommitTx(CommitTransactionRequest) returns (CommitTransactionResponse); rpc RollbackTx(RollbackTransactionRequest) returns (RollbackTransactionResponse); rpc Write(WriteRequest) returns (WriteResponse); - rpc Read(ReadRequest) returns (ReadResponse); // rpc ResolveNodes(ResolveNodesRequest) returns (stream ResolveNodesResponse); } diff --git a/ydb/public/api/protos/draft/ydb_long_tx.proto b/ydb/public/api/protos/draft/ydb_long_tx.proto index fd22ec6b13f..d5b658595fd 100644 --- a/ydb/public/api/protos/draft/ydb_long_tx.proto +++ b/ydb/public/api/protos/draft/ydb_long_tx.proto @@ -87,24 +87,3 @@ message WriteResult { message WriteResponse { Ydb.Operations.Operation operation = 1; } - -message ReadRequest { - Ydb.Operations.OperationParams operation_params = 1; - string tx_id = 2; - string path = 3; - oneof query { - string sql = 11; - } -} - -message ReadResult { - string tx_id = 1; - string path = 2; - uint64 chunk = 3; - bool finished = 4; - Data data = 5; -} - -message ReadResponse { - Ydb.Operations.Operation operation = 1; -} diff --git a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp index 8368fb84971..06ef07b1113 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp @@ -77,19 +77,6 @@ public: TRpcRequestSettings::Make(settings)); } - TAsyncReadResult Read(const TString& txId, const TString& table, - const TOpSettings& settings = TOpSettings()) { - auto request = MakeOperationRequest<Ydb::LongTx::ReadRequest>(settings); - request.set_tx_id(txId); - request.set_path(table); - // TODO: query - - return RunOperation<Ydb::LongTx::V1::LongTxService, - Ydb::LongTx::ReadRequest, Ydb::LongTx::ReadResponse, TLongTxReadResult>( - std::move(request), - &Ydb::LongTx::V1::LongTxService::Stub::AsyncRead, - TRpcRequestSettings::Make(settings)); - } }; TClient::TClient(const TDriver& driver, const TClientSettings& settings) @@ -100,10 +87,6 @@ TClient::TAsyncBeginTxResult TClient::BeginWriteTx() { return Impl_->BeginTx(Ydb::LongTx::BeginTransactionRequest::WRITE); } -TClient::TAsyncBeginTxResult TClient::BeginReadTx() { - return Impl_->BeginTx(Ydb::LongTx::BeginTransactionRequest::READ); -} - TClient::TAsyncCommitTxResult TClient::CommitTx(const TString& txId) { return Impl_->CommitTx(txId); } @@ -117,9 +100,6 @@ TClient::TAsyncWriteResult TClient::Write(const TString& txId, const TString& ta return Impl_->Write(txId, table, dedupId, data, format); } -TClient::TAsyncReadResult TClient::Read(const TString& txId, const TString& table) { - return Impl_->Read(txId, table); -} } // namespace NLongTx } // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h index 978a14527e8..e749d0348ab 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h @@ -75,23 +75,6 @@ public: } }; -class TLongTxReadResult : public TOperation { -public: - explicit TLongTxReadResult(TStatus&& status) - : TOperation(std::move(status)) - {} - - TLongTxReadResult(TStatus&& status, Ydb::Operations::Operation&& operation) - : TOperation(std::move(status), std::move(operation)) - {} - - Ydb::LongTx::ReadResult GetResult() { - Ydb::LongTx::ReadResult result; - GetProto().result().UnpackTo(&result); - return result; - } -}; - struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> { using TSelf = TClientSettings; }; @@ -102,17 +85,14 @@ public: using TAsyncCommitTxResult = NThreading::TFuture<TLongTxCommitResult>; using TAsyncRollbackTxResult = NThreading::TFuture<TLongTxRollbackResult>; using TAsyncWriteResult = NThreading::TFuture<TLongTxWriteResult>; - using TAsyncReadResult = NThreading::TFuture<TLongTxReadResult>; TClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); TAsyncBeginTxResult BeginWriteTx(); - TAsyncBeginTxResult BeginReadTx(); TAsyncCommitTxResult CommitTx(const TString& txId); TAsyncRollbackTxResult RollbackTx(const TString& txId); TAsyncWriteResult Write(const TString& txId, const TString& table, const TString& dedupId, const TString& data, Ydb::LongTx::Data::Format format); - TAsyncReadResult Read(const TString& txId, const TString& table); private: class TImpl; diff --git a/ydb/services/ydb/ydb_long_tx.cpp b/ydb/services/ydb/ydb_long_tx.cpp index 2656d22471c..643d95b858a 100644 --- a/ydb/services/ydb/ydb_long_tx.cpp +++ b/ydb/services/ydb/ydb_long_tx.cpp @@ -28,7 +28,6 @@ void TGRpcYdbLongTxService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST(CommitTx, CommitTransaction, DoLongTxCommitRPC) ADD_REQUEST(RollbackTx, RollbackTransaction, DoLongTxRollbackRPC) ADD_REQUEST(Write, Write, DoLongTxWriteRPC) - ADD_REQUEST(Read, Read, DoLongTxReadRPC) #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_long_tx_ut.cpp b/ydb/services/ydb/ydb_long_tx_ut.cpp index b46b933a12a..ac1015b491c 100644 --- a/ydb/services/ydb/ydb_long_tx_ut.cpp +++ b/ydb/services/ydb/ydb_long_tx_ut.cpp @@ -21,90 +21,6 @@ TString TestBlob() { return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); } -TVector<std::shared_ptr<arrow::RecordBatch>> SplitData(const TString& data, ui32 numBatches) { - auto batch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); - UNIT_ASSERT(batch.ok()); - - NSharding::TLogsSharding sharding(numBatches, { "timestamp", "uid" }, numBatches); - std::vector<ui32> rowSharding = sharding.MakeSharding(*batch); - Y_ABORT_UNLESS(rowSharding.size() == (size_t)batch->get()->num_rows()); - - std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(*batch, rowSharding, numBatches); - Y_ABORT_UNLESS(sharded.size() == numBatches); - - TVector<std::shared_ptr<arrow::RecordBatch>> out; - for (size_t i = 0; i < numBatches; ++i) { - if (sharded[i]) { - Y_ABORT_UNLESS(sharded[i]->ValidateFull().ok()); - out.emplace_back(sharded[i]); - } - } - - return out; -} - -bool EqualBatches(const TString& x, const TString& y) { - auto schema = TTestOlap::ArrowSchema(); - std::shared_ptr<arrow::RecordBatch> batchX = NArrow::DeserializeBatch(x, schema); - std::shared_ptr<arrow::RecordBatch> batchY = NArrow::DeserializeBatch(y, schema); - Y_ABORT_UNLESS(batchX && batchY); - if ((batchX->num_columns() != batchY->num_columns()) || - (batchX->num_rows() != batchY->num_rows())) { - Cerr << __FILE__ << ':' << __LINE__ << " " - << batchX->num_columns() << ':' << batchX->num_rows() << " vs " - << batchY->num_columns() << ':' << batchY->num_rows() << "\n"; - return false; - } - - for (auto& column : schema->field_names()) { - auto filedX = batchX->schema()->GetFieldByName(column); - auto filedY = batchY->schema()->GetFieldByName(column); - Y_ABORT_UNLESS(filedX->type()->id() == filedY->type()->id()); - - auto arrX = batchX->GetColumnByName(column); - auto arrY = batchY->GetColumnByName(column); - - switch (filedX->type()->id()) { - case arrow::Type::INT32: - if (!NArrow::ArrayEqualValue<arrow::Int32Array>(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::UINT64: - if (!NArrow::ArrayEqualValue<arrow::UInt64Array>(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::TIMESTAMP: - if (!NArrow::ArrayEqualValue<arrow::TimestampArray>(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::BINARY: - if (!NArrow::ArrayEqualView<arrow::BinaryArray>(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - case arrow::Type::STRING: - if (!NArrow::ArrayEqualView<arrow::StringArray>(arrX, arrY)) { - Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n"; - return false; - } - break; - - default: - Cerr << __FILE__ << ':' << __LINE__ << "\n"; - return false; - } - } - - return true; -} - } @@ -164,166 +80,6 @@ Y_UNIT_TEST_SUITE(YdbLongTx) { UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS); } - Y_UNIT_TEST(BeginRead) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - - Y_UNIT_TEST(WriteThenRead) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings); - - NYdb::NLongTx::TClient client(connection); - - // Read before write - TString beforeWriteTxId; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - beforeWriteTxId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - - // Write - auto batch = TTestOlap::SampleBatch(false, 10000); - const TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch); - - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // Read after write - auto sharded = SplitData(data, 2); - UNIT_ASSERT_VALUES_EQUAL(sharded.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(sharded[0]->num_rows(), 4990); - UNIT_ASSERT_VALUES_EQUAL(sharded[1]->num_rows(), 5010); - - TVector<TString> expected; - for (auto batch : sharded) { - expected.push_back(NArrow::SerializeBatchNoCompression(batch)); - } - - TVector<TString> returned; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - returned.push_back(resRead.GetResult().data().data()); - // TODO: read both - - UNIT_ASSERT(EqualBatches(expected[0], returned[0]) || - EqualBatches(expected[1], returned[0])); - } - - // Read before write again - { - NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), ""); - } - } - - Y_UNIT_TEST(ReadFutureSnapshot) { - NKikimrConfig::TAppConfig appConfig; - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location)); - - TTestOlap::CreateTable(*server.ServerSettings, 1); - - NYdb::NLongTx::TClient client(connection); - - TString futureTxId; - NYdb::NLongTx::TClient::TAsyncReadResult futureRead; - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - TString txId = resBeginTx.GetResult().tx_id(); - NLongTxService::TLongTxId parsed; - Y_ABORT_UNLESS(parsed.ParseString(txId)); - Y_ABORT_UNLESS(parsed.Snapshot.Step > 0); - parsed.Snapshot.Step += 2000; // 2 seconds in the future - futureTxId = parsed.ToString(); - // Cerr << "Future txId " << futureTxId << Endl; - - futureRead = client.Read(futureTxId, TestTablePath); - } - - // Write - TString data = TestBlob(); - { - NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxWriteResult resWrite = - client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS); - - NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS); - } - - // Await read - { - NLongTx::TLongTxReadResult resRead = futureRead.GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - - auto inputBatch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data); - UNIT_ASSERT(inputBatch.ok()); - auto readBatch = NArrow::NSerialization::TBatchPayloadDeserializer(inputBatch->get()->schema()).Deserialize(resRead.GetResult().data().data()); - UNIT_ASSERT(readBatch.ok()); - UNIT_ASSERT_VALUES_EQUAL(readBatch->get()->ToString(), inputBatch->get()->ToString()); - } - } Y_UNIT_TEST(WriteAclChecks) { NKikimrConfig::TAppConfig appConfig; @@ -388,59 +144,6 @@ Y_UNIT_TEST_SUITE(YdbLongTx) { } } - Y_UNIT_TEST(ReadAclChecks) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true); - TKikimrWithGrpcAndRootSchema server(appConfig); - server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); - - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto connection1 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user1@builtin")); - auto connection2 = NYdb::TDriver(TDriverConfig() - .SetEndpoint(location) - .SetDatabase("/Root") - .SetAuthToken("user2@builtin")); - - TTestOlap::CreateTable(*server.ServerSettings); - { - TClient annoyingClient(*server.ServerSettings); - annoyingClient.SetSecurityToken("root@builtin"); - NACLib::TDiffACL diff; - diff.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@builtin"); - annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString()); - } - - // try user1 first - { - NYdb::NLongTx::TClient client(connection1); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS); - } - - // try user2 next - { - NYdb::NLongTx::TClient client(connection2); - - NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS); - - auto txId = resBeginTx.GetResult().tx_id(); - - NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::UNAUTHORIZED); - } - } - Y_UNIT_TEST(CreateOlapWithDirs) { NKikimrConfig::TAppConfig appConfig; TKikimrWithGrpcAndRootSchema server(appConfig); |
