Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 117 additions & 47 deletions src/cpp/src/kvs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <fstream>
#include <iostream>
#include <sstream>
#include <filesystem>

// TODO Default Value Handling TBD
// TODO Add Score Logging
Expand Down Expand Up @@ -47,7 +48,8 @@ Kvs::Kvs(Kvs&& other) noexcept
object would also be okay*/
,
writer(std::move(other.writer)),
logger(std::move(other.logger))
logger(std::move(other.logger)),
max_storage_bytes(other.max_storage_bytes)
{
{
std::lock_guard<std::mutex> lock(other.kvs_mutex);
Expand Down Expand Up @@ -81,6 +83,7 @@ Kvs& Kvs::operator=(Kvs&& other) noexcept
parser = std::move(other.parser);
writer = std::move(other.writer);
logger = std::move(other.logger);
max_storage_bytes = other.max_storage_bytes;
}
return *this;
}
Expand Down Expand Up @@ -218,7 +221,8 @@ score::Result<std::unordered_map<string, KvsValue>> Kvs::open_json(const score::
score::Result<Kvs> Kvs::open(const InstanceId& instance_id,
OpenNeedDefaults need_defaults,
OpenNeedKvs need_kvs,
const std::string&& dir)
const std::string&& dir,
std::optional<size_t> max_storage_bytes)
{
score::Result<Kvs> result =
score::MakeUnexpected(ErrorCode::UnmappedError); /* Redundant initialization needed, since Resul<KVS> would call
Expand All @@ -230,6 +234,7 @@ score::Result<Kvs> Kvs::open(const InstanceId& instance_id,
const score::filesystem::Path filename_kvs = filename_prefix.Native() + "_0";

Kvs kvs; /* Create KVS instance */
kvs.max_storage_bytes = max_storage_bytes; /* Store maximum storage limit */
auto default_res = kvs.open_json(
filename_default,
need_defaults == OpenNeedDefaults::Required ? OpenJsonNeedFile::Required : OpenJsonNeedFile::Optional);
Expand Down Expand Up @@ -328,6 +333,7 @@ score::Result<bool> Kvs::key_exists(const std::string_view key)
return result;
}


/* Retrieve the value associated with a key*/
score::Result<KvsValue> Kvs::get_value(const std::string_view key)
{
Expand Down Expand Up @@ -472,6 +478,54 @@ score::ResultBlank Kvs::remove_key(const std::string_view key)
return result;
}

score::Result<size_t> Kvs::get_file_size(const score::filesystem::Path& file_path) {
std::error_code ec;
const auto size = std::filesystem::file_size(file_path.CStr(), ec);

if (ec) {
// Check if the error is "file not found"
if (ec == std::errc::no_such_file_or_directory) {
// File does not exist, its size is 0. This is not an error.
return 0;
}
logger->LogError() << "Error: Could not get size of file " << file_path << ": " << ec.message();
return score::MakeUnexpected(ErrorCode::PhysicalStorageFailure);
}

return size;
}

/* Helper Function to get current storage size of all persisted files (defaults and historical snapshots) */
score::Result<size_t> Kvs::get_current_storage_size() {
size_t total_size = 0;
const std::array<const char*, 2> file_extensions = {".json", ".hash"};

// Add the size of the default files
const std::string default_suffix = "_default";
for (const char* extension : file_extensions) {
const score::filesystem::Path file_path = filename_prefix.Native() + default_suffix + extension;
auto size_result = get_file_size(file_path);
if (!size_result) {
return size_result; // Propagate error directly
}
total_size += size_result.value();
}

// Add the size of current and historical snapshots that will be kept after rotation (0 to N-1).
for (size_t snapshot_index = 0; snapshot_index < KVS_MAX_SNAPSHOTS; ++snapshot_index) {
const std::string snapshot_suffix = "_" + to_string(snapshot_index);

for (const char* extension : file_extensions) {
const score::filesystem::Path file_path = filename_prefix.Native() + snapshot_suffix + extension;
auto size_result = get_file_size(file_path);
if (!size_result) {
return size_result; // Propagate error directly
}
total_size += size_result.value();
}
}
return total_size;
}
/* Helper: write data to a file and ensure it reaches physical storage.*/
score::ResultBlank Kvs::write_and_sync(const std::string& path, const void* data, std::size_t size)
{
Expand Down Expand Up @@ -550,66 +604,82 @@ score::ResultBlank Kvs::write_json_data(const std::string& buf)
return result;
}

/* Flush the key-value store*/
score::ResultBlank Kvs::flush()
{
score::ResultBlank result = score::MakeUnexpected(ErrorCode::UnmappedError);
/* Create JSON Object */
score::Result<std::string> Kvs::serialize_and_check() {
score::json::Object root_obj;
bool error = false;

// 1. Serialize the current KVS map to a buffer
{
std::unique_lock<std::mutex> lock(kvs_mutex, std::try_to_lock);
if (lock.owns_lock())
{
for (const auto& [key, value] : kvs)
{
auto conv = kvsvalue_to_any(value);
if (!conv)
{
result = score::MakeUnexpected(static_cast<ErrorCode>(*conv.error()));
error = true;
break;
}
else
{
root_obj.emplace(key, std::move(conv.value()) /*emplace in map uses move operator*/
);
if (!conv) {
return score::MakeUnexpected(static_cast<ErrorCode>(*conv.error()));
}
root_obj.emplace(key, std::move(conv.value()));
}
}
else
{
result = score::MakeUnexpected(ErrorCode::MutexLockFailed);
error = true;
} else {
return score::MakeUnexpected(ErrorCode::MutexLockFailed);
}
}

if (!error)
{
/* Serialize Buffer */
auto buf_res = writer->ToBuffer(root_obj);
if (!buf_res)
{
result = score::MakeUnexpected(ErrorCode::JsonGeneratorError);
}
else
{
/* Rotate Snapshots */
auto rotate_result = snapshot_rotate();
if (!rotate_result)
{
result = rotate_result;
}
else
{
/* Write JSON Data */
std::string buf = std::move(buf_res.value());
result = write_json_data(buf);
}
}
auto buf_res = writer->ToBuffer(root_obj);
if (!buf_res) {
return score::MakeUnexpected(ErrorCode::JsonGeneratorError);
}
const std::string& buf = buf_res.value();

return result;
// 2. Get the size of all other persisted files
auto current_size_res = get_current_storage_size();
if (!current_size_res) {
return score::MakeUnexpected(static_cast<ErrorCode>(*current_size_res.error()));
}

// 3. Calculate the potential total size
const size_t total_size = current_size_res.value() + buf.size() + HASH_FILE_SIZE;

// 4. Check against the limit
if (this->max_storage_bytes.has_value() && total_size > this->max_storage_bytes.value()) {
logger->LogError() << "error: KVS storage limit would be exceeded. total_size:" << total_size
<< " max_storage_bytes:" << this->max_storage_bytes.value();
return score::MakeUnexpected(ErrorCode::OutOfStorageSpace);
}

return buf;
}

/* Flush the key-value store*/
score::ResultBlank Kvs::flush() {
auto result = serialize_and_check();
if (!result) {
return score::MakeUnexpected(static_cast<ErrorCode>(*result.error()));
}

auto rotate_result = snapshot_rotate();
if (!rotate_result) {
return rotate_result;
}

return write_json_data(result.value());
}

/* Performs a 'dry run' to check the potential storage size */
score::Result<size_t> Kvs::calculate_potential_size() {
auto result = serialize_and_check();
if (!result) {
return score::MakeUnexpected(static_cast<ErrorCode>(*result.error()));
}

// Re-calculate size to return it, as serialize_and_check only returns the buffer
const std::string& buf = result.value();
auto current_size_res = get_current_storage_size();
if (!current_size_res) {
return score::MakeUnexpected(static_cast<ErrorCode>(*current_size_res.error()));
}

return current_size_res.value() + buf.size() + HASH_FILE_SIZE;
}

/* Retrieve the snapshot count*/
Expand Down
62 changes: 43 additions & 19 deletions src/cpp/src/kvs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
#include <string>
#include <unordered_map>
#include <vector>
#include <optional>

#define KVS_MAX_SNAPSHOTS 3
static constexpr size_t HASH_FILE_SIZE = 4;

namespace score::mw::per::kvs
{

{
struct InstanceId
{
size_t id;
Expand Down Expand Up @@ -172,7 +174,8 @@ class Kvs final
static score::Result<Kvs> open(const InstanceId& instance_id,
OpenNeedDefaults need_defaults,
OpenNeedKvs need_kvs,
const std::string&& dir);
const std::string&& dir,
std::optional<size_t> max_storage_bytes);

/**
* @brief Resets a key-value-storage to its initial state
Expand Down Expand Up @@ -352,37 +355,58 @@ class Kvs final
*/
score::Result<score::filesystem::Path> get_hash_filename(const SnapshotId& snapshot_id) const;

private:
/* Private constructor to prevent direct instantiation */
Kvs();
/**
* @brief Performs a 'dry run' to check if the current in-memory store would
* exceed the storage limit upon flushing.
*
* This function serializes the current key-value data to a temporary buffer
* and calculates the potential total storage size. It checks this size against
* the configured `max_storage_bytes` limit.
*
* @return A score::Result object containing either:
* - On success: The estimated total size (size_t) that the KVS would occupy after a flush.
* - On failure: An `OutOfStorageSpace` error if the limit would be exceeded,
* or another ErrorCode for other failures (e.g., serialization).
*/
score::Result<size_t> calculate_potential_size();

private:
/* Private constructor to prevent direct instantiation */
Kvs();

/* Internal storage and configuration details.*/
std::mutex kvs_mutex;
std::unordered_map<std::string, KvsValue> kvs;
/* Internal storage and configuration details.*/
std::mutex kvs_mutex;
std::unordered_map<std::string, KvsValue> kvs;

/* Optional default values */
std::unordered_map<std::string, KvsValue> default_values;
/* Optional default values */
std::unordered_map<std::string, KvsValue> default_values;

/* Filename prefix */
score::filesystem::Path filename_prefix;
/* Filename prefix */
score::filesystem::Path filename_prefix;

/* Filesystem handling */
std::unique_ptr<score::filesystem::Filesystem> filesystem;
/* Filesystem handling */
std::unique_ptr<score::filesystem::Filesystem> filesystem;

/* Json handling */
std::unique_ptr<score::json::IJsonParser> parser;
std::unique_ptr<score::json::IJsonWriter> writer;
/* Json handling */
std::unique_ptr<score::json::IJsonParser> parser;
std::unique_ptr<score::json::IJsonWriter> writer;

/* Logging */
std::unique_ptr<score::mw::log::Logger> logger;
/* Logging */
std::unique_ptr<score::mw::log::Logger> logger;

/* Maximum storage limit in bytes */
std::optional<size_t> max_storage_bytes;
/* Private Methods */
score::ResultBlank snapshot_rotate();
score::Result<std::unordered_map<std::string, KvsValue>> parse_json_data(const std::string& data);
score::Result<std::unordered_map<std::string, KvsValue>> open_json(const score::filesystem::Path& prefix,
OpenJsonNeedFile need_file);
score::ResultBlank write_json_data(const std::string& buf);
score::ResultBlank write_and_sync(const std::string& path, const void* data, std::size_t size);

score::Result<std::string> serialize_and_check();
score::Result<size_t> get_file_size(const score::filesystem::Path& file_path);
score::Result<size_t> get_current_storage_size();
};

} /* namespace score::mw::per::kvs */
Expand Down
12 changes: 10 additions & 2 deletions src/cpp/src/kvsbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ KvsBuilder::KvsBuilder(const InstanceId& instance_id)
: instance_id(instance_id),
need_defaults(false),
need_kvs(false),
directory("./data_folder/") /* Default Directory */
directory("./data_folder/"), /* Default Directory */
maximum_size(KVS_DEFAULT_MAX_SIZE_BYTES) /* Default max size in bytes */
{
}

Expand All @@ -42,6 +43,12 @@ KvsBuilder& KvsBuilder::dir(std::string&& dir_path)
return *this;
}

KvsBuilder& KvsBuilder::max_size(std::optional<size_t> max_bytes)
{
this->maximum_size = max_bytes;
return *this;
}

score::Result<Kvs> KvsBuilder::build()
{
score::Result<Kvs> result = score::MakeUnexpected(ErrorCode::UnmappedError);
Expand All @@ -55,7 +62,8 @@ score::Result<Kvs> KvsBuilder::build()
result = Kvs::open(instance_id,
need_defaults ? OpenNeedDefaults::Required : OpenNeedDefaults::Optional,
need_kvs ? OpenNeedKvs::Required : OpenNeedKvs::Optional,
std::move(directory));
std::move(directory),
maximum_size);

return result;
}
Expand Down
Loading
Loading