authorValery Mironov <mbkkt@ydb.tech>2024-07-08 10:55:06 +0300
committerGitHub <noreply@github.com>2024-07-08 10:55:06 +0300
commit06a15253bdc1bd0d810f1fca0d79a49cea5d56b5 (patch)
parent1f88b11c2cb56fe35a57e2884f771cca6d4b1f3d (diff)
Add flat vector index example to C++ SDK (#6262)
7 files changed, 500 insertions, 0 deletions
diff --git a/ydb/public/sdk/cpp/examples/vector_index/README.md b/ydb/public/sdk/cpp/examples/vector_index/README.md
new file mode 100644
index 0000000000..244d1fcf52
--- /dev/null
+++ b/ydb/public/sdk/cpp/examples/vector_index/README.md
@@ -0,0 +1,74 @@
+# Vector Index
+Parameters description
+./vector_index --help
+Usage: ./vector_index [OPTIONS] [ARG]...
+Required parameters:
+ {-e|--endpoint} HOST:PORT YDB endpoint
+ {-d|--database} PATH YDB database
+ {-c|--command} COMMAND execute command: [Create|Drop|Build|Recreate]Index
+ or TopK (used for search, read request)
+ --table TABLE table name
+ --index_type TYPE index type: [flat]
+ --index_quantizer QUANTIZER
+ index quantizer: [none|int8|uint8|bit]
+ --primary_key PK primary key column
+ --embedding EMBEDDING embedding (vector) column
+ --distance DISTANCE distance function:
+ [Cosine|Euclidean|Manhattan]Distance
+ --rows ROWS count of rows in table, used only for
+ [Build|Recreate]Index commands
+ --top_k TOPK count of rows in top, used only for TopK command
+ --data DATA list of columns to read, used only for TopK command
+ --target TARGET file with target vector, used only for TopK command
+RecreateIndex -- command which sequentially executes Drop, Create and Build commands
+Index table will have name like `<table>_<index_type>_<index_quantizer>`
+## Examples
+### Flat Index
+It uses scalar quantization to speedup ANN search:
+* an approximate search is performed using quantization
+* an approximate list of primary keys is obtained
+* we search this list without using quantization
+#### Create Flat Bit Index
+It creates index table, with two columns: `primary_key` and `embedding`, `embedding` will be trasformed from `<table>` `<embedding>`
+./vector_index --endpoint=<endpoint> --database=<database> --command=RecreateIndex --table=<table> --index_type=flat --index_quantizer=bit --primary_key=<primary_key> --embedding=<embedding> --distance=CosineDistance --rows=490000 --top_k=0 --data="" --target=""
+#### Search Flat Bit Index
+Execute query like this, to get approximate candidates
+$candidates = SELECT primary_key
+FROM index_table
+ORDER BY CosineDistance(target, embedding)
+LIMIT top_k * 2
+And then execute and print
+SELECT CosineDistance(target, embedding) AS distance -- <list of columns from data parameter>
+FROM table
+WHERE primary_key IN $candidates
+ORDER BY distance
+LIMIT top_k
+./vector_index --endpoint=<endpoint> --database=<database> --command=RecreateIndex --table=<table> --index_type=flat --index_quantizer=bit --primary_key=<primary_key> --embedding=<embedding> --distance=CosineDistance --rows=0 --top_k=15 --data=<list of string columns, e.g. "title, text"> --target=<absolute-path-to-target.json>
diff --git a/ydb/public/sdk/cpp/examples/vector_index/main.cpp b/ydb/public/sdk/cpp/examples/vector_index/main.cpp
new file mode 100644
index 0000000000..fe310f3cf5
--- /dev/null
+++ b/ydb/public/sdk/cpp/examples/vector_index/main.cpp
@@ -0,0 +1,72 @@
+#include "vector_index.h"
+#include <util/system/env.h>
+#include <util/stream/file.h>
+using namespace NLastGetopt;
+using namespace NYdb;
+int main(int argc, char** argv) {
+ TString endpoint;
+ TString command;
+ TOptions options;
+ TOpts opts = TOpts::Default();
+ opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT").StoreResult(&endpoint);
+ opts.AddLongOption('d', "database", "YDB database").Required().RequiredArgument("PATH").StoreResult(&options.Database);
+ opts.AddLongOption('c', "command", "execute command: [Create|Drop|Build|Recreate]Index or TopK (used for search, read request)").Required().RequiredArgument("COMMAND").StoreResult(&command);
+ opts.AddLongOption("table", "table name").Required().RequiredArgument("TABLE").StoreResult(&options.Table);
+ opts.AddLongOption("index_type", "index type: [flat]").Required().RequiredArgument("TYPE").StoreResult(&options.IndexType);
+ opts.AddLongOption("index_quantizer", "index quantizer: [none|int8|uint8|bit]").Required().RequiredArgument("QUANTIZER").StoreResult(&options.IndexQuantizer);
+ opts.AddLongOption("primary_key", "primary key column").Required().RequiredArgument("PK").StoreResult(&options.PrimaryKey);
+ opts.AddLongOption("embedding", "embedding (vector) column").Required().RequiredArgument("EMBEDDING").StoreResult(&options.Embedding);
+ opts.AddLongOption("distance", "distance function: [Cosine|Euclidean|Manhattan]Distance").Required().RequiredArgument("DISTANCE").StoreResult(&options.Distance);
+ opts.AddLongOption("rows", "count of rows in table, used only for [Build|Recreate]Index commands").Required().RequiredArgument("ROWS").StoreResult(&options.Rows);
+ opts.AddLongOption("top_k", "count of rows in top, used only for TopK command").Required().RequiredArgument("TOPK").StoreResult(&options.TopK);
+ opts.AddLongOption("data", "list of columns to read, used only for TopK command").Required().RequiredArgument("DATA").StoreResult(&options.Data);
+ opts.AddLongOption("target", "file with target vector, used only for TopK command").Required().RequiredArgument("TARGET").StoreResult(&options.Target);
+ opts.SetFreeArgsMin(0);
+ TOptsParseResult result(&opts, argc, argv);
+ ECommand cmd = Parse(command);
+ if (cmd == ECommand::None) {
+ Cerr << "Unsupported command: " << command << Endl;
+ return 1;
+ }
+ auto config = TDriverConfig()
+ .SetEndpoint(endpoint)
+ .SetDatabase(options.Database)
+ .SetAuthToken(GetEnv("YDB_TOKEN"));
+ TDriver driver(config);
+ try {
+ switch (cmd) {
+ case ECommand::DropIndex:
+ return DropIndex(driver, options);
+ case ECommand::CreateIndex:
+ return CreateIndex(driver, options);
+ case ECommand::BuildIndex:
+ return BuildIndex(driver, options);
+ case ECommand::RecreateIndex:
+ if (auto r = DropIndex(driver, options); r != 0) {
+ return r;
+ }
+ if (auto r = CreateIndex(driver, options); r != 0) {
+ return r;
+ }
+ return BuildIndex(driver, options);
+ case ECommand::TopK:
+ return TopK(driver, options);
+ default:
+ break;
+ }
+ } catch (const std::exception& e) {
+ Cerr << "Execution failed: " << e.what() << Endl;
+ }
+ return 1;
diff --git a/ydb/public/sdk/cpp/examples/vector_index/target.json b/ydb/public/sdk/cpp/examples/vector_index/target.json
new file mode 100644
index 0000000000..7eeb1d3946
--- /dev/null
+++ b/ydb/public/sdk/cpp/examples/vector_index/target.json
@@ -0,0 +1 @@
diff --git a/ydb/public/sdk/cpp/examples/vector_index/vector_index.cpp b/ydb/public/sdk/cpp/examples/vector_index/vector_index.cpp
new file mode 100644
index 0000000000..d8b452bfc4
--- /dev/null
+++ b/ydb/public/sdk/cpp/examples/vector_index/vector_index.cpp
@@ -0,0 +1,288 @@
+#include "vector_index.h"
+#include <format>
+#include <fstream>
+#include <sstream>
+#include <thread>
+template <>
+struct std::formatter<TString>: std::formatter<std::string_view> {
+ template <typename FormatContext>
+ auto format(const TString& param, FormatContext& fc) const {
+ return std::formatter<std::string_view>::format(std::string_view{param}, fc);
+ }
+using namespace NYdb;
+using namespace NTable;
+namespace {
+constexpr ui64 kBulkSize = 1000;
+constexpr std::string_view FlatIndex = "flat";
+namespace NQuantizer {
+static constexpr std::string_view None = "None";
+static constexpr std::string_view Int8 = "Int8";
+static constexpr std::string_view Uint8 = "Uint8";
+static constexpr std::string_view Bit = "Bit";
+} // namespace NQuantizer
+bool EqualsICase(std::string_view l, std::string_view r) {
+ return std::equal(l.begin(), l.end(), r.begin(), r.end(), [](char l, char r) {
+ return std::tolower(l) == std::tolower(r);
+ });
+void PrintTop(TResultSetParser&& parser) {
+ while (parser.TryNextRow()) {
+ Y_ASSERT(parser.ColumnsCount() >= 1);
+ Cout << *parser.ColumnParser(0).GetOptionalFloat() << "\t";
+ for (size_t i = 1; i < parser.ColumnsCount(); ++i) {
+ Cout << *parser.ColumnParser(1).GetOptionalUtf8() << "\t";
+ }
+ Cout << "\n";
+ }
+ Cout << Endl;
+TString FullName(const TOptions& options, const TString& name) {
+ return TString::Join(options.Database, "/", name);
+TString IndexName(const TOptions& options) {
+ return TString::Join(options.Table, "_", options.IndexType, "_", options.IndexQuantizer);
+TString FullIndexName(const TOptions& options) {
+ return FullName(options, IndexName(options));
+void DropTable(TTableClient& client, const TString& table) {
+ auto r = client.RetryOperationSync([&](TSession session) {
+ TDropTableSettings settings;
+ return session.DropTable(table).ExtractValueSync();
+ });
+ if (!r.IsSuccess() && r.GetStatus() != EStatus::SCHEME_ERROR) {
+ ythrow TVectorException{r};
+ }
+void DropIndex(TTableClient& client, const TOptions& options) {
+ DropTable(client, FullIndexName(options));
+void CreateFlat(TTableClient& client, const TOptions& options) {
+ auto r = client.RetryOperationSync([&](TSession session) {
+ auto desc = TTableBuilder()
+ .AddNonNullableColumn(options.PrimaryKey, EPrimitiveType::Uint32)
+ .AddNullableColumn(options.Embedding, EPrimitiveType::String)
+ .SetPrimaryKeyColumn(options.PrimaryKey)
+ .Build();
+ return session.CreateTable(FullIndexName(options), std::move(desc)).ExtractValueSync();
+ });
+ if (!r.IsSuccess()) {
+ ythrow TVectorException{r};
+ }
+void UpdateFlat(TTableClient& client, const TOptions& options, std::string_view type) {
+ TString query = std::format(R"(
+ DECLARE $begin AS Uint64;
+ DECLARE $rows AS Uint64;
+ SELECT COALESCE(CAST({2} AS Uint32), 0) AS {2}, Untag(Knn::ToBinaryString{4}(CAST(Knn::FloatFromBinaryString({3}) AS List<{5}>)), "{4}Vector") AS {3}
+ FROM {0}
+ WHERE $begin <= {2} AND {2} < $begin + $rows;
+ )",
+ options.Table,
+ IndexName(options),
+ options.PrimaryKey,
+ options.Embedding,
+ type,
+ type == NQuantizer::Bit ? "Float" : type);
+ Cout << query << Endl;
+ auto last = std::chrono::steady_clock::now();
+ ui64 current = 0;
+ ui64 overall = (options.Rows + kBulkSize - 1) / kBulkSize;
+ auto report = [&](auto curr) {
+ Cout << "Already done " << current << " / " << overall << " upserts, time spent: " << std::chrono::duration<double>{curr - last}.count() << Endl;
+ last = curr;
+ };
+ auto waitRequest = [&](auto& request) {
+ auto r = request.ExtractValueSync();
+ if (!r.IsSuccess()) {
+ ythrow TVectorException{r};
+ }
+ ++current;
+ if (auto curr = std::chrono::steady_clock::now(); (curr - last) >= std::chrono::seconds{1}) {
+ report(curr);
+ }
+ };
+ std::deque<TAsyncStatus> requests;
+ auto waitFirst = [&] {
+ if (requests.size() < std::thread::hardware_concurrency()) {
+ return;
+ }
+ waitRequest(requests.front());
+ requests.pop_front();
+ };
+ TParamsBuilder paramsBuilder;
+ TRetryOperationSettings retrySettings;
+ retrySettings
+ .MaxRetries(60)
+ .GetSessionClientTimeout(TDuration::Seconds(60))
+ .Idempotent(true)
+ .RetryUndefined(true);
+ for (ui64 i = 0; i < options.Rows; i += kBulkSize) {
+ waitFirst();
+ paramsBuilder.AddParam("$begin").Uint64(i).Build();
+ paramsBuilder.AddParam("$rows").Uint64(kBulkSize).Build();
+ auto f = client.RetryOperation([&, p = paramsBuilder.Build()](TSession session) {
+ auto params = p;
+ return session.ExecuteDataQuery(
+ query,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ std::move(params))
+ .Apply([](auto result) -> TStatus {
+ return result.ExtractValueSync();
+ });
+ }, retrySettings);
+ requests.push_back(std::move(f));
+ }
+ for (auto& request : requests) {
+ waitRequest(request);
+ }
+ report(std::chrono::steady_clock::now());
+void TopKFlat(TTableClient& client, const TOptions& options, std::string_view type) {
+ TString query = std::format(R"(
+ $TargetBinary = Knn::ToBinaryStringFloat($Target);
+ $TargetSQ = Knn::ToBinaryString{7}(CAST($Target AS List<{8}>));
+ $IndexIds = SELECT {1}, Knn::{0}({4}, $TargetSQ) as distance
+ FROM {2}
+ ORDER BY distance
+ LIMIT {6} * 2;
+ SELECT Knn::{0}({4}, $TargetBinary) as distance, {5}
+ FROM {3}
+ WHERE {1} IN (SELECT {1} FROM $IndexIds)
+ ORDER BY distance
+ LIMIT {6};
+ )",
+ options.Distance,
+ options.PrimaryKey,
+ IndexName(options),
+ options.Table,
+ options.Embedding,
+ options.Data,
+ options.TopK,
+ type,
+ type == NQuantizer::Bit ? "Float" : type);
+ Cout << query << Endl;
+ std::ifstream targetFileStream(options.Target);
+ std::stringstream targetStrStream;
+ targetStrStream << targetFileStream.rdbuf();
+ query = std::format(R"($Target = CAST({0} AS List<Float>); {1})", targetStrStream.view(), query);
+ TExecDataQuerySettings settings;
+ settings.KeepInQueryCache(true);
+ auto r = client.RetryOperationSync([&](TSession session) -> TStatus {
+ auto f = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),settings);
+ auto r = f.ExtractValueSync();
+ if (r.IsSuccess()) {
+ PrintTop(r.GetResultSetParser(0));
+ }
+ return r;
+ });
+ if (!r.IsSuccess()) {
+ ythrow TVectorException{r};
+ }
+} // namespace
+ECommand Parse(std::string_view command) {
+ if (EqualsICase(command, "DropIndex")) {
+ return ECommand::DropIndex;
+ }
+ if (EqualsICase(command, "CreateIndex")) {
+ return ECommand::CreateIndex;
+ }
+ if (EqualsICase(command, "BuildIndex")) {
+ return ECommand::BuildIndex;
+ }
+ if (EqualsICase(command, "RecreateIndex")) {
+ return ECommand::RecreateIndex;
+ }
+ if (EqualsICase(command, "TopK")) {
+ return ECommand::TopK;
+ }
+ return ECommand::None;
+int DropIndex(NYdb::TDriver& driver, const TOptions& options) {
+ TTableClient client(driver);
+ DropIndex(client, options);
+ return 0;
+int CreateIndex(NYdb::TDriver& driver, const TOptions& options) {
+ TTableClient client(driver);
+ if (options.IndexType == FlatIndex) {
+ CreateFlat(client, options);
+ return 0;
+ }
+ return 1;
+int BuildIndex(NYdb::TDriver& driver, const TOptions& options) {
+ TTableClient client(driver);
+ if (EqualsICase(options.IndexType, FlatIndex)) {
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::None)) {
+ return 0;
+ }
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::Int8)) {
+ UpdateFlat(client, options, NQuantizer::Int8);
+ return 0;
+ }
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::Uint8)) {
+ UpdateFlat(client, options, NQuantizer::Uint8);
+ return 0;
+ }
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::Bit)) {
+ UpdateFlat(client, options, NQuantizer::Bit);
+ return 0;
+ }
+ }
+ return 1;
+int TopK(NYdb::TDriver& driver, const TOptions& options) {
+ TTableClient client(driver);
+ if (EqualsICase(options.IndexType, FlatIndex)) {
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::None)) {
+ return 0;
+ }
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::Int8)) {
+ TopKFlat(client, options, NQuantizer::Int8);
+ return 0;
+ }
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::Uint8)) {
+ TopKFlat(client, options, NQuantizer::Uint8);
+ return 0;
+ }
+ if (EqualsICase(options.IndexQuantizer, NQuantizer::Bit)) {
+ TopKFlat(client, options, NQuantizer::Bit);
+ return 0;
+ }
+ }
+ return 1;
+} \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/examples/vector_index/vector_index.h b/ydb/public/sdk/cpp/examples/vector_index/vector_index.h
new file mode 100644
index 0000000000..8ebcf5ab85
--- /dev/null
+++ b/ydb/public/sdk/cpp/examples/vector_index/vector_index.h
@@ -0,0 +1,51 @@
+#pragma once
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <library/cpp/getopt/last_getopt.h>
+#include <util/generic/string.h>
+#include <util/generic/yexception.h>
+#include <util/stream/output.h>
+#include <util/string/builder.h>
+#include <util/string/printf.h>
+enum class ECommand {
+ DropIndex,
+ CreateIndex,
+ BuildIndex,
+ RecreateIndex, // Drop, Create, Build
+ TopK,
+ None,
+ECommand Parse(std::string_view command);
+struct TOptions {
+ TString Database;
+ TString Table;
+ TString IndexType;
+ TString IndexQuantizer;
+ TString PrimaryKey;
+ TString Embedding;
+ TString Distance;
+ TString Data;
+ TString Target;
+ ui64 Rows = 0;
+ ui64 TopK = 0;
+int DropIndex(NYdb::TDriver& driver, const TOptions& options);
+int CreateIndex(NYdb::TDriver& driver, const TOptions& options);
+int BuildIndex(NYdb::TDriver& driver, const TOptions& options);
+int TopK(NYdb::TDriver& driver, const TOptions& options);
+class TVectorException: public yexception {
+ TVectorException(const NYdb::TStatus& status) {
+ *this << "Status:" << status;
+ }
diff --git a/ydb/public/sdk/cpp/examples/vector_index/ya.make b/ydb/public/sdk/cpp/examples/vector_index/ya.make
new file mode 100644
index 0000000000..2c7a5c2a6f
--- /dev/null
+++ b/ydb/public/sdk/cpp/examples/vector_index/ya.make
@@ -0,0 +1,13 @@
+ main.cpp
+ vector_index.cpp
+ library/cpp/getopt
+ ydb/public/sdk/cpp/client/ydb_table
diff --git a/ydb/public/sdk/cpp/examples/ya.make b/ydb/public/sdk/cpp/examples/ya.make
index 2cb41f28d2..14a498dfd8 100644
--- a/ydb/public/sdk/cpp/examples/ya.make
+++ b/ydb/public/sdk/cpp/examples/ya.make
@@ -6,4 +6,5 @@ RECURSE(
+ vector_index