aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordanilalexeev <danilalexeev@yandex-team.com>2023-11-01 23:33:41 +0300
committerdanilalexeev <danilalexeev@yandex-team.com>2023-11-01 23:51:52 +0300
commit756daf669731c90178154c348d138ed559802c71 (patch)
treec31e429f2a63550862c6c59f003483575b6b4ed5
parent0c143d9a40e05b090b3c9e3e32a2093e8a7e0c2b (diff)
downloadydb-756daf669731c90178154c348d138ed559802c71.tar.gz
YT-20155: Independent snapshot checksums for automaton parts
-rw-r--r--yt/yt/core/misc/checksum.cpp21
-rw-r--r--yt/yt/core/misc/checksum.h20
2 files changed, 41 insertions, 0 deletions
diff --git a/yt/yt/core/misc/checksum.cpp b/yt/yt/core/misc/checksum.cpp
index 520b0b03a8..05cb16d3b1 100644
--- a/yt/yt/core/misc/checksum.cpp
+++ b/yt/yt/core/misc/checksum.cpp
@@ -9,6 +9,8 @@
namespace NYT {
+using namespace NConcurrency;
+
////////////////////////////////////////////////////////////////////////////////
#ifdef YT_USE_SSE42
@@ -776,4 +778,23 @@ void TChecksumOutput::DoFinish()
////////////////////////////////////////////////////////////////////////////////
+TChecksumAsyncOutput::TChecksumAsyncOutput(IAsyncOutputStreamPtr underlyingStream)
+ : UnderlyingStream_(underlyingStream)
+{ }
+
+TFuture<void> TChecksumAsyncOutput::Close()
+{
+ return UnderlyingStream_->Close();
+}
+
+TFuture<void> TChecksumAsyncOutput::Write(const TSharedRef& block)
+{
+ return UnderlyingStream_->Write(block)
+ .Apply(BIND([&, this, this_ = MakeWeak(this)] {
+ Checksum_ = NYT::GetChecksum(block, Checksum_);
+ }));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT
diff --git a/yt/yt/core/misc/checksum.h b/yt/yt/core/misc/checksum.h
index 92a3c06e1a..b33ec3a983 100644
--- a/yt/yt/core/misc/checksum.h
+++ b/yt/yt/core/misc/checksum.h
@@ -7,6 +7,8 @@
#include <util/stream/input.h>
#include <util/stream/output.h>
+#include <yt/yt/core/concurrency/async_stream.h>
+
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -53,4 +55,22 @@ private:
////////////////////////////////////////////////////////////////////////////////
+class TChecksumAsyncOutput
+ : public NConcurrency::IAsyncOutputStream
+{
+public:
+ explicit TChecksumAsyncOutput(NConcurrency::IAsyncOutputStreamPtr underlyingStream);
+
+ TFuture<void> Close() override;
+
+ TFuture<void> Write(const TSharedRef& block) override;
+
+ DEFINE_BYVAL_RW_PROPERTY(TChecksum, Checksum);
+
+private:
+ NConcurrency::IAsyncOutputStreamPtr UnderlyingStream_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT