aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormakslitskevic <makslitskevic@yandex-team.ru>2022-02-10 16:52:26 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:52:26 +0300
commit5a74c2ac5f4c5d964faa540d40c987bbfe33d5bc (patch)
tree1ea638621502af556b37d9679fcd7bdc093a984b
parent3718fefbda804e07df6e62b770737577e10b957a (diff)
downloadydb-5a74c2ac5f4c5d964faa540d40c987bbfe33d5bc.tar.gz
Restoring authorship annotation for <makslitskevic@yandex-team.ru>. Commit 1 of 2.
-rw-r--r--ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp4
-rw-r--r--ydb/core/blobstorage/testload/test_load_actor.cpp28
-rw-r--r--ydb/core/blobstorage/testload/test_load_actor.h6
-rw-r--r--ydb/core/blobstorage/testload/test_load_kqp.cpp340
-rw-r--r--ydb/core/blobstorage/testload/ya.make2
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp1032
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/common.h78
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/patch.cpp16
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h136
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp64
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp56
-rw-r--r--ydb/core/protos/blobstorage.proto30
12 files changed, 896 insertions, 896 deletions
diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
index 200b85f616..5e67fe114b 100644
--- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
+++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
@@ -180,8 +180,8 @@ private:
////////////////////////////////////////////////////////////////////////
// FORWARD SECTOR
////////////////////////////////////////////////////////////////////////
-
-
+
+
template <typename P, typename T, typename R>
void HandleRequest(P &ev, const TActorContext &ctx) {
const auto& record = ev->Get()->Record;
diff --git a/ydb/core/blobstorage/testload/test_load_actor.cpp b/ydb/core/blobstorage/testload/test_load_actor.cpp
index c8788b1310..1ddd8b1082 100644
--- a/ydb/core/blobstorage/testload/test_load_actor.cpp
+++ b/ydb/core/blobstorage/testload/test_load_actor.cpp
@@ -187,26 +187,26 @@ public:
if (LoadActors.count(tag) != 0) {
ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
}
-
+
LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new load actor with tag# " << tag);
LoadActors.emplace(tag, ctx.Register(CreateKeyValueWriterTestLoad(
cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag)));
break;
}
- case NKikimrBlobStorage::TEvTestLoadRequest::CommandCase::kKqpLoadStart: {
- const auto& cmd = record.GetKqpLoadStart();
- const ui64 tag = GetOrGenerateTag(cmd);
- if (LoadActors.count(tag) != 0) {
- ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
- }
-
- LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new Kqp load actor with tag# " << tag);
- LoadActors.emplace(tag, ctx.Register(CreateKqpWriterTestLoad(
- cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag)));
- break;
- }
-
+ case NKikimrBlobStorage::TEvTestLoadRequest::CommandCase::kKqpLoadStart: {
+ const auto& cmd = record.GetKqpLoadStart();
+ const ui64 tag = GetOrGenerateTag(cmd);
+ if (LoadActors.count(tag) != 0) {
+ ythrow TLoadActorException() << Sprintf("duplicate load actor with Tag# %" PRIu64, tag);
+ }
+
+ LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Create new Kqp load actor with tag# " << tag);
+ LoadActors.emplace(tag, ctx.Register(CreateKqpWriterTestLoad(
+ cmd, ctx.SelfID, GetServiceCounters(Counters, "load_actor"), 0, tag)));
+ break;
+ }
+
case NKikimrBlobStorage::TEvTestLoadRequest::CommandCase::kMemoryLoadStart: {
const auto& cmd = record.GetMemoryLoadStart();
const ui64 tag = GetOrGenerateTag(cmd);
diff --git a/ydb/core/blobstorage/testload/test_load_actor.h b/ydb/core/blobstorage/testload/test_load_actor.h
index 38b7fb3d30..f651bf3915 100644
--- a/ydb/core/blobstorage/testload/test_load_actor.h
+++ b/ydb/core/blobstorage/testload/test_load_actor.h
@@ -52,9 +52,9 @@ namespace NKikimr {
const NActors::TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
ui64 index, ui64 tag);
- NActors::IActor *CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
- const NActors::TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
- ui64 index, ui64 tag);
+ NActors::IActor *CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
+ const NActors::TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
+ ui64 index, ui64 tag);
NActors::IActor *CreateMemoryTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TMemoryLoadStart& cmd,
const NActors::TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters,
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp
index fec4fd7b2c..12ba761fab 100644
--- a/ydb/core/blobstorage/testload/test_load_kqp.cpp
+++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp
@@ -1,5 +1,5 @@
-#include "test_load_actor.h"
-
+#include "test_load_actor.h"
+
#include <ydb/core/base/counters.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h>
@@ -12,105 +12,105 @@
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
-
+
#include <ydb/library/workload/workload_factory.h>
#include <ydb/library/workload/stock_workload.h>
#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
-
-#include <library/cpp/monlib/service/pages/templates.h>
-
-#include <util/generic/queue.h>
-#include <util/random/fast.h>
-#include <util/random/shuffle.h>
-
-
-namespace NKikimr {
-
-class TEvKqpWriterTestLoadActor;
-
-class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> {
- struct TRequestInfo {
- ui32 Size;
- TInstant LogStartTime;
- };
-
- struct TRequestStat {
- ui64 BytesWrittenTotal;
- ui32 Size;
- TDuration Latency;
- };
-
- struct TLogWriteCookie {
- ui32 WorkerIdx;
- TInstant SentTime;
- ui64 Size;
- };
-
- ui64 Key;
- ui64 FirstKey;
- ui64 TotalRowsToUpsert;
- ui64 MaxInFlight;
- ui64 UniformPartitionsCount;
- ui32 NumOfSessions;
- TActorId Pipe;
- std::vector<TString> TxId;
-
- const TActorId Parent;
- ui64 Tag;
- ui32 DurationSeconds;
- ui32 StringValueSize;
- bool SequentialWrite;
- TString StringValue;
- TString WorkingDir;
+
+#include <library/cpp/monlib/service/pages/templates.h>
+
+#include <util/generic/queue.h>
+#include <util/random/fast.h>
+#include <util/random/shuffle.h>
+
+
+namespace NKikimr {
+
+class TEvKqpWriterTestLoadActor;
+
+class TKqpWriterTestLoadActor : public TActorBootstrapped<TKqpWriterTestLoadActor> {
+ struct TRequestInfo {
+ ui32 Size;
+ TInstant LogStartTime;
+ };
+
+ struct TRequestStat {
+ ui64 BytesWrittenTotal;
+ ui32 Size;
+ TDuration Latency;
+ };
+
+ struct TLogWriteCookie {
+ ui32 WorkerIdx;
+ TInstant SentTime;
+ ui64 Size;
+ };
+
+ ui64 Key;
+ ui64 FirstKey;
+ ui64 TotalRowsToUpsert;
+ ui64 MaxInFlight;
+ ui64 UniformPartitionsCount;
+ ui32 NumOfSessions;
+ TActorId Pipe;
+ std::vector<TString> TxId;
+
+ const TActorId Parent;
+ ui64 Tag;
+ ui32 DurationSeconds;
+ ui32 StringValueSize;
+ bool SequentialWrite;
+ TString StringValue;
+ TString WorkingDir;
size_t ProductCount;
size_t Quantity;
- bool DeleteTableOnFinish;
- std::vector<TString> preparedQuery;
+ bool DeleteTableOnFinish;
+ std::vector<TString> preparedQuery;
std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen;
-
- TReallyFastRng32 Rng;
-
- std::unordered_map<TString, std::queue<TInstant>> SentTime;
-
- // Monitoring
- TIntrusivePtr<NMonitoring::TDynamicCounters> LoadCounters;
- NMonitoring::TDynamicCounters::TCounterPtr Transactions;
- NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;
- TInstant TestStartTime;
-
- TMap<ui64, TLogWriteCookie> InFlightWrites;
- NMonitoring::TPercentileTrackerLg<6, 5, 15> ResponseTimes;
- std::vector<TString> Sessions;
- TString PreparedSelectQuery;
-
-public:
+
+ TReallyFastRng32 Rng;
+
+ std::unordered_map<TString, std::queue<TInstant>> SentTime;
+
+ // Monitoring
+ TIntrusivePtr<NMonitoring::TDynamicCounters> LoadCounters;
+ NMonitoring::TDynamicCounters::TCounterPtr Transactions;
+ NMonitoring::TDynamicCounters::TCounterPtr TransactionsBytesWritten;
+ TInstant TestStartTime;
+
+ TMap<ui64, TLogWriteCookie> InFlightWrites;
+ NMonitoring::TPercentileTrackerLg<6, 5, 15> ResponseTimes;
+ std::vector<TString> Sessions;
+ TString PreparedSelectQuery;
+
+public:
static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::KQP_TEST_WORKLOAD;
- }
-
- TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
- const TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag)
- : Parent(parent)
- , Tag(tag)
- , Rng(Now().GetValue())
- {
- Y_UNUSED(index);
- VERIFY_PARAM(DurationSeconds);
- DurationSeconds = cmd.GetDurationSeconds();
- NumOfSessions = cmd.GetNumOfSessions();
- MaxInFlight = cmd.GetMaxInFlight();
- Key = cmd.GetFirstKey();
- FirstKey = Key;
- TotalRowsToUpsert = cmd.GetTotalRowsToUpsert();
- StringValueSize = cmd.GetStringValueSize();
- DeleteTableOnFinish = cmd.GetDeleteTableOnFinish();
- UniformPartitionsCount = cmd.GetUniformPartitionsCount();
- SequentialWrite = cmd.GetSequentialWrite();
- StringValue = TString(StringValueSize, 'a');
- WorkingDir = cmd.GetWorkingDir();
+ }
+
+ TKqpWriterTestLoadActor(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
+ const TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag)
+ : Parent(parent)
+ , Tag(tag)
+ , Rng(Now().GetValue())
+ {
+ Y_UNUSED(index);
+ VERIFY_PARAM(DurationSeconds);
+ DurationSeconds = cmd.GetDurationSeconds();
+ NumOfSessions = cmd.GetNumOfSessions();
+ MaxInFlight = cmd.GetMaxInFlight();
+ Key = cmd.GetFirstKey();
+ FirstKey = Key;
+ TotalRowsToUpsert = cmd.GetTotalRowsToUpsert();
+ StringValueSize = cmd.GetStringValueSize();
+ DeleteTableOnFinish = cmd.GetDeleteTableOnFinish();
+ UniformPartitionsCount = cmd.GetUniformPartitionsCount();
+ SequentialWrite = cmd.GetSequentialWrite();
+ StringValue = TString(StringValueSize, 'a');
+ WorkingDir = cmd.GetWorkingDir();
if (cmd.Workload_case() == NKikimrBlobStorage::TEvTestLoadRequest_TKqpLoadStart::WorkloadCase::kStock) {
ProductCount = cmd.GetStock().GetProductCount();
@@ -127,82 +127,82 @@ public:
NYdbWorkload::TWorkloadFactory factory;
WorkloadQueryGen = factory.GetWorkloadQueryGenerator(cmd.GetWorkloadName(), workloadParams);
Y_ASSERT(WorkloadQueryGen.get() != nullptr);
- Y_ASSERT(DurationSeconds > DelayBeforeMeasurements.Seconds());
-
- // Monitoring initialization
- TVector<float> percentiles {0.1f, 0.5f, 0.9f, 0.99f, 0.999f, 1.0f};
- LoadCounters = counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag));
- Transactions = LoadCounters->GetCounter("Transactions", true);
- TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true);
-
- ResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles);
- }
-
- ~TKqpWriterTestLoadActor() {
- LoadCounters->ResetCounters();
- }
-
-
- void Bootstrap(const TActorContext& ctx) {
+ Y_ASSERT(DurationSeconds > DelayBeforeMeasurements.Seconds());
+
+ // Monitoring initialization
+ TVector<float> percentiles {0.1f, 0.5f, 0.9f, 0.99f, 0.999f, 1.0f};
+ LoadCounters = counters->GetSubgroup("tag", Sprintf("%" PRIu64, tag));
+ Transactions = LoadCounters->GetCounter("Transactions", true);
+ TransactionsBytesWritten = LoadCounters->GetCounter("TransactionsBytesWritten", true);
+
+ ResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles);
+ }
+
+ ~TKqpWriterTestLoadActor() {
+ LoadCounters->ResetCounters();
+ }
+
+
+ void Bootstrap(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
- << " TKqpWriterTestLoadActor Bootstrap called");
- Become(&TKqpWriterTestLoadActor::StateFunc);
+ << " TKqpWriterTestLoadActor Bootstrap called");
+ Become(&TKqpWriterTestLoadActor::StateFunc);
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Schedule PoisonPill");
- ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill);
- ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring);
-
+ ctx.Schedule(TDuration::Seconds(DurationSeconds), new TEvents::TEvPoisonPill);
+ ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring);
+
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Bootstrap");
-
+
CreateSession(ctx);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Death management
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- void HandlePoisonPill(const TActorContext& ctx) {
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Death management
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ void HandlePoisonPill(const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " HandlePoisonPill, "
- << "all workers is initialized, so starting death process");
- StartDeathProcess(ctx);
- }
-
- void StartDeathProcess(const TActorContext& ctx) {
+ << "all workers is initialized, so starting death process");
+ StartDeathProcess(ctx);
+ }
+
+ void StartDeathProcess(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
- << " TKqpWriterTestLoadActor StartDeathProcess called");
- for(const auto& session : Sessions) {
+ << " TKqpWriterTestLoadActor StartDeathProcess called");
+ for(const auto& session : Sessions) {
auto request = std::make_unique<NKqp::TEvKqp::TEvCloseSessionRequest>();
- request->Record.MutableRequest()->SetSessionId(session);
+ request->Record.MutableRequest()->SetSessionId(session);
ctx.Send( new IEventHandle(NKqp::MakeKqpProxyID(1), SelfId(), request.release(),
- 0, /* via actor system */ true));
- }
-
- Become(&TKqpWriterTestLoadActor::StateEndOfWork);
- TIntrusivePtr<TLoadReport> Report(new TLoadReport());
- Report->Duration = TDuration::Seconds(DurationSeconds);
- ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess"));
- Die(ctx);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Monitoring
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- void Handle(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) {
- ResponseTimes.Update();
- ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring);
- }
-
- STRICT_STFUNC(StateFunc,
- CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
- HFunc(TEvUpdateMonitoring, Handle)
- )
-
- STRICT_STFUNC(StateEndOfWork,
- HFunc(TEvUpdateMonitoring, Handle)
- )
-
-private:
-
+ 0, /* via actor system */ true));
+ }
+
+ Become(&TKqpWriterTestLoadActor::StateEndOfWork);
+ TIntrusivePtr<TLoadReport> Report(new TLoadReport());
+ Report->Duration = TDuration::Seconds(DurationSeconds);
+ ctx.Send(Parent, new TEvTestLoadFinished(Tag, Report, "OK called StartDeathProcess"));
+ Die(ctx);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Monitoring
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ void Handle(TEvUpdateMonitoring::TPtr& /*ev*/, const TActorContext& ctx) {
+ ResponseTimes.Update();
+ ctx.Schedule(TDuration::MilliSeconds(MonitoringUpdateCycleMs), new TEvUpdateMonitoring);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ CFunc(TEvents::TSystem::PoisonPill, HandlePoisonPill)
+ HFunc(TEvUpdateMonitoring, Handle)
+ )
+
+ STRICT_STFUNC(StateEndOfWork,
+ HFunc(TEvUpdateMonitoring, Handle)
+ )
+
+private:
+
void CreateSession(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Creating session to run tables DDL.");
auto request = Ydb::Table::CreateSessionRequest();
@@ -227,8 +227,8 @@ private:
NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvCreateSessionRequest>(
std::move(request), std::move(cb), WorkingDir, TString(), ctx
);
- }
-
+ }
+
void CreateShardedTable(const TActorContext& ctx, const TString& sessionId) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Creating tables for workload.");
auto request = Ydb::Table::ExecuteSchemeQueryRequest();
@@ -251,13 +251,13 @@ private:
NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvExecuteSchemeQueryRequest>(
std::move(request), std::move(cb), WorkingDir, TString(), ctx
);
- }
-
-};
-
-IActor * CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
- const TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) {
- return new TKqpWriterTestLoadActor(cmd, parent, counters, index, tag);
-}
-
-} // NKikimr
+ }
+
+};
+
+IActor * CreateKqpWriterTestLoad(const NKikimrBlobStorage::TEvTestLoadRequest::TKqpLoadStart& cmd,
+ const TActorId& parent, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, ui64 index, ui64 tag) {
+ return new TKqpWriterTestLoadActor(cmd, parent, counters, index, tag);
+}
+
+} // NKikimr
diff --git a/ydb/core/blobstorage/testload/ya.make b/ydb/core/blobstorage/testload/ya.make
index f1448fe240..b793193475 100644
--- a/ydb/core/blobstorage/testload/ya.make
+++ b/ydb/core/blobstorage/testload/ya.make
@@ -35,7 +35,7 @@ SRCS(
test_load_time_series.h
test_load_vdisk_write.cpp
test_load_write.cpp
- test_load_kqp.cpp
+ test_load_kqp.cpp
)
END()
diff --git a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
index 1e1bca2fe9..10d17f675d 100644
--- a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
@@ -1,558 +1,558 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
-
+
#include <ydb/core/protos/blobstorage.pb.h>
#include <ydb/core/protos/base.pb.h>
#include <ydb/core/base/logoblob.h>
#include <ydb/core/protos/blobstorage.pb.h>
-
-Y_UNIT_TEST_SUITE(IncorrectQueries) {
+
+Y_UNIT_TEST_SUITE(IncorrectQueries) {
const TVector<TString> erasureTypes = {"none", "block-4-2", "mirror-3", "mirror-3of4", "mirror-3-dc"};
-
+
void SendPut(TEnvironmentSetup& env, TTestInfo& test,
- const TLogoBlobID& blobId, NKikimrProto::EReplyStatus status, ui32 blob_size, bool isEmptyObject = false, bool isEmptyMeta = false) {
- const TString data(blob_size, 'a');
+ const TLogoBlobID& blobId, NKikimrProto::EReplyStatus status, ui32 blob_size, bool isEmptyObject = false, bool isEmptyMeta = false) {
+ const TString data(blob_size, 'a');
std::unique_ptr<IEventBase> ev = std::make_unique<TEvBlobStorage::TEvVPut>(blobId, data, test.Info->GetVDiskInSubgroup(0, blobId.Hash()),
- false, nullptr, TInstant::Max(), NKikimrBlobStorage::AsyncBlob);
-
- if (isEmptyObject) {
- if (isEmptyMeta) {
+ false, nullptr, TInstant::Max(), NKikimrBlobStorage::AsyncBlob);
+
+ if (isEmptyObject) {
+ if (isEmptyMeta) {
ev = std::make_unique<TEvBlobStorage::TEvVPut>();
- } else {
- NKikimrBlobStorage::TEvVPut protoQuery;
+ } else {
+ NKikimrBlobStorage::TEvVPut protoQuery;
static_cast<TEvBlobStorage::TEvVPut*>(ev.get())->Record = protoQuery;
- }
- } else if (isEmptyMeta) {
+ }
+ } else if (isEmptyMeta) {
static_cast<TEvBlobStorage::TEvVPut*>(ev.get())->StripPayload();
- }
+ }
- env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
- test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
+ env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
+ test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
test.Runtime->Send(new IEventHandle(queueId, test.Edge, ev.release()), queueId.NodeId());
- auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
- UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVPutResult);
- TEvBlobStorage::TEvVPutResult *putResult = handle->Get<TEvBlobStorage::TEvVPutResult>();
- UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), status);
- });
- }
-
- void SendGet(TEnvironmentSetup& env, TTestInfo& test, const TVDiskID& vdiskId, const TLogoBlobID& blobId, const TString& part,
- NKikimrProto::EReplyStatus status = NKikimrProto::OK, bool isEmptyObject = false, bool isEmptyMeta = false) {
+ auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
+ UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVPutResult);
+ TEvBlobStorage::TEvVPutResult *putResult = handle->Get<TEvBlobStorage::TEvVPutResult>();
+ UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), status);
+ });
+ }
+
+ void SendGet(TEnvironmentSetup& env, TTestInfo& test, const TVDiskID& vdiskId, const TLogoBlobID& blobId, const TString& part,
+ NKikimrProto::EReplyStatus status = NKikimrProto::OK, bool isEmptyObject = false, bool isEmptyMeta = false) {
std::unique_ptr<IEventBase> ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId,
- TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, TEvBlobStorage::TEvVGet::EFlags::None,
- Nothing(), {{blobId, 0u, ui32(part.size())}});
-
- if (isEmptyObject) {
- if (isEmptyMeta) {
+ TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, TEvBlobStorage::TEvVGet::EFlags::None,
+ Nothing(), {{blobId, 0u, ui32(part.size())}});
+
+ if (isEmptyObject) {
+ if (isEmptyMeta) {
ev = std::make_unique<TEvBlobStorage::TEvVGet>();
- } else {
- NKikimrBlobStorage::TEvVGet protoQuery;
+ } else {
+ NKikimrBlobStorage::TEvVGet protoQuery;
static_cast<TEvBlobStorage::TEvVGet*>(ev.get())->Record = protoQuery;
- }
- } else if (isEmptyMeta) {
+ }
+ } else if (isEmptyMeta) {
static_cast<TEvBlobStorage::TEvVGet*>(ev.get())->StripPayload();
- }
+ }
- env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
- test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
+ env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
+ test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
test.Runtime->Send(new IEventHandle(queueId, test.Edge, ev.release()), queueId.NodeId());
- auto r = test.Runtime->WaitForEdgeActorEvent({test.Edge});
-
- UNIT_ASSERT_EQUAL(r->Type, TEvBlobStorage::EvVGetResult);
- TEvBlobStorage::TEvVGetResult *getResult = r->Get<TEvBlobStorage::TEvVGetResult>();
- UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetStatus(), status);
- if (status == NKikimrProto::OK) {
- UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetResult(0).GetBuffer(), part);
- }
- });
- }
-
- struct TBlobInfo {
- TLogoBlobID BlobId;
- TString Data;
- NKikimrProto::EReplyStatus ExpectedStatus;
- };
-
- void SendEmptyMultiPut(TEnvironmentSetup& env, TTestInfo& test, const std::vector<TBlobInfo>& blobs, NKikimrProto::EReplyStatus status,
- NKikimrBlobStorage::TEvVMultiPut proto) {
-
+ auto r = test.Runtime->WaitForEdgeActorEvent({test.Edge});
+
+ UNIT_ASSERT_EQUAL(r->Type, TEvBlobStorage::EvVGetResult);
+ TEvBlobStorage::TEvVGetResult *getResult = r->Get<TEvBlobStorage::TEvVGetResult>();
+ UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetStatus(), status);
+ if (status == NKikimrProto::OK) {
+ UNIT_ASSERT_VALUES_EQUAL(getResult->Record.GetResult(0).GetBuffer(), part);
+ }
+ });
+ }
+
+ struct TBlobInfo {
+ TLogoBlobID BlobId;
+ TString Data;
+ NKikimrProto::EReplyStatus ExpectedStatus;
+ };
+
+ void SendEmptyMultiPut(TEnvironmentSetup& env, TTestInfo& test, const std::vector<TBlobInfo>& blobs, NKikimrProto::EReplyStatus status,
+ NKikimrBlobStorage::TEvVMultiPut proto) {
+
std::unique_ptr<IEventBase> ev(new TEvBlobStorage::TEvVMultiPut(test.Info->GetVDiskId(0),
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
-
- for(auto [blob, data, status] : blobs) {
+
+ for(auto [blob, data, status] : blobs) {
static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, 0);
- }
-
+ }
+
static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->Record = proto;
- env.WithQueueId(test.Info->GetVDiskId(0), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
- test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
+ env.WithQueueId(test.Info->GetVDiskId(0), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
+ test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
test.Runtime->Send(new IEventHandle(queueId, test.Edge, ev.release()), queueId.NodeId());
- auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
- UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
- TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
- UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), status);
-
- auto results = putResult->Record.GetItems();
- for(int i = 0; i < results.size(); ++i) {
- UNIT_ASSERT_VALUES_EQUAL(results[i].GetStatus(), blobs[i].ExpectedStatus);
- }
- });
- }
-
- void SendMultiPut(TEnvironmentSetup& env, TTestInfo& test, NKikimrProto::EReplyStatus status, const std::vector<TBlobInfo>& blobs) {
+ auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
+ UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
+ TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
+ UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), status);
+
+ auto results = putResult->Record.GetItems();
+ for(int i = 0; i < results.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(results[i].GetStatus(), blobs[i].ExpectedStatus);
+ }
+ });
+ }
+
+ void SendMultiPut(TEnvironmentSetup& env, TTestInfo& test, NKikimrProto::EReplyStatus status, const std::vector<TBlobInfo>& blobs) {
std::unique_ptr<IEventBase> ev(new TEvBlobStorage::TEvVMultiPut(test.Info->GetVDiskInSubgroup(0, blobs[0].BlobId.Hash()),
- TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
-
- for(auto [blob, data, status] : blobs) {
+ TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
+
+ for(auto [blob, data, status] : blobs) {
static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, 0);
- }
+ }
- env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobs[0].BlobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
- test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
+ env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobs[0].BlobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
+ test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
test.Runtime->Send(new IEventHandle(queueId, test.Edge, ev.release()), queueId.NodeId());
- auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
- UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
- TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
- UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), status);
-
- auto results = putResult->Record.GetItems();
- for(size_t i = 0; i < blobs.size(); ++i) {
- UNIT_ASSERT_VALUES_EQUAL(results[i].GetStatus(), blobs[i].ExpectedStatus);
- }
- });
- }
-
- void MakeCrcTest(const TString& erasure, ui64 crc) {
- TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
- env.SetupLogging();
- TTestInfo test = InitTest(env);
-
- NKikimrProto::TLogoBlobID pBlobId;
- auto blobId = LogoBlobIDFromLogoBlobID(pBlobId);
- SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
-
- pBlobId.set_rawx1(0);
- pBlobId.set_rawx2(0);
- pBlobId.set_rawx3(crc);
-
- blobId = LogoBlobIDFromLogoBlobID(pBlobId);
- SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
- }
-
- Y_UNIT_TEST(InvalidPartID) {
- for(const auto& erasure : erasureTypes) {
- TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 100;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 13);
- SendPut(env, test, blobId, NKikimrProto::ERROR, size);
- }
- }
-
- Y_UNIT_TEST(VeryBigBlob) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = (10 << 21);
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
-
- SendPut(env, test, blobId, NKikimrProto::ERROR, size);
- }
-
- Y_UNIT_TEST(Incompatible) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 100;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
-
- SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42);
- SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 0);
-
-
- SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42, true);
- SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42, true);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 0, true);
-
- return;
-
- SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42, false, true);
- SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42, false, true);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 0, false, true);
-
- SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42, true, true);
- SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42, true, true);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 0, true, true);
- }
-
- Y_UNIT_TEST(Proto) {
- for(const auto& erasure : erasureTypes) {
- TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
- TTestInfo test = InitTest(env);
-
- NKikimrProto::TLogoBlobID protoBlobId;
-
- auto blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 0);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 42);
-
- protoBlobId.set_rawx1(0xABC);
- protoBlobId.set_rawx2(0xFFFFFF);
- protoBlobId.set_rawx3(0xFF);
- blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 42);
-
- protoBlobId.clear_rawx1();
- blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 42);
- }
- }
-
- Y_UNIT_TEST(BaseReadingTest) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 10;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- SendPut(env, test, blobId, NKikimrProto::OK, size);
- const TString data(size, 'a');
- auto vdiskId = test.Info->GetVDiskId(0);
- SendGet(env, test, vdiskId, blobId, data);
- }
-
- Y_UNIT_TEST(EmptyGetTest) {
- return;
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 10;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- SendPut(env, test, blobId, NKikimrProto::OK, size);
- const TString data(size, 'a');
- auto vdiskId = test.Info->GetVDiskId(0);
-
- SendGet(env, test, vdiskId, blobId, data, NKikimrProto::ERROR, true);
- SendGet(env, test, vdiskId, blobId, data, NKikimrProto::OK, false, true);
- SendGet(env, test, vdiskId, blobId, data, NKikimrProto::ERROR, true, true);
- }
-
- Y_UNIT_TEST(WrongDataSize) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 100;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- SendPut(env, test, blobId, NKikimrProto::ERROR, size - 1);
- auto vdiskId = test.Info->GetVDiskInSubgroup(0, blobId.Hash());
- SendGet(env, test, vdiskId, blobId, "", NKikimrProto::OK);
- }
-
- Y_UNIT_TEST(WrongVDiskID) {
- TEnvironmentSetup env(true, GetErasureTypeByString("block-4-2"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 100;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- SendPut(env, test, blobId, NKikimrProto::OK, 32);
-
- auto vdiskId = test.Info->GetVDiskInSubgroup(1, blobId.Hash());
- SendGet(env, test, vdiskId, blobId, "", NKikimrProto::OK);
- }
-
- Y_UNIT_TEST(ProtoBlobGet) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- NKikimrProto::TLogoBlobID protoBlobId;
- protoBlobId.set_rawx2(std::numeric_limits<uint64_t>::max());
- protoBlobId.set_rawx3(17);
-
- TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
- SendPut(env, test, blobId, NKikimrProto::OK, 1);
- auto vdiskId = test.Info->GetVDiskInSubgroup(0, blobId.Hash());
- SendGet(env, test, vdiskId, blobId, "a");
- }
-
- Y_UNIT_TEST(ProtoQueryGet) {
- return;
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 100;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- SendPut(env, test, blobId, NKikimrProto::OK, size);
- const TString data(size, 'a');
- auto vdiskId = test.Info->GetVDiskId(0);
- SendGet(env, test, vdiskId, blobId, data, NKikimrProto::ERROR, true);
- }
-
-
- Y_UNIT_TEST(WrongPartId) {
- TEnvironmentSetup env(true, GetErasureTypeByString("block-4-2"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 100;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 2);
- SendPut(env, test, blobId, NKikimrProto::ERROR, 32);
-
- auto vdiskId = test.Info->GetVDiskInSubgroup(0, blobId.Hash());
- SendGet(env, test, vdiskId, blobId, "");
+ auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
+ UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
+ TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
+ UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), status);
+
+ auto results = putResult->Record.GetItems();
+ for(size_t i = 0; i < blobs.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(results[i].GetStatus(), blobs[i].ExpectedStatus);
+ }
+ });
+ }
+
+ void MakeCrcTest(const TString& erasure, ui64 crc) {
+ TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
+ env.SetupLogging();
+ TTestInfo test = InitTest(env);
+
+ NKikimrProto::TLogoBlobID pBlobId;
+ auto blobId = LogoBlobIDFromLogoBlobID(pBlobId);
+ SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
+
+ pBlobId.set_rawx1(0);
+ pBlobId.set_rawx2(0);
+ pBlobId.set_rawx3(crc);
+
+ blobId = LogoBlobIDFromLogoBlobID(pBlobId);
+ SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
+ }
+
+ Y_UNIT_TEST(InvalidPartID) {
+ for(const auto& erasure : erasureTypes) {
+ TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 100;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 13);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size);
+ }
}
-
- Y_UNIT_TEST(EmptyTest) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 0;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- SendPut(env, test, blobId, NKikimrProto::OK, size);
- const TString data("");
- auto vdiskId = test.Info->GetVDiskId(0);
- SendGet(env, test, vdiskId, blobId, data);
+
+ Y_UNIT_TEST(VeryBigBlob) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = (10 << 21);
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size);
+ }
+
+ Y_UNIT_TEST(Incompatible) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 100;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 0);
+
+
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42, true);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42, true);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 0, true);
+
+ return;
+
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42, false, true);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42, false, true);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 0, false, true);
+
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size - 42, true, true);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size + 42, true, true);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 0, true, true);
}
-
-
- Y_UNIT_TEST(ProtobufBlob) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- NKikimrProto::TLogoBlobID pBlobId;
- auto blobId = LogoBlobIDFromLogoBlobID(pBlobId);
- SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
-
- pBlobId.set_rawx1(std::numeric_limits<ui64>::max());
- pBlobId.set_rawx2(std::numeric_limits<ui64>::max());
- pBlobId.set_rawx3(std::numeric_limits<ui64>::max());
-
- blobId = LogoBlobIDFromLogoBlobID(pBlobId);
- SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
- }
-
- Y_UNIT_TEST(SameBlob) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 10;
- TLogoBlobID BlobId(1, 1, 0, 0, size, 0, 1);
- TLogoBlobID BlobId2(1, 1, 0, 0, size+100, 0, 1);
- SendPut(env, test, BlobId, NKikimrProto::OK, size);
- SendPut(env, test, BlobId2, NKikimrProto::OK, size+100);
- const TString data(size - 1, 'a');
- auto vdiskId = test.Info->GetVDiskId(0);
- SendGet(env, test, vdiskId, BlobId, data);
- SendGet(env, test, vdiskId, BlobId2, data);
-
- }
-
- Y_UNIT_TEST(WrongCrc) {
- for(const auto& erasure : erasureTypes) {
- MakeCrcTest(erasure, (1ull << 31) + 1);
- MakeCrcTest(erasure, (1ull << 30) + (1ull << 31) + 1);
+
+ Y_UNIT_TEST(Proto) {
+ for(const auto& erasure : erasureTypes) {
+ TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
+ TTestInfo test = InitTest(env);
+
+ NKikimrProto::TLogoBlobID protoBlobId;
+
+ auto blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 0);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 42);
+
+ protoBlobId.set_rawx1(0xABC);
+ protoBlobId.set_rawx2(0xFFFFFF);
+ protoBlobId.set_rawx3(0xFF);
+ blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 42);
+
+ protoBlobId.clear_rawx1();
+ blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 42);
}
- }
-
-
- Y_UNIT_TEST(BasePutTest) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr ui32 size = 10;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- SendPut(env, test, blobId, NKikimrProto::OK, size);
- }
-
- Y_UNIT_TEST(MultiPutBaseTest) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
- constexpr ui32 size = 1000;
- TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
- TString data(size, 'a');
- std::vector<TBlobInfo> blobs;
- blobs.push_back({blobId, data, NKikimrProto::OK});
- SendMultiPut(env, test, NKikimrProto::OK, blobs);
- }
-
- Y_UNIT_TEST(MultiPutCrcTest) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- NKikimrProto::TLogoBlobID pBlobId;
- auto blobId = LogoBlobIDFromLogoBlobID(pBlobId);
-
- std::vector<TBlobInfo> blobs;
- blobs.push_back({blobId, "", NKikimrProto::ERROR});
-
- SendMultiPut(env, test, NKikimrProto::OK, blobs);
-
- pBlobId.set_rawx1(0);
- pBlobId.set_rawx2(0);
- pBlobId.set_rawx3((1ull << 30) + (1ull << 31) + 1);
-
+ }
+
+ Y_UNIT_TEST(BaseReadingTest) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 10;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ SendPut(env, test, blobId, NKikimrProto::OK, size);
+ const TString data(size, 'a');
+ auto vdiskId = test.Info->GetVDiskId(0);
+ SendGet(env, test, vdiskId, blobId, data);
+ }
+
+ Y_UNIT_TEST(EmptyGetTest) {
+ return;
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 10;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ SendPut(env, test, blobId, NKikimrProto::OK, size);
+ const TString data(size, 'a');
+ auto vdiskId = test.Info->GetVDiskId(0);
+
+ SendGet(env, test, vdiskId, blobId, data, NKikimrProto::ERROR, true);
+ SendGet(env, test, vdiskId, blobId, data, NKikimrProto::OK, false, true);
+ SendGet(env, test, vdiskId, blobId, data, NKikimrProto::ERROR, true, true);
+ }
+
+ Y_UNIT_TEST(WrongDataSize) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 100;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, size - 1);
+ auto vdiskId = test.Info->GetVDiskInSubgroup(0, blobId.Hash());
+ SendGet(env, test, vdiskId, blobId, "", NKikimrProto::OK);
+ }
+
+ Y_UNIT_TEST(WrongVDiskID) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("block-4-2"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 100;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ SendPut(env, test, blobId, NKikimrProto::OK, 32);
+
+ auto vdiskId = test.Info->GetVDiskInSubgroup(1, blobId.Hash());
+ SendGet(env, test, vdiskId, blobId, "", NKikimrProto::OK);
+ }
+
+ Y_UNIT_TEST(ProtoBlobGet) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ NKikimrProto::TLogoBlobID protoBlobId;
+ protoBlobId.set_rawx2(std::numeric_limits<uint64_t>::max());
+ protoBlobId.set_rawx3(17);
+
+ TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(protoBlobId);
+ SendPut(env, test, blobId, NKikimrProto::OK, 1);
+ auto vdiskId = test.Info->GetVDiskInSubgroup(0, blobId.Hash());
+ SendGet(env, test, vdiskId, blobId, "a");
+ }
+
+ Y_UNIT_TEST(ProtoQueryGet) {
+ return;
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 100;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ SendPut(env, test, blobId, NKikimrProto::OK, size);
+ const TString data(size, 'a');
+ auto vdiskId = test.Info->GetVDiskId(0);
+ SendGet(env, test, vdiskId, blobId, data, NKikimrProto::ERROR, true);
+ }
+
+
+ Y_UNIT_TEST(WrongPartId) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("block-4-2"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 100;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 2);
+ SendPut(env, test, blobId, NKikimrProto::ERROR, 32);
+
+ auto vdiskId = test.Info->GetVDiskInSubgroup(0, blobId.Hash());
+ SendGet(env, test, vdiskId, blobId, "");
+ }
+
+ Y_UNIT_TEST(EmptyTest) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 0;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ SendPut(env, test, blobId, NKikimrProto::OK, size);
+ const TString data("");
+ auto vdiskId = test.Info->GetVDiskId(0);
+ SendGet(env, test, vdiskId, blobId, data);
+ }
+
+
+ Y_UNIT_TEST(ProtobufBlob) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ NKikimrProto::TLogoBlobID pBlobId;
+ auto blobId = LogoBlobIDFromLogoBlobID(pBlobId);
+ SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
+
+ pBlobId.set_rawx1(std::numeric_limits<ui64>::max());
+ pBlobId.set_rawx2(std::numeric_limits<ui64>::max());
+ pBlobId.set_rawx3(std::numeric_limits<ui64>::max());
+
+ blobId = LogoBlobIDFromLogoBlobID(pBlobId);
+ SendPut(env,test, blobId, NKikimrProto::ERROR, 0);
+ }
+
+ Y_UNIT_TEST(SameBlob) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 10;
+ TLogoBlobID BlobId(1, 1, 0, 0, size, 0, 1);
+ TLogoBlobID BlobId2(1, 1, 0, 0, size+100, 0, 1);
+ SendPut(env, test, BlobId, NKikimrProto::OK, size);
+ SendPut(env, test, BlobId2, NKikimrProto::OK, size+100);
+ const TString data(size - 1, 'a');
+ auto vdiskId = test.Info->GetVDiskId(0);
+ SendGet(env, test, vdiskId, BlobId, data);
+ SendGet(env, test, vdiskId, BlobId2, data);
+
+ }
+
+ Y_UNIT_TEST(WrongCrc) {
+ for(const auto& erasure : erasureTypes) {
+ MakeCrcTest(erasure, (1ull << 31) + 1);
+ MakeCrcTest(erasure, (1ull << 30) + (1ull << 31) + 1);
+ }
+ }
+
+
+ Y_UNIT_TEST(BasePutTest) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr ui32 size = 10;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ SendPut(env, test, blobId, NKikimrProto::OK, size);
+ }
+
+ Y_UNIT_TEST(MultiPutBaseTest) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+ constexpr ui32 size = 1000;
+ TLogoBlobID blobId(1, 1, 0, 0, size, 0, 1);
+ TString data(size, 'a');
+ std::vector<TBlobInfo> blobs;
+ blobs.push_back({blobId, data, NKikimrProto::OK});
+ SendMultiPut(env, test, NKikimrProto::OK, blobs);
+ }
+
+ Y_UNIT_TEST(MultiPutCrcTest) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ NKikimrProto::TLogoBlobID pBlobId;
+ auto blobId = LogoBlobIDFromLogoBlobID(pBlobId);
+
+ std::vector<TBlobInfo> blobs;
+ blobs.push_back({blobId, "", NKikimrProto::ERROR});
+
+ SendMultiPut(env, test, NKikimrProto::OK, blobs);
+
+ pBlobId.set_rawx1(0);
+ pBlobId.set_rawx2(0);
+ pBlobId.set_rawx3((1ull << 30) + (1ull << 31) + 1);
+
blobId = LogoBlobIDFromLogoBlobID(pBlobId);
- blobs.push_back({blobId, "", NKikimrProto::ERROR});
-
- pBlobId.set_rawx3((1ull << 31) + 1);
-
+ blobs.push_back({blobId, "", NKikimrProto::ERROR});
+
+ pBlobId.set_rawx3((1ull << 31) + 1);
+
blobId = LogoBlobIDFromLogoBlobID(pBlobId);
- blobs.push_back({blobId, "", NKikimrProto::ERROR});
-
- SendMultiPut(env, test, NKikimrProto::OK, blobs);
-
- blobs.clear();
-
- constexpr ui32 size = 1000;
- TLogoBlobID niceBlobId(1, 1, 0, 0, size, 0, 1);
- TString data(size, 'a');
-
- blobs.push_back({niceBlobId, data, NKikimrProto::OK});
- SendMultiPut(env, test, NKikimrProto::OK, blobs);
- }
-
-
- Y_UNIT_TEST(MultiPutWithoutBlobs) {
- return;
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
+ blobs.push_back({blobId, "", NKikimrProto::ERROR});
+
+ SendMultiPut(env, test, NKikimrProto::OK, blobs);
+
+ blobs.clear();
+
+ constexpr ui32 size = 1000;
+ TLogoBlobID niceBlobId(1, 1, 0, 0, size, 0, 1);
+ TString data(size, 'a');
+
+ blobs.push_back({niceBlobId, data, NKikimrProto::OK});
+ SendMultiPut(env, test, NKikimrProto::OK, blobs);
+ }
+
+
+ Y_UNIT_TEST(MultiPutWithoutBlobs) {
+ return;
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
std::unique_ptr<IEventBase> ev(new TEvBlobStorage::TEvVMultiPut(test.Info->GetVDiskId(0),
- TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
-
- env.WithQueueId(test.Info->GetVDiskId(0), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
- test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
+ TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
+
+ env.WithQueueId(test.Info->GetVDiskId(0), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
+ test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
test.Runtime->Send(new IEventHandle(queueId, test.Edge, ev.release()), queueId.NodeId());
- auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
- UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
- TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
- UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), NKikimrProto::ERROR);
- });
- }
-
- Y_UNIT_TEST(ProtoHasOnlyVDiskId) {
- return;
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
- NKikimrBlobStorage::TVDiskID VDiskId;
- VDiskIDFromVDiskID(test.Info->GetVDiskId(0), &VDiskId);
-
- NKikimrBlobStorage::TEvVMultiPut query;
- auto VDisk = query.MutableVDiskID();
- *VDisk = VDiskId;
-
- SendEmptyMultiPut(env, test, {}, NKikimrProto::ERROR, query);
-
- constexpr int blobCount = 1000;
- constexpr int blobSize = 10;
- TString data(blobSize, 'a');
-
- std::vector<TBlobInfo> blobs(blobCount);
- for(int i = 0; i < blobCount; ++i) {
- TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
- blobs[i] = {blob, data, NKikimrProto::OK};
- }
-
- SendEmptyMultiPut(env, test, blobs, NKikimrProto::ERROR, query);
- }
-
- Y_UNIT_TEST(ProtoHasVDiskAndExtQueue) {
- return;
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
- NKikimrBlobStorage::TVDiskID VDiskId;
- VDiskIDFromVDiskID(test.Info->GetVDiskId(0), &VDiskId);
-
- NKikimrBlobStorage::TEvVMultiPut query;
- auto VDisk = query.MutableVDiskID();
- *VDisk = VDiskId;
- auto msg = query.MutableMsgQoS();
- msg->SetExtQueueId(NKikimrBlobStorage::EVDiskQueueId::PutTabletLog);
-
- SendEmptyMultiPut(env, test, {}, NKikimrProto::ERROR, query);
-
- constexpr int blobCount = 1000;
- constexpr int blobSize = 10;
- TString data(blobSize, 'a');
-
- std::vector<TBlobInfo> blobs(blobCount);
- for(int i = 0; i < blobCount; ++i) {
- TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
- blobs[i] = {blob, data, NKikimrProto::OK};
- auto protoBlob = query.AddItems()->MutableBlobID();
- LogoBlobIDFromLogoBlobID(blob, protoBlob);
- }
-
- SendEmptyMultiPut(env, test, blobs, NKikimrProto::OK, query);
-
- for(int i = 0; i < 1000 * blobCount; ++i) {
- TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
- blobs.push_back({blob, data, NKikimrProto::OK});
- auto protoBlob = query.AddItems()->MutableBlobID();
- LogoBlobIDFromLogoBlobID(blob, protoBlob);
- }
-
- SendEmptyMultiPut(env, test, blobs, NKikimrProto::OK, query);
- }
-
- Y_UNIT_TEST(EmptyProtoMultiPut) {
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr int blobCount = 1;
- constexpr int blobSize = 10;
- TString data(blobSize, 'a');
-
- std::vector<TBlobInfo> blobs(blobCount);
- for(int i = 0; i < blobCount; ++i) {
- TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
- blobs[i] = {blob, data, NKikimrProto::OK};
- }
-
- SendMultiPut(env, test, NKikimrProto::OK, blobs);
- }
-
- Y_UNIT_TEST(ManyQueriesThroughOneBSQueue) {
- return;
- TEnvironmentSetup env(true, GetErasureTypeByString("none"));
- TTestInfo test = InitTest(env);
-
- constexpr int eventsCount = 10000;
- constexpr int blobSize = 100;
- const TString data(blobSize, 'a');
+ auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
+ UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
+ TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
+ UNIT_ASSERT_VALUES_EQUAL(putResult->Record.GetStatus(), NKikimrProto::ERROR);
+ });
+ }
+
+ Y_UNIT_TEST(ProtoHasOnlyVDiskId) {
+ return;
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+ NKikimrBlobStorage::TVDiskID VDiskId;
+ VDiskIDFromVDiskID(test.Info->GetVDiskId(0), &VDiskId);
+
+ NKikimrBlobStorage::TEvVMultiPut query;
+ auto VDisk = query.MutableVDiskID();
+ *VDisk = VDiskId;
+
+ SendEmptyMultiPut(env, test, {}, NKikimrProto::ERROR, query);
+
+ constexpr int blobCount = 1000;
+ constexpr int blobSize = 10;
+ TString data(blobSize, 'a');
+
+ std::vector<TBlobInfo> blobs(blobCount);
+ for(int i = 0; i < blobCount; ++i) {
+ TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
+ blobs[i] = {blob, data, NKikimrProto::OK};
+ }
+
+ SendEmptyMultiPut(env, test, blobs, NKikimrProto::ERROR, query);
+ }
+
+ Y_UNIT_TEST(ProtoHasVDiskAndExtQueue) {
+ return;
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+ NKikimrBlobStorage::TVDiskID VDiskId;
+ VDiskIDFromVDiskID(test.Info->GetVDiskId(0), &VDiskId);
+
+ NKikimrBlobStorage::TEvVMultiPut query;
+ auto VDisk = query.MutableVDiskID();
+ *VDisk = VDiskId;
+ auto msg = query.MutableMsgQoS();
+ msg->SetExtQueueId(NKikimrBlobStorage::EVDiskQueueId::PutTabletLog);
+
+ SendEmptyMultiPut(env, test, {}, NKikimrProto::ERROR, query);
+
+ constexpr int blobCount = 1000;
+ constexpr int blobSize = 10;
+ TString data(blobSize, 'a');
+
+ std::vector<TBlobInfo> blobs(blobCount);
+ for(int i = 0; i < blobCount; ++i) {
+ TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
+ blobs[i] = {blob, data, NKikimrProto::OK};
+ auto protoBlob = query.AddItems()->MutableBlobID();
+ LogoBlobIDFromLogoBlobID(blob, protoBlob);
+ }
+
+ SendEmptyMultiPut(env, test, blobs, NKikimrProto::OK, query);
+
+ for(int i = 0; i < 1000 * blobCount; ++i) {
+ TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
+ blobs.push_back({blob, data, NKikimrProto::OK});
+ auto protoBlob = query.AddItems()->MutableBlobID();
+ LogoBlobIDFromLogoBlobID(blob, protoBlob);
+ }
+
+ SendEmptyMultiPut(env, test, blobs, NKikimrProto::OK, query);
+ }
+
+ Y_UNIT_TEST(EmptyProtoMultiPut) {
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr int blobCount = 1;
+ constexpr int blobSize = 10;
+ TString data(blobSize, 'a');
+
+ std::vector<TBlobInfo> blobs(blobCount);
+ for(int i = 0; i < blobCount; ++i) {
+ TLogoBlobID blob(1, 1, i, 0, blobSize, 0, 1);
+ blobs[i] = {blob, data, NKikimrProto::OK};
+ }
+
+ SendMultiPut(env, test, NKikimrProto::OK, blobs);
+ }
+
+ Y_UNIT_TEST(ManyQueriesThroughOneBSQueue) {
+ return;
+ TEnvironmentSetup env(true, GetErasureTypeByString("none"));
+ TTestInfo test = InitTest(env);
+
+ constexpr int eventsCount = 10000;
+ constexpr int blobSize = 100;
+ const TString data(blobSize, 'a');
std::vector<std::unique_ptr<IEventBase>> events(eventsCount);
-
- int goodCount = 0;
- for(int i = 0; i < eventsCount; ++i) {
+
+ int goodCount = 0;
+ for(int i = 0; i < eventsCount; ++i) {
events[i].reset(new TEvBlobStorage::TEvVMultiPut(test.Info->GetVDiskId(0),
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
- if (i % 19 != 18) {
- ++goodCount;
- TLogoBlobID blob(i, 1, 0, 0, blobSize, 0, 1);
+ if (i % 19 != 18) {
+ ++goodCount;
+ TLogoBlobID blob(i, 1, 0, 0, blobSize, 0, 1);
static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, 0);
- }
- }
-
- env.WithQueueId(test.Info->GetVDiskId(0), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
- test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
- for(int i = 0; i < eventsCount; ++i) {
+ }
+ }
+
+ env.WithQueueId(test.Info->GetVDiskId(0), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
+ test.Edge = test.Runtime->AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
+ for(int i = 0; i < eventsCount; ++i) {
test.Runtime->Send(new IEventHandle(queueId, test.Edge, events[i].release()), queueId.NodeId());
- }
-
- int okCount = 0;
- int errorCount = 0;
- for(int i = 0; i < eventsCount; ++i) {
- auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
- UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
- TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
- if (putResult->Record.GetStatus() == NKikimrProto::OK) {
- ++okCount;
- } else if (putResult->Record.GetStatus() == NKikimrProto::ERROR) {
- ++errorCount;
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL(okCount, goodCount);
- UNIT_ASSERT_VALUES_EQUAL(errorCount, eventsCount - goodCount);
- });
- }
-}
+ }
+
+ int okCount = 0;
+ int errorCount = 0;
+ for(int i = 0; i < eventsCount; ++i) {
+ auto handle = test.Runtime->WaitForEdgeActorEvent({test.Edge});
+ UNIT_ASSERT_EQUAL(handle->Type, TEvBlobStorage::EvVMultiPutResult);
+ TEvBlobStorage::TEvVMultiPutResult *putResult = handle->Get<TEvBlobStorage::TEvVMultiPutResult>();
+ if (putResult->Record.GetStatus() == NKikimrProto::OK) {
+ ++okCount;
+ } else if (putResult->Record.GetStatus() == NKikimrProto::ERROR) {
+ ++errorCount;
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(okCount, goodCount);
+ UNIT_ASSERT_VALUES_EQUAL(errorCount, eventsCount - goodCount);
+ });
+ }
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/common.h b/ydb/core/blobstorage/ut_blobstorage/lib/common.h
index f648dea122..13a6d3235f 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/common.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/common.h
@@ -1,40 +1,40 @@
-#pragma once
-
-#include "env.h"
+#pragma once
+
+#include "env.h"
#include <ydb/core/base/logoblob.h>
-
-
-inline TBlobStorageGroupType GetErasureTypeByString(const TString& erasure) {
- if (erasure == "none") {
- return TBlobStorageGroupType::ErasureNone;
- } else if (erasure == "block-4-2") {
- return TBlobStorageGroupType::Erasure4Plus2Block;
- } else if (erasure == "mirror-3") {
- return TBlobStorageGroupType::ErasureMirror3;
- } else if (erasure == "mirror-3of4") {
- return TBlobStorageGroupType::ErasureMirror3of4;
- } else if (erasure == "mirror-3-dc") {
- return TBlobStorageGroupType::ErasureMirror3dc;
- }
- UNIT_ASSERT(false);
- return TBlobStorageGroupType::ErasureNone;
-}
-
-struct TTestInfo {
- std::unique_ptr<TTestActorSystem> &Runtime;
- TActorId Edge;
- TIntrusivePtr<TBlobStorageGroupInfo> Info;
-};
-
-inline TTestInfo InitTest(TEnvironmentSetup& env) {
- auto& runtime = env.Runtime;
-
- env.CreateBoxAndPool();
- env.CommenceReplication();
-
- auto groups = env.GetGroups();
- auto info = env.GetGroupInfo(groups[0]);
-
- const TActorId& edge = runtime->AllocateEdgeActor(1);
- return {runtime, edge, info};
-}
+
+
+inline TBlobStorageGroupType GetErasureTypeByString(const TString& erasure) {
+ if (erasure == "none") {
+ return TBlobStorageGroupType::ErasureNone;
+ } else if (erasure == "block-4-2") {
+ return TBlobStorageGroupType::Erasure4Plus2Block;
+ } else if (erasure == "mirror-3") {
+ return TBlobStorageGroupType::ErasureMirror3;
+ } else if (erasure == "mirror-3of4") {
+ return TBlobStorageGroupType::ErasureMirror3of4;
+ } else if (erasure == "mirror-3-dc") {
+ return TBlobStorageGroupType::ErasureMirror3dc;
+ }
+ UNIT_ASSERT(false);
+ return TBlobStorageGroupType::ErasureNone;
+}
+
+struct TTestInfo {
+ std::unique_ptr<TTestActorSystem> &Runtime;
+ TActorId Edge;
+ TIntrusivePtr<TBlobStorageGroupInfo> Info;
+};
+
+inline TTestInfo InitTest(TEnvironmentSetup& env) {
+ auto& runtime = env.Runtime;
+
+ env.CreateBoxAndPool();
+ env.CommenceReplication();
+
+ auto groups = env.GetGroups();
+ auto info = env.GetGroupInfo(groups[0]);
+
+ const TActorId& edge = runtime->AllocateEdgeActor(1);
+ return {runtime, edge, info};
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/patch.cpp b/ydb/core/blobstorage/ut_blobstorage/patch.cpp
index b68c05b80e..66ee4000d7 100644
--- a/ydb/core/blobstorage/ut_blobstorage/patch.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/patch.cpp
@@ -65,8 +65,8 @@ Y_UNIT_TEST_SUITE(BlobPatching) {
};
void MakePatchingTest(TString erasure) {
- TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
- TTestInfo test = InitTest(env);
+ TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
+ TTestInfo test = InitTest(env);
constexpr ui32 size = 100;
TString data(size, 'a');
@@ -81,7 +81,7 @@ Y_UNIT_TEST_SUITE(BlobPatching) {
TString patchedData1 = ApplyDiffs(data, diffs1);
TLogoBlobID patchedBlobId1(1, 1, 1, 0, size, 0);
TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(originalBlobId, &patchedBlobId1, 0,
- test.Info->GroupID, test.Info->GroupID);
+ test.Info->GroupID, test.Info->GroupID);
SendPatch(test, originalBlobId, patchedBlobId1, 0, diffs1, NKikimrProto::OK);
SendGet(test, patchedBlobId1, patchedData1, NKikimrProto::OK);
@@ -91,14 +91,14 @@ Y_UNIT_TEST_SUITE(BlobPatching) {
TString patchedData2 = ApplyDiffs(data, diffs2);
TLogoBlobID patchedBlobId2(1, 1, 2, 0, size, 0);
TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(originalBlobId, &patchedBlobId2, 0,
- test.Info->GroupID, test.Info->GroupID);
+ test.Info->GroupID, test.Info->GroupID);
SendPatch(test, originalBlobId, patchedBlobId2, 0, diffs2, NKikimrProto::OK);
SendGet(test, patchedBlobId2, patchedData2, NKikimrProto::OK);
TLogoBlobID patchedBlobId3(1, 1, 3, 0, size, 0);
TLogoBlobID truePatchedBlobId3(1, 1, 3, 0, size, 0);
TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(originalBlobId, &truePatchedBlobId3, TLogoBlobID::MaxCookie,
- test.Info->GroupID, test.Info->GroupID);
+ test.Info->GroupID, test.Info->GroupID);
UNIT_ASSERT(patchedBlobId3 != truePatchedBlobId3);
NKikimrProto::EReplyStatus statusWhenNotMatchingCookie = (erasure == "block-4-2" ? NKikimrProto::ERROR : NKikimrProto::OK);
SendPatch(test, originalBlobId, patchedBlobId3, TLogoBlobID::MaxCookie, diffs2, statusWhenNotMatchingCookie);
@@ -107,8 +107,8 @@ Y_UNIT_TEST_SUITE(BlobPatching) {
}
void MakeStressPatchingTest(TString erasure) {
- TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
- TTestInfo test = InitTest(env);
+ TEnvironmentSetup env(true, GetErasureTypeByString(erasure));
+ TTestInfo test = InitTest(env);
constexpr ui32 size = 100;
TString data(size, 'a');
@@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(BlobPatching) {
for (ui32 patchIdx = 0; patchIdx < patchCount; ++patchIdx) {
TLogoBlobID patchedBlobId(1, 1, patchIdx + 1, 0, size, 0);
TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(originalBlobId, &patchedBlobId, TLogoBlobID::MaxCookie,
- test.Info->GroupID, test.Info->GroupID);
+ test.Info->GroupID, test.Info->GroupID);
SendPatch(test, originalBlobId, patchedBlobId, 0, diffs, NKikimrProto::OK);
SendGet(test, patchedBlobId, patchedData, NKikimrProto::OK);
}
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 47fbd959c9..b20b6cab1a 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -572,25 +572,25 @@ namespace NKikimr {
}
}
- bool Validate(TString& errorReason) {
- if (!Record.HasBlobID()) {
- errorReason = "TEvVPut rejected by VDisk. It has no query";
- } else if (!Record.HasVDiskID()) {
- errorReason = "TEvVPut rejected by VDisk. It has no VDiskID";
- } else if (!Record.MutableVDiskID()->HasGroupID()) {
- errorReason = "TEvVPut rejected by VDisk. It has no VDiskID::GroupID";
- } else if (Record.MutableMsgQoS() == nullptr) {
- errorReason = "TEvVPut rejected by VDisk. MsgQoS is undefined";
- } else if (!Record.MutableMsgQoS()->HasExtQueueId()) {
- errorReason = "TEvVPut rejected by VDisk. ExtQueueId is undefined";
- } else if (GetPayloadCount() == 0 && !Record.HasBuffer()) {
- errorReason = "TEvVPut rejected by VDisk. Payload empty and no buffer provided";
- } else {
- return true;
- }
-
- return false;
- }
+ bool Validate(TString& errorReason) {
+ if (!Record.HasBlobID()) {
+ errorReason = "TEvVPut rejected by VDisk. It has no query";
+ } else if (!Record.HasVDiskID()) {
+ errorReason = "TEvVPut rejected by VDisk. It has no VDiskID";
+ } else if (!Record.MutableVDiskID()->HasGroupID()) {
+ errorReason = "TEvVPut rejected by VDisk. It has no VDiskID::GroupID";
+ } else if (Record.MutableMsgQoS() == nullptr) {
+ errorReason = "TEvVPut rejected by VDisk. MsgQoS is undefined";
+ } else if (!Record.MutableMsgQoS()->HasExtQueueId()) {
+ errorReason = "TEvVPut rejected by VDisk. ExtQueueId is undefined";
+ } else if (GetPayloadCount() == 0 && !Record.HasBuffer()) {
+ errorReason = "TEvVPut rejected by VDisk. Payload empty and no buffer provided";
+ } else {
+ return true;
+ }
+
+ return false;
+ }
TString ToString() const override {
return ToString(Record);
@@ -742,13 +742,13 @@ namespace NKikimr {
if (status != NKikimrProto::OK && errorReason) {
Record.SetErrorReason(errorReason);
}
- if (request.HasBlobID()) {
- Record.MutableBlobID()->CopyFrom(request.GetBlobID());
- }
+ if (request.HasBlobID()) {
+ Record.MutableBlobID()->CopyFrom(request.GetBlobID());
+ }
- if (request.HasVDiskID()) {
- Record.MutableVDiskID()->CopyFrom(request.GetVDiskID());
- }
+ if (request.HasVDiskID()) {
+ Record.MutableVDiskID()->CopyFrom(request.GetVDiskID());
+ }
if (request.HasCookie()) {
Record.SetCookie(request.GetCookie());
}
@@ -835,24 +835,24 @@ namespace NKikimr {
}
}
- bool Validate(TString& errorReason) {
- if (Record.ItemsSize() == 0) {
- errorReason = "TEvVMultiPut rejected by VDisk. It has 0 blobs to put";
- } else if (!Record.HasVDiskID()) {
- errorReason = "TEvVMultiPut rejected by VDisk. VDiskID is undefined";
- } else if (!Record.MutableVDiskID()->HasGroupID()) {
- errorReason = "TEvVMultiPut rejected by VDisk. It has no VDiskID::GroupID";
- } else if (Record.MutableMsgQoS() == nullptr) {
- errorReason = "TEvVMultiPut rejected by VDisk. MsgQoS is undefined";
- } else if (!Record.MutableMsgQoS()->HasExtQueueId()) {
- errorReason = "TEvVMultiPut rejected by VDisk. ExtQueueId is undefined";
- } else {
- return true;
- }
-
- return false;
- }
-
+ bool Validate(TString& errorReason) {
+ if (Record.ItemsSize() == 0) {
+ errorReason = "TEvVMultiPut rejected by VDisk. It has 0 blobs to put";
+ } else if (!Record.HasVDiskID()) {
+ errorReason = "TEvVMultiPut rejected by VDisk. VDiskID is undefined";
+ } else if (!Record.MutableVDiskID()->HasGroupID()) {
+ errorReason = "TEvVMultiPut rejected by VDisk. It has no VDiskID::GroupID";
+ } else if (Record.MutableMsgQoS() == nullptr) {
+ errorReason = "TEvVMultiPut rejected by VDisk. MsgQoS is undefined";
+ } else if (!Record.MutableMsgQoS()->HasExtQueueId()) {
+ errorReason = "TEvVMultiPut rejected by VDisk. ExtQueueId is undefined";
+ } else {
+ return true;
+ }
+
+ return false;
+ }
+
TString ToString() const override {
TStringStream str;
str << "{EvVMultiPut";
@@ -967,9 +967,9 @@ namespace NKikimr {
AddVPutResult(status, errorReason, logoBlobId, cookie);
}
- if (request.HasVDiskID()) {
- Record.MutableVDiskID()->CopyFrom(request.GetVDiskID());
- }
+ if (request.HasVDiskID()) {
+ Record.MutableVDiskID()->CopyFrom(request.GetVDiskID());
+ }
if (request.HasCookie()) {
Record.SetCookie(request.GetCookie());
}
@@ -1117,23 +1117,23 @@ namespace NKikimr {
#endif
}
- bool Validate(TString& errorReason) {
- if (!Record.HasRangeQuery() && Record.ExtremeQueriesSize() == 0) {
- errorReason = "TEvVGet rejected by VDisk. It has no query";
- } else if (!Record.HasVDiskID()) {
- errorReason = "TEvVGet rejected by VDisk. It has no VDiskID";
- } else if (!Record.MutableVDiskID()->HasGroupID()) {
- errorReason = "TEvVGet rejected by VDisk. It has no VDiskID::GroupID";
- } else if (Record.MutableMsgQoS() == nullptr) {
- errorReason = "TEvVGet rejected by VDisk. MsgQoS is undefined";
- } else if (! Record.MutableMsgQoS()->HasExtQueueId()) {
- errorReason = "TEvVGet rejected by VDisk. ExtQueueId is undefined";
- } else {
- return true;
- }
- return false;
- }
-
+ bool Validate(TString& errorReason) {
+ if (!Record.HasRangeQuery() && Record.ExtremeQueriesSize() == 0) {
+ errorReason = "TEvVGet rejected by VDisk. It has no query";
+ } else if (!Record.HasVDiskID()) {
+ errorReason = "TEvVGet rejected by VDisk. It has no VDiskID";
+ } else if (!Record.MutableVDiskID()->HasGroupID()) {
+ errorReason = "TEvVGet rejected by VDisk. It has no VDiskID::GroupID";
+ } else if (Record.MutableMsgQoS() == nullptr) {
+ errorReason = "TEvVGet rejected by VDisk. MsgQoS is undefined";
+ } else if (! Record.MutableMsgQoS()->HasExtQueueId()) {
+ errorReason = "TEvVGet rejected by VDisk. ExtQueueId is undefined";
+ } else {
+ return true;
+ }
+ return false;
+ }
+
TString ToString() const override {
return ToString(Record);
}
@@ -1370,10 +1370,10 @@ namespace NKikimr {
AddResult(queryStatus, id, shift, nullptr, 0, cookie);
}
- if (request.HasVDiskID()) {
- TVDiskID vDiskId = VDiskIDFromVDiskID(request.GetVDiskID());
- VDiskIDFromVDiskID(vDiskId, Record.MutableVDiskID());
- }
+ if (request.HasVDiskID()) {
+ TVDiskID vDiskId = VDiskIDFromVDiskID(request.GetVDiskID());
+ VDiskIDFromVDiskID(vDiskId, Record.MutableVDiskID());
+ }
if (request.HasCookie()) {
Record.SetCookie(request.GetCookie());
}
@@ -2053,7 +2053,7 @@ namespace NKikimr {
LogoBlobIDFromLogoBlobID(patchedPartBlobId, Record.MutablePatchedPartBlobId());
Y_VERIFY(request.HasVDiskID());
- Record.MutableVDiskID()->CopyFrom(request.GetVDiskID());
+ Record.MutableVDiskID()->CopyFrom(request.GetVDiskID());
Y_VERIFY(request.HasCookie());
Record.SetCookie(request.GetCookie());
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 96b813d317..fa58486922 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -365,23 +365,23 @@ namespace NKikimr {
THullCheckStatus ValidateVPut(const TActorContext &ctx, TString evPrefix,
TLogoBlobID id, ui64 bufSize, bool ignoreBlock)
{
- ui64 blobPartSize = 0;
- try {
- blobPartSize = GInfo->Type.PartSize(id);
- } catch (yexception ex) {
- LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS40");
- return {NKikimrProto::ERROR, ex.what()};
- }
-
- if (bufSize != blobPartSize) {
+ ui64 blobPartSize = 0;
+ try {
+ blobPartSize = GInfo->Type.PartSize(id);
+ } catch (yexception ex) {
+ LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS40");
+ return {NKikimrProto::ERROR, ex.what()};
+ }
+
+ if (bufSize != blobPartSize) {
LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix
<< evPrefix << ": buffer size does not match with part size;"
<< " buffer size# " << bufSize
- << " PartSize# " << blobPartSize
+ << " PartSize# " << blobPartSize
<< " id# " << id
<< " Marker# BSVS01");
return {NKikimrProto::ERROR, "buffer size mismatch"};
- }
+ }
if (bufSize > Config->MaxLogoBlobDataSize) {
LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << evPrefix << ": data is too large;"
@@ -463,17 +463,17 @@ namespace NKikimr {
putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx));
TVPutInfo &info = putsInfo.back();
- try {
- info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, blobId.FullID());
- } catch (yexception ex) {
- LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS39");
- info.HullStatus = {NKikimrProto::ERROR, 0, false};
- }
-
- if (info.HullStatus.Status == NKikimrProto::UNKNOWN) {
- info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock);
- }
-
+ try {
+ info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, blobId.FullID());
+ } catch (yexception ex) {
+ LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS39");
+ info.HullStatus = {NKikimrProto::ERROR, 0, false};
+ }
+
+ if (info.HullStatus.Status == NKikimrProto::UNKNOWN) {
+ info.HullStatus = ValidateVPut(ctx, "TEvVMultiPut", blobId, info.Buffer.GetSize(), ignoreBlock);
+ }
+
if (info.HullStatus.Status == NKikimrProto::OK) {
auto ingressOpt = TIngress::CreateIngressWithLocal(VCtx->Top.get(), VCtx->ShortSelfVDisk, blobId);
if (!ingressOpt) {
@@ -628,16 +628,16 @@ namespace NKikimr {
VCtx->Top->GetFailDomainOrderNumber(VCtx->ShortSelfVDisk), id.TabletID(), id.BlobSize());
TVPutInfo info(id, ev->Get()->GetBuffer());
const ui64 bufSize = info.Buffer.GetSize();
-
- try {
- info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID());
- } catch (yexception ex) {
- LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS41");
- info.HullStatus = {NKikimrProto::ERROR, 0, false};
- ReplyError({NKikimrProto::ERROR, ex.what(), 0, false}, ev, ctx, now);
- return;
- }
-
+
+ try {
+ info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID());
+ } catch (yexception ex) {
+ LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS41");
+ info.HullStatus = {NKikimrProto::ERROR, 0, false};
+ ReplyError({NKikimrProto::ERROR, ex.what(), 0, false}, ev, ctx, now);
+ return;
+ }
+
const bool ignoreBlock = record.GetIgnoreBlock();
if (!OutOfSpaceLogic->Allow(ctx, ev)) {
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index 08c32a8921..f9205b1b0e 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -288,7 +288,7 @@ namespace NKikimr {
" InFlightCost# %" PRIu64 " msgCtx# %s Deadlines# %" PRIu64,
NKikimrBlobStorage::EVDiskInternalQueueId_Name(IntQueueId).data(),
InFlightCount, InFlightBytes, InFlightCost, msgCtx.ToString().data(), Deadlines);
-
+
--InFlightCount;
InFlightCost -= msgCtx.Cost;
InFlightBytes -= msgCtx.RecByteSize;
@@ -1035,7 +1035,7 @@ namespace NKikimr {
// check queue compatibility: it's a contract between BlobStorage Proxy and VDisk,
// we don't work if queues are incompatible
-
+
bool compatible = Compatible(extQueueId, intQueueId);
Y_VERIFY(compatible, "%s: %s: extQueue is incompatible with intQueue; intQueue# %s extQueue# %s",
VCtx->VDiskLogPrefix.data(), msgName, NKikimrBlobStorage::EVDiskInternalQueueId_Name(intQueueId).data(),
@@ -1676,11 +1676,11 @@ namespace NKikimr {
|| std::is_same_v<TEv, TEvBlobStorage::TEvVDefrag>
|| std::is_same_v<TEv, TEvBlobStorage::TEvVBaldSyncLog>;
- template <typename TEv>
- static constexpr bool IsValidatable = std::is_same_v<TEv, TEvBlobStorage::TEvVMultiPut>
- || std::is_same_v<TEv, TEvBlobStorage::TEvVGet>
- || std::is_same_v<TEv, TEvBlobStorage::TEvVPut>;
-
+ template <typename TEv>
+ static constexpr bool IsValidatable = std::is_same_v<TEv, TEvBlobStorage::TEvVMultiPut>
+ || std::is_same_v<TEv, TEvBlobStorage::TEvVGet>
+ || std::is_same_v<TEv, TEvBlobStorage::TEvVPut>;
+
template<typename TEventType>
void CheckExecute(TAutoPtr<TEventHandle<TEventType>>& ev, const TActorContext& ctx) {
if constexpr (IsPatchEvent<TEventType>) {
@@ -1692,13 +1692,13 @@ namespace NKikimr {
}
}
- template <typename TEv>
- bool Validate(TEv* ev, TString& errorReason) const {
- if constexpr (IsValidatable<TEv>) {
- return ev->Validate(errorReason);
- }
- return true;
- }
+ template <typename TEv>
+ bool Validate(TEv* ev, TString& errorReason) const {
+ if constexpr (IsValidatable<TEv>) {
+ return ev->Validate(errorReason);
+ }
+ return true;
+ }
template <typename TEventType>
void Check(TAutoPtr<TEventHandle<TEventType>>& ev, const TActorContext& ctx) {
@@ -1713,17 +1713,17 @@ namespace NKikimr {
}
}
- template<typename TEv>
- void ValidateEvent(TAutoPtr<TEventHandle<TEv>>& ev, const TActorContext& ctx) {
- TString errorReason;
- bool isQueryValid = Validate(ev->Get(), errorReason);
- if (!isQueryValid) {
- Reply(ev, ctx, NKikimrProto::ERROR, errorReason, TAppData::TimeProvider->Now());
- return;
- }
- Check(ev, ctx);
- };
-
+ template<typename TEv>
+ void ValidateEvent(TAutoPtr<TEventHandle<TEv>>& ev, const TActorContext& ctx) {
+ TString errorReason;
+ bool isQueryValid = Validate(ev->Get(), errorReason);
+ if (!isQueryValid) {
+ Reply(ev, ctx, NKikimrProto::ERROR, errorReason, TAppData::TimeProvider->Now());
+ return;
+ }
+ Check(ev, ctx);
+ };
+
void ForwardToSkeleton(STFUNC_SIG) {
ctx.Send(ev->Forward(SkeletonId));
}
@@ -1738,9 +1738,9 @@ namespace NKikimr {
HFunc(TEvBlobStorage::TEvVPatchStart, Check)
HFunc(TEvBlobStorage::TEvVPatchDiff, Check)
HFunc(TEvBlobStorage::TEvVPatchXorDiff, Check)
- HFunc(TEvBlobStorage::TEvVPut, ValidateEvent)
- HFunc(TEvBlobStorage::TEvVMultiPut, ValidateEvent)
- HFunc(TEvBlobStorage::TEvVGet, ValidateEvent)
+ HFunc(TEvBlobStorage::TEvVPut, ValidateEvent)
+ HFunc(TEvBlobStorage::TEvVMultiPut, ValidateEvent)
+ HFunc(TEvBlobStorage::TEvVGet, ValidateEvent)
HFunc(TEvBlobStorage::TEvVBlock, Check)
HFunc(TEvBlobStorage::TEvVGetBlock, Check)
HFunc(TEvBlobStorage::TEvVCollectGarbage, Check)
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index a08ccbf865..588a9f40b7 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -1447,28 +1447,28 @@ message TEvTestLoadRequest {
optional uint32 DurationSeconds = 5;
repeated TWorkerConfig Workers = 7;
}
-
- message TKqpLoadStart {
+
+ message TKqpLoadStart {
message TStockWorkload {
optional uint64 ProductCount = 1;
optional uint64 Quantity = 2;
}
- optional uint64 Tag = 1;
- optional uint32 DurationSeconds = 2;
- optional uint32 NumOfSessions = 3;
- optional uint32 MaxInFlight = 4;
- optional uint32 StringValueSize = 5 [default = 8000];
- optional uint64 FirstKey = 6 [default = 0];
- optional bool DeleteTableOnFinish = 7 [default = true];
- optional bool SequentialWrite = 8 [default = true];
- optional uint32 UniformPartitionsCount = 9 [default = 1];
- optional uint64 TotalRowsToUpsert = 10;
- optional string WorkingDir = 11;
+ optional uint64 Tag = 1;
+ optional uint32 DurationSeconds = 2;
+ optional uint32 NumOfSessions = 3;
+ optional uint32 MaxInFlight = 4;
+ optional uint32 StringValueSize = 5 [default = 8000];
+ optional uint64 FirstKey = 6 [default = 0];
+ optional bool DeleteTableOnFinish = 7 [default = true];
+ optional bool SequentialWrite = 8 [default = true];
+ optional uint32 UniformPartitionsCount = 9 [default = 1];
+ optional uint64 TotalRowsToUpsert = 10;
+ optional string WorkingDir = 11;
optional string WorkloadName = 12;
oneof Workload {
TStockWorkload Stock = 13;
}
- }
+ }
message TMemoryLoadStart {
optional uint64 Tag = 1;
@@ -1486,7 +1486,7 @@ message TEvTestLoadRequest {
TPDiskReadLoadStart PDiskReadLoadStart = 6;
TPDiskLogLoadStart PDiskLogLoadStart = 7;
TKeyValueLoadStart KeyValueLoadStart = 8;
- TKqpLoadStart KqpLoadStart = 9;
+ TKqpLoadStart KqpLoadStart = 9;
TMemoryLoadStart MemoryLoadStart = 10;
}
}