Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions test/src/unit-vfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,69 @@ TEST_CASE("VFS: Test remove_dir_if_empty", "[vfs][remove-dir-if-empty]") {
REQUIRE_NOTHROW(vfs.remove_dir(URI(path)));
}

// Smoke test for the file-descriptor caching done by Posix::write():
// consecutive writes to one local file must land back to back, both within a
// single write/close cycle and across two separate cycles.
TEST_CASE(
    "VFS: Persistent file handles for local writes",
    "[vfs][persistent-handles]") {
  ThreadPool compute_tp(4);
  ThreadPool io_tp(4);
  VFS vfs{
      &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, Config{}};
  const std::string base = local_path();

  // Verifies both the reported size and the full contents of `uri`.
  auto check_contents = [&vfs](const URI& uri, const std::string& expected) {
    CHECK(vfs.file_size(uri) == expected.size());

    std::vector<char> readback(expected.size());
    require_tiledb_ok(
        vfs.read_exactly(uri, 0, readback.data(), readback.size()));
    CHECK(std::string(readback.data(), readback.size()) == expected);
  };

  SECTION("Multiple writes accumulate before flush") {
    URI testfile(base + "multi_write_file");

    // Three back-to-back writes with no close in between.
    const std::vector<std::string> chunks{"Hello, ", "persistent ", "handles!"};
    for (const auto& chunk : chunks) {
      REQUIRE_NOTHROW(vfs.write(testfile, chunk.data(), chunk.size()));
    }

    require_tiledb_ok(vfs.close_file(testfile));

    check_contents(testfile, "Hello, persistent handles!");

    REQUIRE_NOTHROW(vfs.remove_file(testfile));
  }

  SECTION("Append across separate open cycles") {
    URI testfile(base + "reopen_append_file");

    // Cycle one: the file does not exist yet.
    const std::string first_chunk = "first";
    REQUIRE_NOTHROW(
        vfs.write(testfile, first_chunk.data(), first_chunk.size()));
    require_tiledb_ok(vfs.close_file(testfile));
    CHECK(vfs.file_size(testfile) == first_chunk.size());

    // Cycle two: the file is already on disk, so the write must append.
    const std::string second_chunk = "second";
    REQUIRE_NOTHROW(
        vfs.write(testfile, second_chunk.data(), second_chunk.size()));
    require_tiledb_ok(vfs.close_file(testfile));

    check_contents(testfile, "firstsecond");

    REQUIRE_NOTHROW(vfs.remove_file(testfile));
  }

  // Remove the scratch directory if it is still around.
  const URI base_uri(base);
  if (vfs.is_dir(base_uri)) {
    REQUIRE_NOTHROW(vfs.remove_dir(base_uri));
  }
}

#ifdef HAVE_AZURE
TEST_CASE("VFS: Construct Azure Blob Storage endpoint URIs", "[azure][uri]") {
// Test the construction of Azure Blob Storage URIs from account name and SAS
Expand Down
105 changes: 83 additions & 22 deletions tiledb/sm/filesystem/posix.cc
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As much of the newly added logic as is currently possible should be moved to local.cc, to reduce duplication between the POSIX and Windows implementations.

Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ class PosixDIR {
optional<DIR*> dir_;
};

Posix::~Posix() {
std::lock_guard<std::mutex> lock(open_files_mtx_);
for (auto& [path, of] : open_files_) {
::close(of.fd);
}
open_files_.clear();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
open_files_.clear();

No need to clear the map; it's going to be freed very soon either way.

}

Posix::Posix(const Config& config) {
// Initialize member variables with posix config parameters.

Expand Down Expand Up @@ -199,8 +207,22 @@ bool Posix::is_file(const URI& uri) const {
return (stat(uri.to_path().c_str(), &st) == 0) && !S_ISDIR(st.st_mode);
}

/**
 * Closes and forgets every cached file descriptor whose path is exactly
 * `path_prefix` or lies underneath it when `path_prefix` is treated as a
 * directory.
 *
 * The return value of ::close() is deliberately ignored: eviction is
 * best-effort cleanup performed right before the path is removed or renamed.
 *
 * @param path_prefix File or directory path, with or without a trailing '/'.
 */
void Posix::evict_cached_fds(const std::string& path_prefix) const {
  // Build the directory form of the prefix once, outside the loop. If the
  // caller already passed a trailing '/', appending another one would yield
  // "dir//" and no cached "dir/file" entry would ever match.
  std::string dir_prefix = path_prefix;
  if (dir_prefix.empty() || dir_prefix.back() != '/') {
    dir_prefix += '/';
  }

  std::lock_guard<std::mutex> lock(open_files_mtx_);
  for (auto it = open_files_.begin(); it != open_files_.end();) {
    const std::string& cached_path = it->first;
    const bool is_same_path = cached_path == path_prefix;
    const bool is_underneath =
        cached_path.compare(0, dir_prefix.size(), dir_prefix) == 0;

    if (is_same_path || is_underneath) {
      ::close(it->second.fd);
      // erase() returns the next valid iterator, so the walk stays safe.
      it = open_files_.erase(it);
    } else {
      ++it;
    }
  }
}

void Posix::remove_dir(const URI& uri) const {
auto path = uri.to_path();
evict_cached_fds(path);
int rc = nftw(path.c_str(), unlink_cb, 64, FTW_DEPTH | FTW_PHYS);
if (rc) {
throw IOError(
Expand All @@ -223,6 +245,7 @@ bool Posix::remove_dir_if_empty(const std::string& path) const {

void Posix::remove_file(const URI& uri) const {
auto path = uri.to_path();
evict_cached_fds(path);
if (remove(path.c_str()) != 0) {
throw IOError(
std::string("Cannot delete file '") + path + "'; " + strerror(errno));
Expand All @@ -245,6 +268,7 @@ uint64_t Posix::file_size(const URI& uri) const {
}

void Posix::move_file(const URI& old_path, const URI& new_path) const {
evict_cached_fds(old_path.to_path());
auto new_uri_path = new_path.to_path();
throw_if_not_ok(ensure_directory(new_uri_path));
if (rename(old_path.to_path().c_str(), new_path.to_path().c_str()) != 0) {
Expand Down Expand Up @@ -296,7 +320,34 @@ uint64_t Posix::read(
}

/**
 * Flushes the file at `uri`.
 *
 * If a file descriptor for the path is held in `open_files_`, the entry is
 * removed from the cache and the descriptor is fsync'ed and closed. If no
 * descriptor is cached, the call falls back to sync(uri).
 *
 * The path-based sync(uri) is not issued when a cached descriptor exists:
 * the fsync on the descriptor already covers that file, so syncing by path
 * first would only repeat the work.
 *
 * @param uri The file (or directory) to flush.
 * @param (unnamed) Unused by this implementation.
 * @throws IOError If fsync or close fails on the cached descriptor.
 */
void Posix::flush(const URI& uri, bool) {
  const auto path = uri.to_path();
  int fd = -1;

  // Take ownership of the cached descriptor under the lock; the system calls
  // below then run without holding the mutex.
  {
    std::lock_guard<std::mutex> lock(open_files_mtx_);
    auto it = open_files_.find(path);
    if (it != open_files_.end()) {
      fd = it->second.fd;
      open_files_.erase(it);
    }
  }

  if (fd == -1) {
    // No cached fd (e.g. directory sync or file not written through us).
    sync(uri);
    return;
  }

  // fsync and close the cached file descriptor.
  if (::fsync(fd) != 0) {
    // Save errno before ::close() can overwrite it.
    const auto err = errno;
    ::close(fd);
    throw IOError(
        std::string("Cannot sync file '") + path + "'; " + strerror(err));
  }
  if (::close(fd) != 0) {
    throw IOError(
        std::string("Cannot close file '") + path + "'; " + strerror(errno));
  }
}

void Posix::sync(const URI& uri) const {
Expand Down Expand Up @@ -350,32 +401,42 @@ void Posix::write(
}
}

// Get file offset (equal to file size)
Status st;
uint64_t file_offset = 0;
if (is_file(URI(path))) {
file_offset = file_size(URI(path));
} else {
throw_if_not_ok(ensure_directory(path));
}
int fd;
uint64_t file_offset;

// Open or create file.
int fd = open(path.c_str(), O_WRONLY | O_CREAT, file_permissions_);
if (fd == -1) {
throw IOError(
std::string("Cannot open file '") + path + "'; " + strerror(errno));
{
std::lock_guard<std::mutex> lock(open_files_mtx_);
auto it = open_files_.find(path);
if (it != open_files_.end()) {
// Reuse the cached file descriptor.
fd = it->second.fd;
file_offset = it->second.offset;
} else {
// First write to this path: open fd and determine current size.
if (is_file(URI(path))) {
file_offset = file_size(URI(path));
} else {
throw_if_not_ok(ensure_directory(path));
file_offset = 0;
}
fd = ::open(path.c_str(), O_WRONLY | O_CREAT, file_permissions_);
if (fd == -1) {
throw IOError(
std::string("Cannot open file '") + path + "'; " + strerror(errno));
}
open_files_.emplace(path, OpenFile{fd, file_offset});
}
}

st = write_at(fd, file_offset, buffer, buffer_size);
auto st = write_at(fd, file_offset, buffer, buffer_size);
if (!st.ok()) {
close(fd);
std::stringstream errmsg;
errmsg << "Cannot write to file '" << path << "'; " << st.message();
throw IOError(errmsg.str());
}
if (close(fd) != 0) {
throw IOError(
std::string("Cannot close file '") + path + "'; " + strerror(errno));
std::string("Cannot write to file '") + path + "'; " + st.message());
}

{
std::lock_guard<std::mutex> lock(open_files_mtx_);
open_files_[path].offset = file_offset + buffer_size;
}
}

Expand Down
25 changes: 23 additions & 2 deletions tiledb/sm/filesystem/posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
#include <unistd.h>

#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "tiledb/common/status.h"
Expand Down Expand Up @@ -79,8 +81,8 @@ class Posix : public LocalFilesystem {
/** Constructor. */
explicit Posix(const Config& config);

/** Destructor. */
~Posix() override = default;
/** Destructor. Closes any cached file descriptors. */
~Posix() override;

/* ********************************* */
/* API */
Expand Down Expand Up @@ -299,6 +301,25 @@ class Posix : public LocalFilesystem {

private:
uint32_t file_permissions_, directory_permissions_;

/**
* Closes and removes cached fds whose path equals or is under the given
* prefix. Called from const removal/move methods to keep the cache
* consistent.
*/
void evict_cached_fds(const std::string& path_prefix) const;

/**
 * State for a file descriptor kept open between write() and flush().
 *
 * The members carry default initializers so that a value-initialized
 * instance (for example one created implicitly by
 * `std::unordered_map::operator[]`) holds the invalid descriptor -1 instead
 * of 0, which is a real descriptor (stdin) that must never be closed or
 * written through by mistake. The type remains an aggregate, so
 * `OpenFile{fd, offset}` keeps working.
 */
struct OpenFile {
  /** The open file descriptor, or -1 if none. */
  int fd = -1;
  /** Byte offset at which the next write to `fd` is issued. */
  uint64_t offset = 0;
};
Comment on lines +312 to +316
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The struct should have a destructor (and be non-copyable and non-movable).


/** Maps path -> cached file descriptor and current write offset. */
mutable std::unordered_map<std::string, OpenFile> open_files_;

/** Protects open_files_. */
mutable std::mutex open_files_mtx_;
};

} // namespace sm
Expand Down
Loading
Loading