author    Devtools Arcadia <arcadia-devtools@yandex-team.ru>    2022-02-07 18:08:42 +0300
committer Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>    2022-02-07 18:08:42 +0300
commit    1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
tree      e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/http/io/chunk.cpp
download  ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz

    intermediate changes

    ref: cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/http/io/chunk.cpp')
-rw-r--r--  library/cpp/http/io/chunk.cpp  246
1 file changed, 246 insertions, 0 deletions
diff --git a/library/cpp/http/io/chunk.cpp b/library/cpp/http/io/chunk.cpp
new file mode 100644
index 00000000000..6975d9eac1e
--- /dev/null
+++ b/library/cpp/http/io/chunk.cpp
@@ -0,0 +1,246 @@
+#include "chunk.h"
+
+#include "headers.h"
+
+#include <util/string/cast.h>
+#include <util/generic/utility.h>
+#include <util/generic/yexception.h>
+
+static inline size_t ParseHex(const TString& s) {
+ if (s.empty()) {
+ ythrow yexception() << "can not parse chunk length(empty string)";
+ }
+
+ size_t ret = 0;
+
+ for (TString::const_iterator c = s.begin(); c != s.end(); ++c) {
+ const char ch = *c;
+
+ if (ch >= '0' && ch <= '9') {
+ ret *= 16;
+ ret += ch - '0';
+ } else if (ch >= 'a' && ch <= 'f') {
+ ret *= 16;
+ ret += 10 + ch - 'a';
+ } else if (ch >= 'A' && ch <= 'F') {
+ ret *= 16;
+ ret += 10 + ch - 'A';
+ } else if (ch == ';') {
+ break;
+ } else if (isspace(ch)) {
+ continue;
+ } else {
+ ythrow yexception() << "can not parse chunk length(" << s.data() << ")";
+ }
+ }
+
+ return ret;
+}
+
+static inline char* ToHex(size_t len, char* buf) {
+ do {
+ const size_t val = len % 16;
+
+ *--buf = (val < 10) ? (val + '0') : (val - 10 + 'a');
+ len /= 16;
+ } while (len);
+
+ return buf;
+}
+
+class TChunkedInput::TImpl {
+public:
+ inline TImpl(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
+ : Slave_(slave)
+ , Trailers_(trailers)
+ , Pending_(0)
+ , LastChunkReaded_(false)
+ {
+ if (Trailers_) {
+ Trailers_->Clear();
+ }
+ }
+
+ inline ~TImpl() {
+ }
+
+ inline size_t Read(void* buf, size_t len) {
+ return Perform(len, [this, buf](size_t toRead) { return Slave_->Read(buf, toRead); });
+ }
+
+ inline size_t Skip(size_t len) {
+ return Perform(len, [this](size_t toSkip) { return Slave_->Skip(toSkip); });
+ }
+
+private:
+ template <class Operation>
+ inline size_t Perform(size_t len, const Operation& operation) {
+ if (!HavePendingData()) {
+ return 0;
+ }
+
+ const size_t toProcess = Min(Pending_, len);
+
+ if (toProcess) {
+ const size_t processed = operation(toProcess);
+
+ if (!processed) {
+ ythrow yexception() << "malformed http chunk";
+ }
+
+ Pending_ -= processed;
+
+ return processed;
+ }
+
+ return 0;
+ }
+
+ inline bool HavePendingData() {
+ if (LastChunkReaded_) {
+ return false;
+ }
+
+ if (!Pending_) {
+ if (!ProceedToNextChunk()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ inline bool ProceedToNextChunk() {
+ TString len(Slave_->ReadLine());
+
+ if (len.empty()) {
+ /*
+ * skip crlf from previous chunk
+ */
+
+ len = Slave_->ReadLine();
+ }
+
+ Pending_ = ParseHex(len);
+
+ if (Pending_) {
+ return true;
+ }
+
+ if (Trailers_) {
+ Trailers_->ConstructInPlace(Slave_);
+ }
+ LastChunkReaded_ = true;
+
+ return false;
+ }
+
+private:
+ IInputStream* Slave_;
+ TMaybe<THttpHeaders>* Trailers_;
+ size_t Pending_;
+ bool LastChunkReaded_;
+};
+
+TChunkedInput::TChunkedInput(IInputStream* slave, TMaybe<THttpHeaders>* trailers)
+ : Impl_(new TImpl(slave, trailers))
+{
+}
+
+TChunkedInput::~TChunkedInput() {
+}
+
+size_t TChunkedInput::DoRead(void* buf, size_t len) {
+ return Impl_->Read(buf, len);
+}
+
+size_t TChunkedInput::DoSkip(size_t len) {
+ return Impl_->Skip(len);
+}
+
+class TChunkedOutput::TImpl {
+ typedef IOutputStream::TPart TPart;
+
+public:
+ inline TImpl(IOutputStream* slave)
+ : Slave_(slave)
+ {
+ }
+
+ inline ~TImpl() {
+ }
+
+ inline void Write(const void* buf, size_t len) {
+ const char* ptr = (const char*)buf;
+
+ while (len) {
+ const size_t portion = Min<size_t>(len, 1024 * 16);
+
+ WriteImpl(ptr, portion);
+
+ ptr += portion;
+ len -= portion;
+ }
+ }
+
+ inline void WriteImpl(const void* buf, size_t len) {
+ char tmp[32];
+ char* e = tmp + sizeof(tmp);
+ char* b = ToHex(len, e);
+
+ const TPart parts[] = {
+ TPart(b, e - b),
+ TPart::CrLf(),
+ TPart(buf, len),
+ TPart::CrLf(),
+ };
+
+ Slave_->Write(parts, sizeof(parts) / sizeof(*parts));
+ }
+
+ inline void Flush() {
+ Slave_->Flush();
+ }
+
+ inline void Finish() {
+ Slave_->Write("0\r\n\r\n", 5);
+
+ Flush();
+ }
+
+private:
+ IOutputStream* Slave_;
+};
+
+TChunkedOutput::TChunkedOutput(IOutputStream* slave)
+ : Impl_(new TImpl(slave))
+{
+}
+
+TChunkedOutput::~TChunkedOutput() {
+ try {
+ Finish();
+ } catch (...) {
+ }
+}
+
+void TChunkedOutput::DoWrite(const void* buf, size_t len) {
+ if (Impl_.Get()) {
+ Impl_->Write(buf, len);
+ } else {
+ ythrow yexception() << "can not write to finished stream";
+ }
+}
+
+void TChunkedOutput::DoFlush() {
+ if (Impl_.Get()) {
+ Impl_->Flush();
+ }
+}
+
+void TChunkedOutput::DoFinish() {
+ if (Impl_.Get()) {
+ Impl_->Finish();
+ Impl_.Destroy();
+ }
+}
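
Below is a minimal usage sketch, not part of the commit above: it round-trips a payload through the TChunkedOutput/TChunkedInput classes introduced by this diff, using an in-memory TStringStream as the underlying stream. The include paths, TStringStream, TMaybe, and the ReadAll()/Write()/Finish() calls are assumptions based on the Arcadia util stream interfaces this file builds on; treat it as an illustration of the chunked wire format ("<hex length>\r\n<data>\r\n" per chunk, "0\r\n\r\n" as the terminator), not a verified part of the library.

// Usage sketch (assumed include paths and util stream API).
#include <library/cpp/http/io/chunk.h>
#include <library/cpp/http/io/headers.h>

#include <util/generic/maybe.h>
#include <util/stream/str.h>
#include <util/stream/output.h>

int main() {
    TStringStream wire;

    // Writer side: each Write() is emitted as "<hex length>\r\n<data>\r\n"
    // (see WriteImpl above); Finish() appends the terminating "0\r\n\r\n".
    TChunkedOutput out(&wire);
    out.Write("hello, chunked world", 20);
    out.Finish();

    // Reader side: any trailer headers found after the last chunk are
    // parsed into the optional THttpHeaders (empty here).
    TMaybe<THttpHeaders> trailers;
    TChunkedInput in(&wire, &trailers);

    const TString body = in.ReadAll();
    Cout << body << Endl; // prints the original payload

    return 0;
}

Note that TChunkedOutput::Write splits large buffers into 16 KiB chunks, so a single Write() call may produce several length-prefixed chunks on the wire.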