Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
699 changes: 239 additions & 460 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 14 additions & 9 deletions ofborg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,39 @@ build = "build.rs"
edition = "2024"

[dependencies]
async-std = { version = "=1.12.0", features = ["unstable", "tokio1"] }
async-trait = "0.1.89"
brace-expand = "0.1.0"
chrono = { version = "0.4.38", default-features = false, features = [
"clock",
"std",
] }
either = "1.13.0"
fs2 = "0.4.3"
futures = "0.3.31"
futures-util = "0.3.31"
hex = "0.4.3"
hmac = "0.12.1"
http = "1"
http-body-util = "0.1"
#hubcaps = "0.6"
# for Conclusion::Skipped which is in master
hubcaps = { git = "https://github.com/ofborg/hubcaps.git", rev = "50dbe6ec45c9dfea4e3cfdf27bbadfa565f69dec", default-features = false, features = ["app", "rustls-tls"] }
http = "1"
# hyper = { version = "0.14", features = ["full"] }
hyper = "=0.10.*"
# maybe can be removed when hyper is updated
hyper = { version = "1.0", features = ["full", "server", "http1"] }
hyper-util = { version = "0.1", features = ["server", "tokio", "http1"] }
lapin = "2.5.4"
lru-cache = "0.1.2"
md5 = "0.8.0"
mime = "0.3"
nom = "4.2.3"
parking_lot = "0.12.4"
regex = "1.11.1"
rustls-pemfile = "2.2.0"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135"
sha2 = "0.10.8"
tempfile = "3.15.0"
tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "sync"] }
tokio-stream = "0.1"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] }
uuid = { version = "1.12", features = ["v4"] }
rustls-pemfile = "2.2.0"
hmac = "0.12.1"
sha2 = "0.10.8"
hex = "0.4.3"
4 changes: 4 additions & 0 deletions ofborg/src/asynccmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl SpawnedAsyncCmd {
self.rx.iter()
}

pub fn get_next_line(&mut self) -> Result<String, mpsc::RecvError> {
self.rx.recv()
}

pub fn wait(self) -> Result<ExitStatus, io::Error> {
self.waiter
.join()
Expand Down
18 changes: 9 additions & 9 deletions ofborg/src/bin/build-faker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::env;
use std::error::Error;

use async_std::task;
use lapin::BasicProperties;
use lapin::message::Delivery;
use std::env;
use std::error::Error;

use ofborg::commentparser;
use ofborg::config;
Expand All @@ -12,14 +10,15 @@ use ofborg::message::{Pr, Repo, buildjob};
use ofborg::notifyworker::NotificationReceiver;
use ofborg::worker;

fn main() -> Result<(), Box<dyn Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args().nth(1).expect("usage: build-faker <config>");
let cfg = config::load(arg.as_ref());

let conn = easylapin::from_config(&cfg.builder.unwrap().rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;
let conn = easylapin::from_config(&cfg.builder.unwrap().rabbitmq).await?;
let chan = conn.create_channel().await?;

let repo_msg = Repo {
clone_url: "https://github.com/nixos/ofborg.git".to_owned(),
Expand Down Expand Up @@ -56,14 +55,15 @@ fn main() -> Result<(), Box<dyn Error>> {
data: vec![],
acker: Default::default(),
};
let mut recv = easylapin::ChannelNotificationReceiver::new(&mut chan, &deliver);
let recv = easylapin::ChannelNotificationReceiver::new(chan.clone(), deliver);

for _i in 1..2 {
recv.tell(worker::publish_serde_action(
None,
Some("build-inputs-x86_64-darwin".to_owned()),
&msg,
));
))
.await;
}
}

Expand Down
62 changes: 35 additions & 27 deletions ofborg/src/bin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::env;
use std::error::Error;
use std::future::Future;
use std::path::Path;
use std::pin::Pin;

use async_std::task::{self, JoinHandle};
use futures_util::future;
use tracing::{error, info, warn};

use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::{checkout, config, tasks};

fn main() -> Result<(), Box<dyn Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args()
Expand All @@ -23,27 +25,27 @@ fn main() -> Result<(), Box<dyn Error>> {
panic!();
};

let conn = easylapin::from_config(&builder_cfg.rabbitmq)?;
let mut handles = Vec::new();
let conn = easylapin::from_config(&builder_cfg.rabbitmq).await?;
let mut handles: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new();

for system in &cfg.nix.system {
let handle_ext = self::create_handle(&conn, &cfg, system.to_string())?;
handles.push(handle_ext);
handles.push(self::create_handle(&conn, &cfg, system.to_string()).await?);
}

task::block_on(future::join_all(handles));
future::join_all(handles).await;

drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}

fn create_handle(
#[allow(clippy::type_complexity)]
async fn create_handle(
conn: &lapin::Connection,
cfg: &config::Config,
system: String,
) -> Result<JoinHandle<()>, Box<dyn Error>> {
let mut chan = task::block_on(conn.create_channel())?;
) -> Result<Pin<Box<dyn Future<Output = ()> + Send>>, Box<dyn Error>> {
let mut chan = conn.create_channel().await?;

let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix().with_system(system.clone());
Expand All @@ -56,7 +58,8 @@ fn create_handle(
auto_delete: false,
no_wait: false,
internal: false,
})?;
})
.await?;

let queue_name = if cfg.runner.build_all_jobs != Some(true) {
let queue_name = format!("build-inputs-{system}");
Expand All @@ -67,7 +70,8 @@ fn create_handle(
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
})
.await?;
queue_name
} else {
warn!("Building all jobs, please don't use this unless you're");
Expand All @@ -80,7 +84,8 @@ fn create_handle(
exclusive: true,
auto_delete: true,
no_wait: false,
})?;
})
.await?;
queue_name
};

Expand All @@ -89,20 +94,23 @@ fn create_handle(
exchange: "build-jobs".to_owned(),
routing_key: None,
no_wait: false,
})?;

let handle = easylapin::NotifyChannel(chan).consume(
tasks::build::BuildWorker::new(cloner, nix, system, cfg.runner.identity.clone()),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-builder", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)?;
})
.await?;

let handle = easylapin::NotifyChannel(chan)
.consume(
tasks::build::BuildWorker::new(cloner, nix, system, cfg.runner.identity.clone()),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-builder", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.await?;

info!("Fetching jobs from {}", &queue_name);
Ok(task::spawn(handle))
Ok(handle)
}
46 changes: 26 additions & 20 deletions ofborg/src/bin/evaluation-filter.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::env;
use std::error::Error;

use async_std::task;
use tracing::{error, info};

use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::tasks;

fn main() -> Result<(), Box<dyn Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();

let arg = env::args()
Expand All @@ -22,8 +22,8 @@ fn main() -> Result<(), Box<dyn Error>> {
panic!();
};

let conn = easylapin::from_config(&filter_cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;
let conn = easylapin::from_config(&filter_cfg.rabbitmq).await?;
let mut chan = conn.create_channel().await?;

chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "github-events".to_owned(),
Expand All @@ -33,7 +33,8 @@ fn main() -> Result<(), Box<dyn Error>> {
auto_delete: false,
no_wait: false,
internal: false,
})?;
})
.await?;

chan.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-jobs".to_owned(),
Expand All @@ -42,7 +43,8 @@ fn main() -> Result<(), Box<dyn Error>> {
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
})
.await?;

let queue_name = String::from("mass-rebuild-check-inputs");
chan.declare_queue(easyamqp::QueueConfig {
Expand All @@ -52,29 +54,33 @@ fn main() -> Result<(), Box<dyn Error>> {
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
})
.await?;

chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "github-events".to_owned(),
routing_key: Some("pull_request.nixos/*".to_owned()),
no_wait: false,
})?;
})
.await?;

let handle = easylapin::WorkerChannel(chan).consume(
tasks::evaluationfilter::EvaluationFilterWorker::new(cfg.acl()),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-evaluation-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)?;
let handle = easylapin::WorkerChannel(chan)
.consume(
tasks::evaluationfilter::EvaluationFilterWorker::new(cfg.acl()),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-evaluation-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.await?;

info!("Fetching jobs from {}", &queue_name);
task::block_on(handle);
handle.await;

drop(conn); // Close connection.
info!("Closed the session... EOF");
Expand Down
Loading