Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,049 changes: 150 additions & 1,899 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ members = ["crates/*"]
resolver = "2"

[workspace.dependencies]
sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", rev = "5747a3d", version = "1.2.0" }
sqd-messages = { git = "https://github.com/subsquid/sqd-network.git", rev = "5747a3d", version = "2.0.1", features = ["bitstring"] }
sqd-network-transport = { git = "https://github.com/subsquid/sqd-network.git", rev = "5747a3d", version = "3.0.0" }
sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", rev = "c57f520", version = "1.2.0" }
sqd-messages = { git = "https://github.com/subsquid/sqd-network.git", rev = "c57f520", version = "2.0.1", features = ["bitstring"] }
sqd-network-transport = { git = "https://github.com/subsquid/sqd-network.git", rev = "c57f520", version = "3.0.0" }
2 changes: 1 addition & 1 deletion crates/logs-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "logs-collector"
version = "2.1.2"
version = "2.1.3"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/pings-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pings-collector"
version = "2.1.2"
version = "2.1.3"
edition = "2021"

[dependencies]
Expand Down
6 changes: 0 additions & 6 deletions crates/pings-collector/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ pub struct Cli {
)]
pub buffer_dir: PathBuf,

#[arg(long, env, default_value_t = true)]
pub use_gossipsub: bool,

#[arg(long, env, default_value_t = false)]
pub use_polling: bool,

#[arg(long, env, default_value_t = 15)]
pub request_timeout_sec: u32,

Expand Down
2 changes: 0 additions & 2 deletions crates/pings-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ async fn main() -> anyhow::Result<()> {
transport_builder.contract_client().into();
let (incoming_pings, transport_handle) =
transport_builder.build_pings_collector(PingsCollectorConfig {
worker_status_via_gossipsub: args.use_gossipsub,
worker_status_via_requests: args.use_polling,
..Default::default()
})?;

Expand Down
2 changes: 1 addition & 1 deletion crates/portal-logs-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "portal-logs-collector"
version = "1.0.2"
version = "1.0.3"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/portal-logs-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
let agent_info = get_agent_info!();
let transport_builder = P2PTransportBuilder::from_cli(args.transport, agent_info).await?;
let contract_client: Arc<_> = transport_builder.contract_client().into();
let config = PortalLogsCollectorConfig::new();
let config = PortalLogsCollectorConfig::default();

let transport = transport_builder.build_portal_logs_collector(config)?;

Expand Down
28 changes: 14 additions & 14 deletions crates/portal-logs-collector/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ where
{
_transport_handle: PortalLogsCollectorTransportHandle,
logs_collector: Arc<PortalLogsCollector<T>>,
registered_gateways: Arc<Mutex<HashSet<PeerId>>>,
registered_portals: Arc<Mutex<HashSet<PeerId>>>,
task_manager: TaskManager,
event_stream: Box<dyn Stream<Item = PortalLogsCollectorEvent> + Send + Unpin + 'static>,
collector_index: usize,
Expand All @@ -41,7 +41,7 @@ where
Self {
_transport_handle: transport_handle,
logs_collector: Arc::new(logs_collector),
registered_gateways: Default::default(),
registered_portals: Default::default(),
task_manager: Default::default(),
event_stream: Box::new(event_stream),
collector_index,
Expand All @@ -59,13 +59,13 @@ where
) -> anyhow::Result<()> {
log::info!("Starting logs collector server");

// Get registered gateways from chain
let gateways = contract_client
.active_gateways()
// Get registered portals from chain
let portals = contract_client
.active_portals()
.await?
.into_iter()
.collect();
*self.registered_gateways.lock() = gateways;
*self.registered_portals.lock() = portals;

self.spawn_portal_update_task(contract_client, portal_update_interval);

Expand Down Expand Up @@ -94,7 +94,7 @@ where
if !self.should_process(&peer_id) {
continue
}
if self.registered_gateways.lock().contains(&peer_id) {
if self.registered_portals.lock().contains(&peer_id) {
log::debug!("Got log from {peer_id:?}: {log:?}");
self.logs_collector.buffer_logs(peer_id, vec![log]);
} else {
Expand Down Expand Up @@ -126,19 +126,19 @@ where
contract_client: Arc<dyn ContractClient>,
interval: Duration,
) {
log::info!("Starting gateway update task");
log::info!("Starting portal update task");

let registered_gateways = self.registered_gateways.clone();
let registered_portals = self.registered_portals.clone();
let contract_client: Arc<dyn ContractClient> = contract_client;
let task = move |_| {
let registered_gateways = registered_gateways.clone();
let registered_portals = registered_portals.clone();
let contract_client = contract_client.clone();
async move {
let gateways = match contract_client.active_gateways().await {
Ok(gateways) => gateways,
Err(e) => return log::error!("Error getting registered gateways: {e:?}"),
let portals = match contract_client.active_portals().await {
Ok(portals) => portals,
Err(e) => return log::error!("Error getting registered portals: {e:?}"),
};
*registered_gateways.lock() = gateways.into_iter().collect::<HashSet<PeerId>>();
*registered_portals.lock() = portals.into_iter().collect::<HashSet<PeerId>>();
}
};
self.task_manager.spawn_periodic(task, interval);
Expand Down