summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <[email protected]>2024-01-11 17:31:53 +0300
committerGitHub <[email protected]>2024-01-11 17:31:53 +0300
commit53aa305accd90893de5a617df796bef4c2a74fdb (patch)
tree5c477cd67ab996649b83cd9c62c5fe69ab292173
parent10f2783de2d74a54b377d0d6d88618986d3b00eb (diff)
Remove TEvColumnShard::TEvRead (#927)
* Remove TLongTxReadRPC * Remove TEvColumnShard::TEvRead
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp254
-rw-r--r--ydb/core/grpc_services/service_longtx.h1
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard.h61
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp165
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h10
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp238
-rw-r--r--ydb/core/tx/columnshard/ya.make2
-rw-r--r--ydb/public/api/grpc/draft/ydb_long_tx_v1.proto1
-rw-r--r--ydb/public/api/protos/draft/ydb_long_tx.proto21
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp20
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_long_tx.h20
-rw-r--r--ydb/services/ydb/ydb_long_tx.cpp1
-rw-r--r--ydb/services/ydb/ydb_long_tx_ut.cpp297
15 files changed, 3 insertions, 1098 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index eb1533d455a..80e9d7e7682 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -33,8 +33,6 @@ using TEvLongTxRollbackRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Lo
Ydb::LongTx::RollbackTransactionResponse>;
using TEvLongTxWriteRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::WriteRequest,
Ydb::LongTx::WriteResponse>;
-using TEvLongTxReadRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::LongTx::ReadRequest,
- Ydb::LongTx::ReadResponse>;
}
@@ -308,12 +306,12 @@ protected:
} else {
IndexReady = true;
}
-
+
auto shardsSplitter = NEvWrite::IShardsSplitter::BuildSplitter(entry);
if (!shardsSplitter) {
return ReplyError(Ydb::StatusIds::BAD_REQUEST, "Shard splitter not implemented for table kind");
}
-
+
auto initStatus = shardsSplitter->SplitData(entry, GetDataAccessor());
if (!initStatus.Ok()) {
return ReplyError(initStatus.GetStatus(), initStatus.GetErrorMessage());
@@ -630,250 +628,6 @@ TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& repl
new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues));
}
-
-class TLongTxReadRPC : public TActorBootstrapped<TLongTxReadRPC> {
- using TBase = TActorBootstrapped<TLongTxReadRPC>;
-
-private:
- static const constexpr ui32 MaxRetriesPerShard = 10;
-
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::GRPC_REQ;
- }
-
- explicit TLongTxReadRPC(std::unique_ptr<IRequestOpCtx> request)
- : TBase()
- , Request(std::move(request))
- , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())))
- , SchemeCache(MakeSchemeCacheID())
- , LeaderPipeCache(MakePipePeNodeCacheID(false))
- , TableId(0)
- , OutChunkNumber(0)
- {
-
- }
-
- void Bootstrap() {
- const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request);
-
- if (const TString& internalToken = Request->GetSerializedToken()) {
- UserToken.emplace(internalToken);
- }
-
- TString errMsg;
- if (!LongTxId.ParseString(req->tx_id(), &errMsg)) {
- return ReplyError(Ydb::StatusIds::BAD_REQUEST, errMsg);
- }
-
- Path = req->path();
- SendNavigateRequest();
- }
-
- void PassAway() override {
- Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
- TBase::PassAway();
- }
-
-private:
- void SendNavigateRequest() {
- auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- request->DatabaseName = this->DatabaseName;
- auto& entry = request->ResultSet.emplace_back();
- entry.Path = ::NKikimr::SplitPath(Path);
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
- Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
- Become(&TThis::StateNavigate);
- }
-
- STFUNC(StateNavigate) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- }
- }
-
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
- NSchemeCache::TSchemeCacheNavigate* resp = ev->Get()->Request.Get();
-
- if (resp->ErrorCount > 0) {
- // TODO: map to a correct error
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "There was an error during table query");
- }
-
- auto& entry = resp->ResultSet[0];
-
- if (UserToken && entry.SecurityObject) {
- const ui32 access = NACLib::SelectRow;
- if (!entry.SecurityObject->CheckAccess(access, *UserToken)) {
- Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, TStringBuilder()
- << "User has no permission to perform reads from this table"
- << " user: " << UserToken->GetUserSID()
- << " path: " << Path));
- return ReplyError(Ydb::StatusIds::UNAUTHORIZED);
- }
- }
-
- if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an column table");
- }
-
- Y_ABORT_UNLESS(entry.ColumnTableInfo);
- Y_ABORT_UNLESS(entry.ColumnTableInfo->Description.HasSharding());
- const auto& sharding = entry.ColumnTableInfo->Description.GetSharding();
-
- TableId = entry.TableId.PathId.LocalPathId;
- for (ui64 shardId : sharding.GetColumnShards()) {
- ShardChunks[shardId] = {};
- }
- for (ui64 shardId : sharding.GetAdditionalColumnShards()) {
- ShardChunks[shardId] = {};
- }
-
- if (ShardChunks.empty()) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "No shards to read");
- }
-
- SendReadRequests();
- }
-
-private:
- void SendReadRequests() {
- for (auto& [shard, chunk] : ShardChunks) {
- Y_UNUSED(chunk);
- SendRequest(shard);
- }
- Become(&TThis::StateWork);
- }
-
- void SendRequest(ui64 shard) {
- Y_ABORT_UNLESS(shard != 0);
- Waits.insert(shard);
- SendToTablet(shard, MakeRequest());
- }
-
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvents::TEvUndelivered, Handle);
- hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
- hFunc(TEvColumnShard::TEvReadResult, Handle);
- }
- }
-
- void Handle(TEvents::TEvUndelivered::TPtr& ev) {
- Y_UNUSED(ev);
- ReplyError(Ydb::StatusIds::INTERNAL_ERROR,
- "Internal error: node pipe cache is not available, check cluster configuration");
- }
-
- void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
- ui64 shard = ev->Get()->TabletId;
- if (!Waits.contains(shard)) {
- return;
- }
-
- if (!ShardRetries.contains(shard)) {
- ShardRetries[shard] = 0;
- }
-
- ui32 retries = ++ShardRetries[shard];
- if (retries > MaxRetriesPerShard) {
- return ReplyError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %lu", shard));
- }
-
- SendRequest(shard);
- }
-
- void Handle(TEvColumnShard::TEvReadResult::TPtr& ev) {
- const auto& record = Proto(ev->Get());
- ui64 shard = record.GetOrigin();
- ui64 chunk = record.GetBatch();
- bool finished = record.GetFinished();
-
- { // Filter duplicates and allow messages reorder
- if (!ShardChunks.contains(shard)) {
- return ReplyError(Ydb::StatusIds::GENERIC_ERROR, "Response from unexpected shard");
- }
-
- if (!Waits.contains(shard) || ShardChunks[shard].contains(chunk)) {
- return;
- }
-
- if (finished) {
- ShardChunkCounts[shard] = chunk + 1; // potential int overflow but pofig
- }
-
- ShardChunks[shard].insert(chunk);
- if (ShardChunkCounts.count(shard) && ShardChunkCounts[shard] == ShardChunks[shard].size()) {
- Waits.erase(shard);
- ShardChunks[shard].clear();
- Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shard));
- }
- }
-
- ui32 status = record.GetStatus();
- if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
- auto result = MakeResult(OutChunkNumber, Waits.empty());
- if (record.HasData()) {
- result->mutable_data()->set_data(record.GetData());
- }
- ++OutChunkNumber;
- return ReplySuccess(*result);
- }
-
- return ReplyError(Ydb::StatusIds::GENERIC_ERROR, "");
- }
-
- THolder<TEvColumnShard::TEvRead> MakeRequest() const {
- return MakeHolder<TEvColumnShard::TEvRead>(
- SelfId(), 0, LongTxId.Snapshot.Step, LongTxId.Snapshot.TxId, TableId);
- }
-
- Ydb::LongTx::ReadResult* MakeResult(ui64 outChunk, bool finished) const {
- auto result = TEvLongTxReadRequest::AllocateResult<Ydb::LongTx::ReadResult>(Request);
-
- const auto* req = TEvLongTxReadRequest::GetProtoRequest(Request);
- result->set_tx_id(req->tx_id());
- result->set_path(req->path());
- result->set_chunk(outChunk);
- result->set_finished(finished);
- return result;
- }
-
-private:
- void SendToTablet(ui64 tabletId, THolder<IEventBase> event) {
- Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), tabletId, true),
- IEventHandle::FlagTrackDelivery);
- }
-
- void ReplyError(Ydb::StatusIds::StatusCode status, const TString& message = TString()) {
- if (!message.empty()) {
- Request->RaiseIssue(NYql::TIssue(message));
- }
- Request->ReplyWithYdbStatus(status);
- PassAway();
- }
-
- void ReplySuccess(const Ydb::LongTx::ReadResult& result) {
- Request->SendResult(result, Ydb::StatusIds::SUCCESS);
- PassAway();
- }
-
-private:
- std::unique_ptr<IRequestOpCtx> Request;
- TString DatabaseName;
- TActorId SchemeCache;
- TActorId LeaderPipeCache;
- std::optional<NACLib::TUserToken> UserToken;
- TLongTxId LongTxId;
- TString Path;
- ui64 TableId;
- THashMap<ui64, THashSet<ui32>> ShardChunks;
- THashMap<ui64, ui32> ShardChunkCounts;
- THashMap<ui64, ui32> ShardRetries;
- THashSet<ui64> Waits;
- ui64 OutChunkNumber;
-};
-
//
void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
@@ -892,9 +646,5 @@ void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&
f.RegisterActor(new TLongTxWriteRPC(std::move(p)));
}
-void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
- f.RegisterActor(new TLongTxReadRPC(std::move(p)));
-}
-
}
}
diff --git a/ydb/core/grpc_services/service_longtx.h b/ydb/core/grpc_services/service_longtx.h
index b83667ba628..2d46a639eec 100644
--- a/ydb/core/grpc_services/service_longtx.h
+++ b/ydb/core/grpc_services/service_longtx.h
@@ -11,7 +11,6 @@ void DoLongTxBeginRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&
void DoLongTxCommitRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoLongTxRollbackRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoLongTxWriteRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
-void DoLongTxReadRPC(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 16a7aa44e51..5a5c56105e2 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -258,7 +258,7 @@ ui64 TColumnShard::MemoryUsage() const {
CommitsInFlight.size() * sizeof(TCommitMeta) +
LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) +
LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) +
- (WaitingReads.size() + WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) +
+ (WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) +
TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) +
TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData);
memory += TablesManager.GetMemoryUsage();
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index e2b665e4361..5e5e6f10a00 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -234,59 +234,6 @@ struct TEvColumnShard {
}
};
- struct TEvRead : public TEventPB<TEvRead, NKikimrTxColumnShard::TEvRead, TEvColumnShard::EvRead> {
- TEvRead() = default;
-
- TEvRead(const TActorId& source, ui64 metaShard, ui64 planStep, ui64 txId, ui64 tableId = 0) {
- ActorIdToProto(source, Record.MutableSource());
- Record.SetTxInitiator(metaShard);
- Record.SetPlanStep(planStep);
- Record.SetTxId(txId);
- Record.SetTableId(tableId);
- }
-
- TActorId GetSource() const {
- return ActorIdFromProto(Record.GetSource());
- }
- };
-
- struct TEvReadResult : public TEventPB<TEvReadResult, NKikimrTxColumnShard::TEvReadResult,
- TEvColumnShard::EvReadResult> {
- TEvReadResult() = default;
-
- TEvReadResult(ui64 origin, ui64 metaShard, ui64 planStep, ui64 txId, ui64 tableId, ui32 batch,
- bool finished, ui32 status) {
- Record.SetOrigin(origin);
- Record.SetTxInitiator(metaShard);
- Record.SetPlanStep(planStep);
- Record.SetTxId(txId);
- Record.SetTableId(tableId);
- Record.SetBatch(batch);
- Record.SetFinished(finished);
- Record.SetStatus(status);
- }
-
- TEvReadResult(const TEvReadResult& ev) {
- Record.CopyFrom(ev.Record);
- }
-
- std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const {
- const auto& scheme = Record.GetMeta().GetSchema();
- if (scheme.empty() || Record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) {
- return nullptr;
- }
- const auto arrowSchema = NArrow::DeserializeSchema(scheme);
- if (Record.GetData().empty()) {
- return NArrow::MakeEmptyBatch(arrowSchema);
- }
- return NArrow::DeserializeBatch(Record.GetData(), arrowSchema);
- }
-
- bool HasMore() const {
- return !Record.GetFinished();
- }
- };
-
using TEvScan = TEvDataShard::TEvKqpScan;
};
@@ -306,18 +253,10 @@ inline auto& Proto(TEvColumnShard::TEvWrite* ev) {
return ev->Record;
}
-inline auto& Proto(TEvColumnShard::TEvRead* ev) {
- return ev->Record;
-}
-
inline auto& Proto(TEvColumnShard::TEvWriteResult* ev) {
return ev->Record;
}
-inline auto& Proto(TEvColumnShard::TEvReadResult* ev) {
- return ev->Record;
-}
-
inline TMessageSeqNo SeqNoFromProto(const NKikimrTxColumnShard::TSchemaSeqNo& proto) {
return TMessageSeqNo(proto.GetGeneration(), proto.GetRound());
}
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
deleted file mode 100644
index ae4455099c2..00000000000
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-#include "engines/reader/description.h"
-
-#include <ydb/core/tx/columnshard/columnshard_impl.h>
-#include <ydb/core/tx/columnshard/columnshard_private_events.h>
-#include <ydb/core/tx/columnshard/columnshard_schema.h>
-#include <ydb/core/tx/columnshard/columnshard__read_base.h>
-#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
-
-namespace NKikimr::NColumnShard {
-
-namespace {
-
-template <typename T, typename U>
-std::vector<T> ProtoToVector(const U& cont) {
- return std::vector<T>(cont.begin(), cont.end());
-}
-
-}
-
-using namespace NTabletFlatExecutor;
-
-class TTxRead : public TTxReadBase {
-public:
- TTxRead(TColumnShard* self, TEvColumnShard::TEvRead::TPtr& ev)
- : TTxReadBase(self)
- , Ev(ev)
- , TabletTxNo(++Self->TabletTxCounter)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
- void Complete(const TActorContext& ctx) override;
- TTxType GetTxType() const override { return TXTYPE_READ; }
-
-private:
- TEvColumnShard::TEvRead::TPtr Ev;
- const ui32 TabletTxNo;
- std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
-
- TStringBuilder TxPrefix() const {
- return TStringBuilder() << "TxRead[" << ToString(TabletTxNo) << "] ";
- }
-
- TString TxSuffix() const {
- return TStringBuilder() << " at tablet " << Self->TabletID();
- }
-};
-
-
-bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
- Y_ABORT_UNLESS(Ev);
- Y_ABORT_UNLESS(Self->TablesManager.HasPrimaryIndex());
- Y_UNUSED(txc);
- LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix());
-
- txc.DB.NoMoreReadsForTx();
-
- auto& record = Proto(Ev->Get());
- ui64 metaShard = record.GetTxInitiator();
-
- NOlap::TReadDescription read(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()), false);
- read.PathId = record.GetTableId();
- read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
- read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds());
- read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames());
-
- const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(read.GetSnapshot());
- if (read.ColumnIds.empty() && read.ColumnNames.empty()) {
- auto allColumnNames = indexInfo.ArrowSchema()->field_names();
- read.ColumnNames.assign(allColumnNames.begin(), allColumnNames.end());
- }
-
- std::shared_ptr<NOlap::TPredicate> fromPredicate;
- std::shared_ptr<NOlap::TPredicate> toPredicate;
- if (record.HasGreaterPredicate()) {
- auto& proto = record.GetGreaterPredicate();
- auto schema = indexInfo.ArrowSchema(ProtoToVector<TString>(proto.GetColumnNames()));
- fromPredicate = std::make_shared<NOlap::TPredicate>(
- proto.GetInclusive() ? NArrow::EOperation::GreaterEqual : NArrow::EOperation::Greater, proto.GetRow(), schema);
- }
- if (record.HasLessPredicate()) {
- auto& proto = record.GetLessPredicate();
- auto schema = indexInfo.ArrowSchema(ProtoToVector<TString>(proto.GetColumnNames()));
- toPredicate = std::make_shared<NOlap::TPredicate>(
- proto.GetInclusive() ? NArrow::EOperation::LessEqual : NArrow::EOperation::Less, proto.GetRow(), schema);
- }
- Y_ABORT_UNLESS(read.PKRangesFilter.Add(fromPredicate, toPredicate, &indexInfo));
-
- bool parseResult = ParseProgram(record.GetOlapProgramType(), record.GetOlapProgram(), read,
- TIndexColumnResolver(indexInfo));
-
- std::shared_ptr<NOlap::TReadMetadata> metadata;
- if (parseResult) {
- metadata = PrepareReadMetadata(read, Self->InsertTable, Self->TablesManager.GetPrimaryIndex(),
- ErrorDescription, false);
- }
-
- ui32 status = NKikimrTxColumnShard::EResultStatus::ERROR;
-
- if (metadata) {
- ReadMetadata = metadata;
- status = NKikimrTxColumnShard::EResultStatus::SUCCESS;
- }
-
- Result = std::make_unique<TEvColumnShard::TEvReadResult>(
- Self->TabletID(), metaShard, read.GetSnapshot().GetPlanStep(), read.GetSnapshot().GetTxId(), read.PathId, 0, true, status);
-
- if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
- Self->IncCounter(COUNTER_READ_SUCCESS);
- } else {
- Self->IncCounter(COUNTER_READ_FAIL);
- }
- return true;
-}
-
-void TTxRead::Complete(const TActorContext& ctx) {
- Y_ABORT_UNLESS(Ev);
- Y_ABORT_UNLESS(Result);
-
- bool noData = !ReadMetadata || ReadMetadata->Empty();
- bool success = (Proto(Result.get()).GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS);
-
- if (!success) {
- LOG_S_DEBUG(TxPrefix() << "complete. Error " << ErrorDescription << " while reading" << TxSuffix());
- ctx.Send(Ev->Get()->GetSource(), Result.release());
- } else if (noData) {
- LOG_S_DEBUG(TxPrefix() << "complete. Empty result" << TxSuffix());
- ctx.Send(Ev->Get()->GetSource(), Result.release());
- } else {
- LOG_S_DEBUG(TxPrefix() << "complete" << TxSuffix() << " Metadata: " << *ReadMetadata);
-
- const ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(
- std::static_pointer_cast<const NOlap::TReadMetadataBase>(ReadMetadata));
- auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta();
-
- Self->IncCounter(COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions);
- Self->IncCounter(COUNTER_READ_INDEX_BLOBS, statsDelta.Blobs);
- Self->IncCounter(COUNTER_READ_INDEX_ROWS, statsDelta.Rows);
- Self->IncCounter(COUNTER_READ_INDEX_BYTES, statsDelta.Bytes);
-
- TInstant deadline = TInstant::Max(); // TODO
- ctx.Register(CreateReadActor(Self->TabletID(), Self->SelfId(), Ev->Get()->GetSource(),
- Self->GetStoragesManager(), std::move(Result), ReadMetadata, deadline, Self->SelfId(), requestCookie, Self->ReadCounters));
- }
-}
-
-
-void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) {
- LastAccessTime = TAppData::TimeProvider->Now();
-
- const auto* msg = ev->Get();
- NOlap::TSnapshot readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId());
- NOlap::TSnapshot maxReadVersion = GetMaxReadVersion();
- LOG_S_DEBUG("Read at tablet " << TabletID() << " version=" << readVersion << " readable=" << maxReadVersion);
-
- if (maxReadVersion < readVersion) {
- WaitingReads.emplace(readVersion, std::move(ev));
- WaitPlanStep(readVersion.GetPlanStep());
- return;
- }
-
- Execute(new TTxRead(this, ev), ctx);
-}
-
-}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 4098d2b00ca..658b7c67c4a 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -257,14 +257,6 @@ void TColumnShard::SendWaitPlanStep(ui64 step) {
void TColumnShard::RescheduleWaitingReads() {
ui64 minWaitingStep = Max<ui64>();
NOlap::TSnapshot maxReadVersion = GetMaxReadVersion();
- for (auto it = WaitingReads.begin(); it != WaitingReads.end();) {
- if (maxReadVersion < it->first) {
- minWaitingStep = Min(minWaitingStep, it->first.GetPlanStep());
- break;
- }
- TActivationContext::Send(it->second.Release());
- it = WaitingReads.erase(it);
- }
for (auto it = WaitingScans.begin(); it != WaitingScans.end();) {
if (maxReadVersion < it->first) {
minWaitingStep = Min(minWaitingStep, it->first.GetPlanStep());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 10d3e177e8e..65148356043 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -55,13 +55,6 @@ class TOperationsManager;
extern bool gAllowLogBatchingDefaultValue;
IActor* CreateWriteActor(ui64 tabletId, IWriteController::TPtr writeController, const TInstant deadline);
-IActor* CreateReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor,
- const TActorId& dstActor, const std::shared_ptr<NOlap::IStoragesManager>& storages,
- std::unique_ptr<TEvColumnShard::TEvReadResult>&& event,
- NOlap::TReadMetadata::TConstPtr readMetadata,
- const TInstant& deadline,
- const TActorId& columnShardActorId,
- ui64 requestCookie, const TConcreteScanCounters& counters);
IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
struct TSettings {
@@ -151,7 +144,6 @@ class TColumnShard
void Handle(TEvColumnShard::TEvNotifyTxCompletion::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx);
void Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx);
void Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext& ctx);
void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx);
@@ -264,7 +256,6 @@ protected:
HFunc(TEvColumnShard::TEvScan, Handle);
HFunc(TEvTxProcessing::TEvPlanStep, Handle);
HFunc(TEvColumnShard::TEvWrite, Handle);
- HFunc(TEvColumnShard::TEvRead, Handle);
HFunc(TEvPrivate::TEvWriteBlobsResult, Handle);
HFunc(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
HFunc(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
@@ -428,7 +419,6 @@ private:
THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites;
using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>;
THashMap<TULID, TPartsForLTXShard> LongTxWritesByUniqueId;
- TMultiMap<NOlap::TSnapshot, TEvColumnShard::TEvRead::TPtr> WaitingReads;
TMultiMap<NOlap::TSnapshot, TEvColumnShard::TEvScan::TPtr> WaitingScans;
TBackgroundController BackgroundController;
TSettings Settings;
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
deleted file mode 100644
index 475c2a34267..00000000000
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ /dev/null
@@ -1,238 +0,0 @@
-#include <ydb/core/tx/columnshard/columnshard_impl.h>
-#include <ydb/core/tx/columnshard/blob_cache.h>
-#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
-#include <ydb/core/tx/columnshard/blobs_reader/events.h>
-#include <ydb/core/tx/conveyor/usage/events.h>
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-#include "blobs_reader/actor.h"
-#include "engines/reader/read_context.h"
-#include "resource_subscriber/actor.h"
-#include "blobs_reader/read_coordinator.h"
-
-namespace NKikimr::NColumnShard {
-namespace {
-
-class TReadActor : public TActorBootstrapped<TReadActor> {
-private:
- void BuildResult(const TActorContext& ctx) {
- auto ready = IndexedData->ExtractReadyResults(Max<i64>());
- LOG_S_TRACE("Ready results with " << ready.size() << " batches at tablet " << TabletId << " (read)");
- if (ready.empty()) {
- if (IndexedData->IsFinished()) {
- SendResult(ctx, nullptr, true);
- }
- } else {
- size_t next = 1;
- for (auto it = ready.begin(); it != ready.end(); ++it, ++next) {
- const bool lastOne = IndexedData->IsFinished() && (next == ready.size());
- SendResult(ctx, it->GetResultBatchPtrVerified(), lastOne);
- }
- }
- }
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::TX_COLUMNSHARD_READ_ACTOR;
- }
-
- TReadActor(ui64 tabletId, const NActors::TActorId readBlobsActor,
- const std::shared_ptr<NOlap::IStoragesManager>& storages,
- const TActorId& dstActor,
- std::unique_ptr<TEvColumnShard::TEvReadResult>&& event,
- NOlap::TReadMetadata::TConstPtr readMetadata,
- const TInstant& deadline,
- const TActorId& columnShardActorId,
- ui64 requestCookie, const TConcreteScanCounters& counters)
- : TabletId(tabletId)
- , ReadBlobsActor(readBlobsActor)
- , Storages(storages)
- , DstActor(dstActor)
- , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
- , Result(std::move(event))
- , ReadMetadata(readMetadata)
- , Deadline(deadline)
- , ColumnShardActorId(columnShardActorId)
- , RequestCookie(requestCookie)
- , ReturnedBatchNo(0)
- , Counters(counters)
- {}
-
- void Handle(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev, const TActorContext& ctx) {
- if (ev->Get()->GetErrorMessage()) {
- ACFL_DEBUG("event", "TEvTaskProcessedResult")("error", ev->Get()->GetErrorMessage());
- SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::ERROR);
- return DieFinished(ctx);
- } else {
- ACFL_DEBUG("event", "TEvTaskProcessedResult");
- auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult());
- Y_DEBUG_ABORT_UNLESS(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()));
- if (!IndexedData->IsFinished()) {
- Y_ABORT_UNLESS(t->Apply(*IndexedData));
- }
- BuildResult(ctx);
- if (IndexedData->IsFinished()) {
- DieFinished(ctx);
- }
- }
- }
-
- void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
- Y_UNUSED(ev);
- LOG_S_INFO("TEvWakeup: read timeout at tablet " << TabletId << " (read)");
-
- SendTimeouts(ctx);
- DieFinished(ctx);
- }
-
- void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) {
- Y_ABORT_UNLESS(status != NKikimrTxColumnShard::EResultStatus::SUCCESS);
- SendResult(ctx, {}, true, status);
- IndexedData->Abort();
- }
-
- void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false,
- NKikimrTxColumnShard::EResultStatus status = NKikimrTxColumnShard::EResultStatus::SUCCESS) {
- auto chunkEvent = std::make_unique<TEvColumnShard::TEvReadResult>(*Result);
- auto& proto = Proto(chunkEvent.get());
-
- TString data;
- if (batch && batch->num_rows()) {
- data = NArrow::SerializeBatchNoCompression(batch);
-
- auto metadata = proto.MutableMeta();
- metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW);
- metadata->SetSchema(GetSerializedSchema(batch));
- if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
- Y_ABORT_UNLESS(!data.empty());
- }
- }
-
- proto.SetBatch(ReturnedBatchNo);
- proto.SetData(data);
- proto.SetStatus(status);
- proto.SetFinished(finished);
- ++ReturnedBatchNo;
-
- if (finished) {
- auto stats = ReadMetadata->ReadStats;
- auto* protoStats = proto.MutableMeta()->MutableReadStats();
- protoStats->SetBeginTimestamp(stats->BeginTimestamp.MicroSeconds());
- protoStats->SetDurationUsec(stats->Duration().MicroSeconds());
- protoStats->SetSelectedIndex(stats->SelectedIndex);
- protoStats->SetIndexGranules(stats->IndexGranules);
- protoStats->SetIndexPortions(stats->IndexPortions);
- protoStats->SetIndexBatches(stats->IndexBatches);
- protoStats->SetNotIndexedBatches(stats->CommittedBatches);
- protoStats->SetSchemaColumns(stats->SchemaColumns);
- protoStats->SetFilterColumns(stats->FilterColumns);
- protoStats->SetAdditionalColumns(stats->AdditionalColumns);
- protoStats->SetDataFilterBytes(stats->DataFilterBytes);
- protoStats->SetDataAdditionalBytes(stats->DataAdditionalBytes);
- protoStats->SetCompactedPortionsBytes(stats->CompactedPortionsBytes);
- protoStats->SetInsertedPortionsBytes(stats->InsertedPortionsBytes);
- protoStats->SetCommittedPortionsBytes(stats->CommittedPortionsBytes);
- protoStats->SetSelectedRows(stats->SelectedRows);
- }
-
- if (Deadline != TInstant::Max()) {
- TInstant now = TAppData::TimeProvider->Now();
- if (Deadline <= now) {
- proto.SetStatus(NKikimrTxColumnShard::EResultStatus::TIMEOUT);
- }
- }
-
- ctx.Send(DstActor, chunkEvent.release());
- }
-
- void DieFinished(const TActorContext& ctx) {
- if (IndexedData->IsFinished()) {
- LOG_S_DEBUG("Finished read (with " << ReturnedBatchNo << " batches sent) at tablet " << TabletId);
- Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie));
- Die(ctx);
- }
- }
-
- virtual void PassAway() override {
- Send(ResourceSubscribeActorId, new TEvents::TEvPoisonPill);
- IActor::PassAway();
- }
-
- void Bootstrap(const TActorContext& ctx) {
- ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId()));
- ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
- IndexedData = ReadMetadata->BuildReader(std::make_shared<NOlap::TReadContext>(Storages, Counters, true, ReadMetadata, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, NOlap::TComputeShardingPolicy()));
- LOG_S_DEBUG("Starting read (" << IndexedData->DebugString(false) << ") at tablet " << TabletId);
-
- bool earlyExit = false;
- if (Deadline != TInstant::Max()) {
- TInstant now = TAppData::TimeProvider->Now();
- if (Deadline <= now) {
- earlyExit = true;
- } else {
- const TDuration timeout = Deadline - now;
- ctx.Schedule(timeout, new TEvents::TEvWakeup());
- }
- }
-
- if (earlyExit) {
- SendTimeouts(ctx);
- ctx.Send(SelfId(), new TEvents::TEvPoisonPill());
- } else {
- while (IndexedData->ReadNextInterval()) {
- }
- BuildResult(ctx);
- }
-
- Become(&TThis::StateWait);
- }
-
- void SendTimeouts(const TActorContext& ctx) {
- SendErrorResult(ctx, NKikimrTxColumnShard::EResultStatus::TIMEOUT);
- }
-
- STFUNC(StateWait) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, Handle);
- HFunc(TEvents::TEvWakeup, Handle);
- default:
- break;
- }
- }
-
-private:
- ui64 TabletId;
- TActorId ReadBlobsActor;
- std::shared_ptr<NOlap::IStoragesManager> Storages;
- TActorId DstActor;
- TActorId BlobCacheActorId;
- std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
- TActorId ResourceSubscribeActorId;
- TActorId ReadCoordinatorActorId;
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
- std::shared_ptr<NOlap::IDataReader> IndexedData;
- TInstant Deadline;
- TActorId ColumnShardActorId;
- const ui64 RequestCookie;
- ui32 ReturnedBatchNo;
- const TConcreteScanCounters Counters;
- mutable TString SerializedSchema;
-
- TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const {
- return NArrow::SerializeSchema(*batch->schema());
- }
-};
-
-} // namespace
-
-IActor* CreateReadActor(ui64 tabletId,
- const NActors::TActorId readBlobsActor, const TActorId& dstActor, const std::shared_ptr<NOlap::IStoragesManager>& storages,
- std::unique_ptr<TEvColumnShard::TEvReadResult>&& event,
- NOlap::TReadMetadata::TConstPtr readMetadata,
- const TInstant& deadline,
- const TActorId& columnShardActorId,
- ui64 requestCookie, const TConcreteScanCounters& counters)
-{
- return new TReadActor(tabletId, readBlobsActor, storages, dstActor, std::move(event), readMetadata,
- deadline, columnShardActorId, requestCookie, counters);
-}
-
-}
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index 22177da5be3..04443739ee0 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -11,7 +11,6 @@ SRCS(
columnshard__progress_tx.cpp
columnshard__propose_cancel.cpp
columnshard__propose_transaction.cpp
- columnshard__read.cpp
columnshard__read_base.cpp
columnshard__scan.cpp
columnshard__index_scan.cpp
@@ -26,7 +25,6 @@ SRCS(
columnshard_view.cpp
counters.cpp
defs.cpp
- read_actor.cpp
write_actor.cpp
tables_manager.cpp
tx_controller.cpp
diff --git a/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto b/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
index 581ec581bdb..7f920b44af3 100644
--- a/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
+++ b/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
@@ -10,6 +10,5 @@ service LongTxService {
rpc CommitTx(CommitTransactionRequest) returns (CommitTransactionResponse);
rpc RollbackTx(RollbackTransactionRequest) returns (RollbackTransactionResponse);
rpc Write(WriteRequest) returns (WriteResponse);
- rpc Read(ReadRequest) returns (ReadResponse);
// rpc ResolveNodes(ResolveNodesRequest) returns (stream ResolveNodesResponse);
}
diff --git a/ydb/public/api/protos/draft/ydb_long_tx.proto b/ydb/public/api/protos/draft/ydb_long_tx.proto
index fd22ec6b13f..d5b658595fd 100644
--- a/ydb/public/api/protos/draft/ydb_long_tx.proto
+++ b/ydb/public/api/protos/draft/ydb_long_tx.proto
@@ -87,24 +87,3 @@ message WriteResult {
message WriteResponse {
Ydb.Operations.Operation operation = 1;
}
-
-message ReadRequest {
- Ydb.Operations.OperationParams operation_params = 1;
- string tx_id = 2;
- string path = 3;
- oneof query {
- string sql = 11;
- }
-}
-
-message ReadResult {
- string tx_id = 1;
- string path = 2;
- uint64 chunk = 3;
- bool finished = 4;
- Data data = 5;
-}
-
-message ReadResponse {
- Ydb.Operations.Operation operation = 1;
-}
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp
index 8368fb84971..06ef07b1113 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.cpp
@@ -77,19 +77,6 @@ public:
TRpcRequestSettings::Make(settings));
}
- TAsyncReadResult Read(const TString& txId, const TString& table,
- const TOpSettings& settings = TOpSettings()) {
- auto request = MakeOperationRequest<Ydb::LongTx::ReadRequest>(settings);
- request.set_tx_id(txId);
- request.set_path(table);
- // TODO: query
-
- return RunOperation<Ydb::LongTx::V1::LongTxService,
- Ydb::LongTx::ReadRequest, Ydb::LongTx::ReadResponse, TLongTxReadResult>(
- std::move(request),
- &Ydb::LongTx::V1::LongTxService::Stub::AsyncRead,
- TRpcRequestSettings::Make(settings));
- }
};
TClient::TClient(const TDriver& driver, const TClientSettings& settings)
@@ -100,10 +87,6 @@ TClient::TAsyncBeginTxResult TClient::BeginWriteTx() {
return Impl_->BeginTx(Ydb::LongTx::BeginTransactionRequest::WRITE);
}
-TClient::TAsyncBeginTxResult TClient::BeginReadTx() {
- return Impl_->BeginTx(Ydb::LongTx::BeginTransactionRequest::READ);
-}
-
TClient::TAsyncCommitTxResult TClient::CommitTx(const TString& txId) {
return Impl_->CommitTx(txId);
}
@@ -117,9 +100,6 @@ TClient::TAsyncWriteResult TClient::Write(const TString& txId, const TString& ta
return Impl_->Write(txId, table, dedupId, data, format);
}
-TClient::TAsyncReadResult TClient::Read(const TString& txId, const TString& table) {
- return Impl_->Read(txId, table);
-}
} // namespace NLongTx
} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h
index 978a14527e8..e749d0348ab 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_long_tx.h
@@ -75,23 +75,6 @@ public:
}
};
-class TLongTxReadResult : public TOperation {
-public:
- explicit TLongTxReadResult(TStatus&& status)
- : TOperation(std::move(status))
- {}
-
- TLongTxReadResult(TStatus&& status, Ydb::Operations::Operation&& operation)
- : TOperation(std::move(status), std::move(operation))
- {}
-
- Ydb::LongTx::ReadResult GetResult() {
- Ydb::LongTx::ReadResult result;
- GetProto().result().UnpackTo(&result);
- return result;
- }
-};
-
struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> {
using TSelf = TClientSettings;
};
@@ -102,17 +85,14 @@ public:
using TAsyncCommitTxResult = NThreading::TFuture<TLongTxCommitResult>;
using TAsyncRollbackTxResult = NThreading::TFuture<TLongTxRollbackResult>;
using TAsyncWriteResult = NThreading::TFuture<TLongTxWriteResult>;
- using TAsyncReadResult = NThreading::TFuture<TLongTxReadResult>;
TClient(const TDriver& driver, const TClientSettings& settings = TClientSettings());
TAsyncBeginTxResult BeginWriteTx();
- TAsyncBeginTxResult BeginReadTx();
TAsyncCommitTxResult CommitTx(const TString& txId);
TAsyncRollbackTxResult RollbackTx(const TString& txId);
TAsyncWriteResult Write(const TString& txId, const TString& table, const TString& dedupId,
const TString& data, Ydb::LongTx::Data::Format format);
- TAsyncReadResult Read(const TString& txId, const TString& table);
private:
class TImpl;
diff --git a/ydb/services/ydb/ydb_long_tx.cpp b/ydb/services/ydb/ydb_long_tx.cpp
index 2656d22471c..643d95b858a 100644
--- a/ydb/services/ydb/ydb_long_tx.cpp
+++ b/ydb/services/ydb/ydb_long_tx.cpp
@@ -28,7 +28,6 @@ void TGRpcYdbLongTxService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
ADD_REQUEST(CommitTx, CommitTransaction, DoLongTxCommitRPC)
ADD_REQUEST(RollbackTx, RollbackTransaction, DoLongTxRollbackRPC)
ADD_REQUEST(Write, Write, DoLongTxWriteRPC)
- ADD_REQUEST(Read, Read, DoLongTxReadRPC)
#undef ADD_REQUEST
}
diff --git a/ydb/services/ydb/ydb_long_tx_ut.cpp b/ydb/services/ydb/ydb_long_tx_ut.cpp
index b46b933a12a..ac1015b491c 100644
--- a/ydb/services/ydb/ydb_long_tx_ut.cpp
+++ b/ydb/services/ydb/ydb_long_tx_ut.cpp
@@ -21,90 +21,6 @@ TString TestBlob() {
return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch);
}
-TVector<std::shared_ptr<arrow::RecordBatch>> SplitData(const TString& data, ui32 numBatches) {
- auto batch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data);
- UNIT_ASSERT(batch.ok());
-
- NSharding::TLogsSharding sharding(numBatches, { "timestamp", "uid" }, numBatches);
- std::vector<ui32> rowSharding = sharding.MakeSharding(*batch);
- Y_ABORT_UNLESS(rowSharding.size() == (size_t)batch->get()->num_rows());
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> sharded = NArrow::ShardingSplit(*batch, rowSharding, numBatches);
- Y_ABORT_UNLESS(sharded.size() == numBatches);
-
- TVector<std::shared_ptr<arrow::RecordBatch>> out;
- for (size_t i = 0; i < numBatches; ++i) {
- if (sharded[i]) {
- Y_ABORT_UNLESS(sharded[i]->ValidateFull().ok());
- out.emplace_back(sharded[i]);
- }
- }
-
- return out;
-}
-
-bool EqualBatches(const TString& x, const TString& y) {
- auto schema = TTestOlap::ArrowSchema();
- std::shared_ptr<arrow::RecordBatch> batchX = NArrow::DeserializeBatch(x, schema);
- std::shared_ptr<arrow::RecordBatch> batchY = NArrow::DeserializeBatch(y, schema);
- Y_ABORT_UNLESS(batchX && batchY);
- if ((batchX->num_columns() != batchY->num_columns()) ||
- (batchX->num_rows() != batchY->num_rows())) {
- Cerr << __FILE__ << ':' << __LINE__ << " "
- << batchX->num_columns() << ':' << batchX->num_rows() << " vs "
- << batchY->num_columns() << ':' << batchY->num_rows() << "\n";
- return false;
- }
-
- for (auto& column : schema->field_names()) {
- auto filedX = batchX->schema()->GetFieldByName(column);
- auto filedY = batchY->schema()->GetFieldByName(column);
- Y_ABORT_UNLESS(filedX->type()->id() == filedY->type()->id());
-
- auto arrX = batchX->GetColumnByName(column);
- auto arrY = batchY->GetColumnByName(column);
-
- switch (filedX->type()->id()) {
- case arrow::Type::INT32:
- if (!NArrow::ArrayEqualValue<arrow::Int32Array>(arrX, arrY)) {
- Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n";
- return false;
- }
- break;
- case arrow::Type::UINT64:
- if (!NArrow::ArrayEqualValue<arrow::UInt64Array>(arrX, arrY)) {
- Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n";
- return false;
- }
- break;
- case arrow::Type::TIMESTAMP:
- if (!NArrow::ArrayEqualValue<arrow::TimestampArray>(arrX, arrY)) {
- Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n";
- return false;
- }
- break;
- case arrow::Type::BINARY:
- if (!NArrow::ArrayEqualView<arrow::BinaryArray>(arrX, arrY)) {
- Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n";
- return false;
- }
- break;
- case arrow::Type::STRING:
- if (!NArrow::ArrayEqualView<arrow::StringArray>(arrX, arrY)) {
- Cerr << __FILE__ << ':' << __LINE__ << " " << column << "\n";
- return false;
- }
- break;
-
- default:
- Cerr << __FILE__ << ':' << __LINE__ << "\n";
- return false;
- }
- }
-
- return true;
-}
-
}
@@ -164,166 +80,6 @@ Y_UNIT_TEST_SUITE(YdbLongTx) {
UNIT_ASSERT_VALUES_EQUAL(resRollbackTx.Status().GetStatus(), EStatus::SUCCESS);
}
- Y_UNIT_TEST(BeginRead) {
- NKikimrConfig::TAppConfig appConfig;
- TKikimrWithGrpcAndRootSchema server(appConfig);
- server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
-
- ui16 grpc = server.GetPort();
- TString location = TStringBuilder() << "localhost:" << grpc;
- auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location));
-
- TTestOlap::CreateTable(*server.ServerSettings);
-
- NYdb::NLongTx::TClient client(connection);
-
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- auto txId = resBeginTx.GetResult().tx_id();
-
- NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), "");
- }
-
- Y_UNIT_TEST(WriteThenRead) {
- NKikimrConfig::TAppConfig appConfig;
- TKikimrWithGrpcAndRootSchema server(appConfig);
- server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
-
- ui16 grpc = server.GetPort();
- TString location = TStringBuilder() << "localhost:" << grpc;
- auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location));
-
- TTestOlap::CreateTable(*server.ServerSettings);
-
- NYdb::NLongTx::TClient client(connection);
-
- // Read before write
- TString beforeWriteTxId;
- {
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- beforeWriteTxId = resBeginTx.GetResult().tx_id();
-
- NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), "");
- }
-
- // Write
- auto batch = TTestOlap::SampleBatch(false, 10000);
- const TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch);
-
- {
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- auto txId = resBeginTx.GetResult().tx_id();
-
- NLongTx::TLongTxWriteResult resWrite =
- client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS);
-
- NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS);
- }
-
- // Read after write
- auto sharded = SplitData(data, 2);
- UNIT_ASSERT_VALUES_EQUAL(sharded.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sharded[0]->num_rows(), 4990);
- UNIT_ASSERT_VALUES_EQUAL(sharded[1]->num_rows(), 5010);
-
- TVector<TString> expected;
- for (auto batch : sharded) {
- expected.push_back(NArrow::SerializeBatchNoCompression(batch));
- }
-
- TVector<TString> returned;
- {
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- auto txId = resBeginTx.GetResult().tx_id();
-
- NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS);
- returned.push_back(resRead.GetResult().data().data());
- // TODO: read both
-
- UNIT_ASSERT(EqualBatches(expected[0], returned[0]) ||
- EqualBatches(expected[1], returned[0]));
- }
-
- // Read before write again
- {
- NLongTx::TLongTxReadResult resRead = client.Read(beforeWriteTxId, TestTablePath).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(resRead.GetResult().data().data(), "");
- }
- }
-
- Y_UNIT_TEST(ReadFutureSnapshot) {
- NKikimrConfig::TAppConfig appConfig;
- TKikimrWithGrpcAndRootSchema server(appConfig);
- server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
-
- ui16 grpc = server.GetPort();
- TString location = TStringBuilder() << "localhost:" << grpc;
- auto connection = NYdb::TDriver(TDriverConfig().SetEndpoint(location));
-
- TTestOlap::CreateTable(*server.ServerSettings, 1);
-
- NYdb::NLongTx::TClient client(connection);
-
- TString futureTxId;
- NYdb::NLongTx::TClient::TAsyncReadResult futureRead;
- {
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- TString txId = resBeginTx.GetResult().tx_id();
- NLongTxService::TLongTxId parsed;
- Y_ABORT_UNLESS(parsed.ParseString(txId));
- Y_ABORT_UNLESS(parsed.Snapshot.Step > 0);
- parsed.Snapshot.Step += 2000; // 2 seconds in the future
- futureTxId = parsed.ToString();
- // Cerr << "Future txId " << futureTxId << Endl;
-
- futureRead = client.Read(futureTxId, TestTablePath);
- }
-
- // Write
- TString data = TestBlob();
- {
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- auto txId = resBeginTx.GetResult().tx_id();
-
- NLongTx::TLongTxWriteResult resWrite =
- client.Write(txId, TestTablePath, "0", data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resWrite.Status().GetStatus(), EStatus::SUCCESS);
-
- NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resCommitTx.Status().GetStatus(), EStatus::SUCCESS);
- }
-
- // Await read
- {
- NLongTx::TLongTxReadResult resRead = futureRead.GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS);
-
- auto inputBatch = NArrow::NSerialization::TFullDataDeserializer().Deserialize(data);
- UNIT_ASSERT(inputBatch.ok());
- auto readBatch = NArrow::NSerialization::TBatchPayloadDeserializer(inputBatch->get()->schema()).Deserialize(resRead.GetResult().data().data());
- UNIT_ASSERT(readBatch.ok());
- UNIT_ASSERT_VALUES_EQUAL(readBatch->get()->ToString(), inputBatch->get()->ToString());
- }
- }
Y_UNIT_TEST(WriteAclChecks) {
NKikimrConfig::TAppConfig appConfig;
@@ -388,59 +144,6 @@ Y_UNIT_TEST_SUITE(YdbLongTx) {
}
}
- Y_UNIT_TEST(ReadAclChecks) {
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableDomainsConfig()->MutableSecurityConfig()->SetEnforceUserTokenRequirement(true);
- TKikimrWithGrpcAndRootSchema server(appConfig);
- server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
-
- ui16 grpc = server.GetPort();
- TString location = TStringBuilder() << "localhost:" << grpc;
- auto connection1 = NYdb::TDriver(TDriverConfig()
- .SetEndpoint(location)
- .SetDatabase("/Root")
- .SetAuthToken("user1@builtin"));
- auto connection2 = NYdb::TDriver(TDriverConfig()
- .SetEndpoint(location)
- .SetDatabase("/Root")
- .SetAuthToken("user2@builtin"));
-
- TTestOlap::CreateTable(*server.ServerSettings);
- {
- TClient annoyingClient(*server.ServerSettings);
- annoyingClient.SetSecurityToken("root@builtin");
- NACLib::TDiffACL diff;
- diff.AddAccess(NACLib::EAccessType::Allow, NACLib::SelectRow, "user1@builtin");
- annoyingClient.ModifyACL("/Root/OlapStore", "OlapTable", diff.SerializeAsString());
- }
-
- // try user1 first
- {
- NYdb::NLongTx::TClient client(connection1);
-
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- auto txId = resBeginTx.GetResult().tx_id();
-
- NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::SUCCESS);
- }
-
- // try user2 next
- {
- NYdb::NLongTx::TClient client(connection2);
-
- NLongTx::TLongTxBeginResult resBeginTx = client.BeginReadTx().GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resBeginTx.Status().GetStatus(), EStatus::SUCCESS);
-
- auto txId = resBeginTx.GetResult().tx_id();
-
- NLongTx::TLongTxReadResult resRead = client.Read(txId, TestTablePath).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(resRead.Status().GetStatus(), EStatus::UNAUTHORIZED);
- }
- }
-
Y_UNIT_TEST(CreateOlapWithDirs) {
NKikimrConfig::TAppConfig appConfig;
TKikimrWithGrpcAndRootSchema server(appConfig);