Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 124 additions & 2 deletions ui/src/components/app/freenet_api/room_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ use river_core::room_state::{ChatRoomParametersV1, ChatRoomStateV1, ChatRoomStat
use std::collections::HashMap;
use std::sync::Arc;

fn compute_update_data(
state: &ChatRoomStateV1,
baseline: Option<&ChatRoomStateV1>,
params: &ChatRoomParametersV1,
) -> Option<UpdateData<'static>> {
if let Some(baseline) = baseline {
let summary = baseline.summarize(baseline, params);
let delta = state.delta(baseline, params, &summary)?;
Some(UpdateData::Delta(to_cbor_vec(&delta).into()))
} else {
Some(UpdateData::State(to_cbor_vec(state).into()))
}
}

/// Identifies contracts that have changed in order to send state updates to Freene
#[derive(Clone)]
pub struct RoomSynchronizer {
Expand Down Expand Up @@ -644,7 +658,7 @@ impl RoomSynchronizer {
rooms_to_sync.len()
);

for (room_vk, mut state) in rooms_to_sync {
for (room_vk, (mut state, last_synced_state)) in rooms_to_sync {
info!("Processing room: {:?}", MemberId::from(room_vk));

// Sanitize: remove any messages with invalid signatures before
Expand Down Expand Up @@ -705,9 +719,33 @@ impl RoomSynchronizer {

let contract_key = owner_vk_to_contract_key(&room_vk);

let update_data =
match compute_update_data(&state, last_synced_state.as_ref(), &params) {
Some(data) => {
match &data {
UpdateData::Delta(d) => info!(
"Room {:?}: sending delta ({} bytes)",
MemberId::from(room_vk),
d.as_ref().len(),
),
_ => info!(
"Room {:?}: no baseline, sending full state",
MemberId::from(room_vk),
),
}
data
}
None => {
SYNC_INFO.with_mut(|sync_info| {
sync_info.state_updated(&room_vk, state);
});
continue;
}
};

let update_request = ContractRequest::Update {
key: contract_key,
data: UpdateData::State(to_cbor_vec(&state).into()),
data: update_data,
};

let client_request = ClientRequest::ContractOp(update_request);
Expand Down Expand Up @@ -1152,3 +1190,87 @@ impl RoomSynchronizer {
pub struct ContractSyncInfo {
pub owner_vk: VerifyingKey,
}

#[cfg(test)]
mod tests {
use super::*;
use ed25519_dalek::SigningKey;
use river_core::room_state::message::{AuthorizedMessageV1, MessageV1, RoomMessageBody};
use std::time::SystemTime;

fn create_test_room() -> (ChatRoomStateV1, ChatRoomParametersV1, SigningKey) {
let owner_sk = SigningKey::generate(&mut rand::thread_rng());
let owner_vk = owner_sk.verifying_key();
let params = ChatRoomParametersV1 { owner: owner_vk };
let state = ChatRoomStateV1::default();
(state, params, owner_sk)
}

fn add_message(state: &mut ChatRoomStateV1, author_sk: &SigningKey, content: &str) {
let msg = MessageV1 {
room_owner: state.configuration.configuration.owner_member_id,
author: MemberId::from(&author_sk.verifying_key()),
content: RoomMessageBody::public(content.to_string()),
time: SystemTime::now(),
};
let authorized = AuthorizedMessageV1::new(msg, author_sk);
state.recent_messages.messages.push(authorized);
}

#[test]
fn no_baseline_returns_full_state() {
let (state, params, _) = create_test_room();
let result = compute_update_data(&state, None, &params);
assert!(matches!(result, Some(UpdateData::State(_))));
}

#[test]
fn identical_states_returns_none() {
let (state, params, _) = create_test_room();
let result = compute_update_data(&state, Some(&state), &params);
assert!(result.is_none());
}

#[test]
fn changed_state_returns_delta() {
let (state, params, owner_sk) = create_test_room();
let baseline = state.clone();

let mut current = state;
add_message(&mut current, &owner_sk, "hello");

let result = compute_update_data(&current, Some(&baseline), &params);
assert!(matches!(result, Some(UpdateData::Delta(_))));
}

#[test]
fn delta_is_smaller_than_full_state() {
let (mut state, params, owner_sk) = create_test_room();
for i in 0..10 {
add_message(&mut state, &owner_sk, &format!("message {}", i));
}
let baseline = state.clone();

let mut current = state;
add_message(&mut current, &owner_sk, "new message");

let delta = compute_update_data(&current, Some(&baseline), &params).unwrap();
let full = compute_update_data(&current, None, &params).unwrap();

let delta_size = match &delta {
UpdateData::Delta(d) => d.as_ref().len(),
_ => panic!("expected delta"),
};
let full_size = match &full {
UpdateData::State(s) => s.as_ref().len(),
_ => panic!("expected state"),
};

assert!(
delta_size < full_size,
"delta ({} bytes) should be smaller than full state ({} bytes)",
delta_size,
full_size
);
}
}
16 changes: 12 additions & 4 deletions ui/src/components/app/sync_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ impl SyncInfo {
rooms_awaiting_subscription
}

/// Returns a list of rooms for which an update should be sent to the network,
/// automatically updates the last_synced_state for each room
pub fn needs_to_send_update(&mut self) -> HashMap<VerifyingKey, ChatRoomStateV1> {
/// Returns rooms needing sync: current state + last synced baseline (if any).
/// The baseline is used by the caller to compute a delta instead of sending full state.
pub fn needs_to_send_update(
&mut self,
) -> HashMap<VerifyingKey, (ChatRoomStateV1, Option<ChatRoomStateV1>)> {
let mut rooms_needing_update = HashMap::new();

// FIXME: Temporarily disabled to fix infinite loop bug
Expand Down Expand Up @@ -261,7 +263,13 @@ impl SyncInfo {
"Room {:?} needs update - state has changed",
MemberId::from(key)
);
rooms_needing_update.insert(*key, room_data.room_state.clone());
rooms_needing_update.insert(
*key,
(
room_data.room_state.clone(),
sync_info.last_synced_state.clone(),
),
);
// Don't update the last synced state here - it will be updated after successful network send
} else {
debug!(
Expand Down
Loading