Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,36 @@
#ifndef IPROCESSSTATERECEIVER_HPP_INCLUDED
#define IPROCESSSTATERECEIVER_HPP_INCLUDED

#include <optional>
#include "score/result/result.h"
#include <score/lcm/exec_error_domain.h>
#include <memory>
#include <optional>

#include <score/lcm/posixprocess.hpp>

namespace score {
namespace score
{

namespace lcm {
namespace lcm
{

/// @brief IProcessStateReceiver interface for handling information about the current state of each process.
/// Health Monitor (HM) shall use this interface in order to properly receive
/// Alive Monitor (AM) shall use this interface in order to properly receive
/// information about the current state from the posix processes running in the scope of an Adaptive Machine.
/// Each posix process state change is sent by Launch Manager (LCM) and can be read by HM.
/// Each posix process state change is sent by Launch Manager (LCM) and can be read by AM.

class IProcessStateReceiver {
public:
class IProcessStateReceiver
{
public:
virtual ~IProcessStateReceiver() noexcept = default;

/// @brief Returns a queued PosixProcess that has not yet been parsed.
/// @returns Result containing an optional PosixProcess in case of success, or ExecError in case of failure.
virtual score::Result<std::optional<PosixProcess>> getNextChangedPosixProcess() noexcept = 0;
};

} // namespace lcm
} // namespace lcm

} // namespace score
} // namespace score

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@
#ifndef SCORE_LCM_IRECOVERYCLIENT_H_
#define SCORE_LCM_IRECOVERYCLIENT_H_

#include <optional>
#include <score/lcm/identifier_hash.hpp>
#include <optional>

namespace score {
namespace lcm {
namespace score
{
namespace lcm
{

/// @brief Represents a recovery request for a failed process group.
struct RecoveryRequest {
struct RecoveryRequest
{
/// @brief The id of the process group the failed process is running in
score::lcm::IdentifierHash process_group_identifier_{};
};

/// @brief The RecoveryClient allows the HealthMonitor component to report supervision failures to the ProcessGroupManager
/// thus requesting recovery for a specific process group.
/// The requests are queued and periodically processed by the ProcessGroupManager.
/// In case the buffer is full and request cannot be queued, the overflow flag is set.
/// A detected overflow shall be handled as a critical failure by the ProcessGroupManager.
class IRecoveryClient {
public:
/// @brief The RecoveryClient allows the AliveMonitor component to report supervision failures to the
/// ProcessGroupManager, thus requesting recovery for a specific process group. The requests are queued and
/// periodically processed by the ProcessGroupManager. In case the buffer is full and a request cannot be queued, the
/// overflow flag is set. A detected overflow shall be handled as a critical failure by the ProcessGroupManager.
class IRecoveryClient
{
public:
IRecoveryClient() noexcept = default;
virtual ~IRecoveryClient() noexcept = default;
IRecoveryClient(const IRecoveryClient&) = delete;
Expand All @@ -48,12 +51,13 @@ class IRecoveryClient {
/// @return The request, or std::nullopt if no request is available
virtual std::optional<RecoveryRequest> getNextRequest() noexcept = 0;

/// @brief Checks if overflow has been set, by previously calling `sendRecoveryRequest` while the queue was already full
/// @brief Checks whether the overflow flag has been set by an earlier call to `sendRecoveryRequest` made while the
/// queue was already full
/// @details Since overflow is a critical error, the flag is never reset
/// @return True if overflow has occurred, else false.
virtual bool hasOverflow() const noexcept = 0;
};
} // namespace lcm
} // namespace score
} // namespace lcm
} // namespace score

#endif
8 changes: 4 additions & 4 deletions src/launch_manager_daemon/src/main/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include <score/lcm/internal/log.hpp>

#include <process_group_manager/health_monitor_thread.hpp>
#include <process_group_manager/alive_monitor_thread.hpp>
#include <process_group_manager/processgroupmanager.hpp>
#include <score/lcm/processstatenotifier.hpp>
#include <score/lcm/recovery_client.hpp>
Expand Down Expand Up @@ -134,11 +134,11 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] const char* argv[])
std::unique_ptr<score::lcm::saf::daemon::IHealthMonitor> healthMonitor{
std::make_unique<score::lcm::saf::daemon::HealthMonitorImpl>(
recoveryClient, std::move(watchdog), process_state_notifier->constructReceiver())};
std::unique_ptr<score::lcm::internal::IHealthMonitorThread> healthMonitorThread{
std::make_unique<score::lcm::internal::HealthMonitorThread>(std::move(healthMonitor))};
std::unique_ptr<score::lcm::internal::IAliveMonitorThread> aliveMonitorThread{
std::make_unique<score::lcm::internal::AliveMonitorThread>(std::move(healthMonitor))};

std::unique_ptr<ProcessGroupManager> process_group_manager = std::make_unique<ProcessGroupManager>(
std::move(healthMonitorThread), recoveryClient, std::move(process_state_notifier));
std::move(aliveMonitorThread), recoveryClient, std::move(process_state_notifier));

if (initializeLCMDaemon(*process_group_manager))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,30 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
#include "health_monitor_thread.hpp"
#include "alive_monitor_thread.hpp"

namespace score
{
namespace lcm
{
namespace internal
namespace internal
{

HealthMonitorThread::HealthMonitorThread(std::unique_ptr<saf::daemon::IHealthMonitor> health_monitor) : m_health_monitor(std::move(health_monitor)) {

AliveMonitorThread::AliveMonitorThread(std::unique_ptr<saf::daemon::IHealthMonitor> health_monitor)
: m_health_monitor(std::move(health_monitor))
{
}

bool HealthMonitorThread::start() {
bool AliveMonitorThread::start()
{
score::lcm::saf::daemon::EInitCode init_status{score::lcm::saf::daemon::EInitCode::kNotInitialized};
health_monitor_thread_ = std::thread([this, &init_status]() {
alive_monitor_thread_ = std::thread([this, &init_status]() {
const auto initResult = m_health_monitor->init();

notifyInitializationComplete(init_status, initResult);

if (initResult == saf::daemon::EInitCode::kNoError) {
if (initResult == saf::daemon::EInitCode::kNoError)
{
m_health_monitor->run(stop_thread_);
}
});
Expand All @@ -40,33 +43,33 @@ bool HealthMonitorThread::start() {
return init_status == saf::daemon::EInitCode::kNoError;
}

void HealthMonitorThread::stop() {
void AliveMonitorThread::stop()
{
stop_thread_.store(true);
if (health_monitor_thread_.joinable()) {
health_monitor_thread_.join();
if (alive_monitor_thread_.joinable())
{
alive_monitor_thread_.join();
}
}

void HealthMonitorThread::notifyInitializationComplete(
score::lcm::saf::daemon::EInitCode& f_init_status_r,
const score::lcm::saf::daemon::EInitCode f_init_result) {
void AliveMonitorThread::notifyInitializationComplete(score::lcm::saf::daemon::EInitCode& f_init_status_r,
const score::lcm::saf::daemon::EInitCode f_init_result)
{
{
std::lock_guard lk(m_initialization_mutex);
f_init_status_r = f_init_result;
}
m_initialization_cv.notify_all();
}

void HealthMonitorThread::waitForInitializationCompleted(
score::lcm::saf::daemon::EInitCode& f_init_status_r) {
void AliveMonitorThread::waitForInitializationCompleted(score::lcm::saf::daemon::EInitCode& f_init_status_r)
{
std::unique_lock lk(m_initialization_mutex);
m_initialization_cv.wait(
lk,
[&f_init_status_r]() {
return f_init_status_r != score::lcm::saf::daemon::EInitCode::kNotInitialized;
});
m_initialization_cv.wait(lk, [&f_init_status_r]() {
return f_init_status_r != score::lcm::saf::daemon::EInitCode::kNotInitialized;
});
}

}
}
}
} // namespace internal
} // namespace lcm
} // namespace score
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
#ifndef SCORE_LCM_HEALTH_MONITOR_HPP_INCLUDED
#define SCORE_LCM_HEALTH_MONITOR_HPP_INCLUDED
#ifndef SCORE_LCM_ALIVE_MONITOR_THREAD_HPP_INCLUDED
#define SCORE_LCM_ALIVE_MONITOR_THREAD_HPP_INCLUDED

#include <thread>
#include <atomic>
#include <score/lcm/saf/daemon/IHealthMonitor.hpp>
#include <atomic>
#include <thread>

#include <process_group_manager/ihealth_monitor_thread.hpp>
#include <process_group_manager/ialive_monitor_thread.hpp>

namespace score
{
Expand All @@ -26,31 +26,32 @@ namespace lcm
namespace internal
{

/// @brief HealthMonitor manages the lifecycle of the Health Monitor daemon in a separate thread.
class HealthMonitorThread final : public IHealthMonitorThread {
public:
HealthMonitorThread(std::unique_ptr<saf::daemon::IHealthMonitor> health_monitor);
/// @brief AliveMonitorThread manages the lifecycle of the alive monitoring daemon in a separate thread.
class AliveMonitorThread final : public IAliveMonitorThread
{
public:
AliveMonitorThread(std::unique_ptr<saf::daemon::IHealthMonitor> health_monitor);

/// @brief Starts the Health Monitor thread.
/// @return true if the Health Monitor started successfully, false otherwise.
/// @brief Starts the Alive Monitor thread.
/// @return true if the Alive Monitor started successfully, false otherwise.
bool start() override;

/// @brief Stops the Health Monitor thread.
/// @brief Stops the Alive Monitor thread.
void stop() override;
private:
void notifyInitializationComplete(
score::lcm::saf::daemon::EInitCode& f_init_status_r,
const score::lcm::saf::daemon::EInitCode f_init_result);
void waitForInitializationCompleted(score::lcm::saf::daemon::EInitCode& f_init_status_r);

private:
void notifyInitializationComplete(score::lcm::saf::daemon::EInitCode& f_init_status_r,
const score::lcm::saf::daemon::EInitCode f_init_result);
void waitForInitializationCompleted(score::lcm::saf::daemon::EInitCode& f_init_status_r);

std::unique_ptr<saf::daemon::IHealthMonitor> m_health_monitor{nullptr};
std::thread health_monitor_thread_{};
std::thread alive_monitor_thread_{};
std::atomic_bool stop_thread_{false};
std::mutex m_initialization_mutex{};
std::condition_variable m_initialization_cv{};
};
}
}
}

} // namespace internal
} // namespace lcm
} // namespace score
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/


#ifndef SCORE_LCM_IHEALTH_MONITOR_HPP_INCLUDED
#define SCORE_LCM_IHEALTH_MONITOR_HPP_INCLUDED
#ifndef SCORE_LCM_IALIVE_MONITOR_THREAD_HPP_INCLUDED
#define SCORE_LCM_IALIVE_MONITOR_THREAD_HPP_INCLUDED

namespace score
{
namespace lcm
{
namespace internal
{
class IHealthMonitorThread {
public:
class IAliveMonitorThread
{
public:
virtual bool start() = 0;
virtual void stop() = 0;

virtual ~IHealthMonitorThread() = default;
virtual ~IAliveMonitorThread() = default;
};
}
}
}
} // namespace internal
} // namespace lcm
} // namespace score

#endif
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <unistd.h>
#include <csignal>

#include <process_group_manager/ihealth_monitor_thread.hpp>
#include <process_group_manager/ialive_monitor_thread.hpp>
#include <process_group_manager/processgroupmanager.hpp>
#include <score/lcm/internal/log.hpp>

Expand All @@ -37,7 +37,7 @@ void ProcessGroupManager::cancel()
my_signal_handler(SIGTERM);
}

ProcessGroupManager::ProcessGroupManager(std::unique_ptr<IHealthMonitorThread> health_monitor,
ProcessGroupManager::ProcessGroupManager(std::unique_ptr<IAliveMonitorThread> alive_monitor_thread,
std::shared_ptr<IRecoveryClient> recovery_client,
std::unique_ptr<score::lcm::IProcessStateNotifier> process_state_notifier)
: configuration_manager_(),
Expand All @@ -49,7 +49,7 @@ ProcessGroupManager::ProcessGroupManager(std::unique_ptr<IHealthMonitorThread> h
num_process_groups_(0U),
process_groups_(),
process_state_notifier_(std::move(process_state_notifier)),
health_monitor_thread_(std::move(health_monitor)),
alive_monitor_thread_(std::move(alive_monitor_thread)),
recovery_client_(recovery_client) //,
// ucm_polling_thread_(
// [this](const Message::Action act, const Message::UpdateContext updateCtx, const lib::fun::string& swc) -> bool
Expand Down Expand Up @@ -98,9 +98,9 @@ bool ProcessGroupManager::initialize()
LM_LOG_DEBUG() << "Process Group initialization done";
createProcessComponentsObjects();
initializeGraphNodes();
if (!health_monitor_thread_->start())
if (!alive_monitor_thread_->start())
{
LM_LOG_ERROR() << "Health monitor thread failed to start";
LM_LOG_ERROR() << "Alive monitor thread failed to start";
return false;
}

Expand All @@ -116,7 +116,7 @@ bool ProcessGroupManager::initialize()
void ProcessGroupManager::deinitialize()
{
// ucm_polling_thread_.stopPolling();
health_monitor_thread_->stop();
alive_monitor_thread_->stop();
configuration_manager_.deinitialize();
process_groups_.clear();

Expand Down Expand Up @@ -528,13 +528,13 @@ inline void ProcessGroupManager::recoveryActionHandler()

if (nullptr == pg)
{
LM_LOG_ERROR() << "recoveryActionHandler: Unknown process group " << recovery_request->process_group_identifier_;
LM_LOG_ERROR() << "recoveryActionHandler: Unknown process group "
<< recovery_request->process_group_identifier_;
continue;
}

const IdentifierHash old_state = pg->getProcessGroupState();
const IdentifierHash recovery_state =
configuration_manager_.getNameOfRecoveryState(pg->getProcessGroupName());
const IdentifierHash recovery_state = configuration_manager_.getNameOfRecoveryState(pg->getProcessGroupName());
const GraphState graph_state = pg->getState();

LM_LOG_DEBUG() << "recoveryActionHandler: Processing recovery request for PG "
Expand Down
Loading
Loading