diff --git a/Cargo.lock b/Cargo.lock index 4cbb377b..80179813 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1128,6 +1128,7 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", + "serde", "winapi", ] @@ -1558,8 +1559,18 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + +[[package]] +name = "darling" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +dependencies = [ + "darling_core 0.20.3", + "darling_macro 0.20.3", ] [[package]] @@ -1576,17 +1587,42 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "darling_core" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.66", + "quote 1.0.32", + "strsim", + "syn 2.0.27", +] + [[package]] name = "darling_macro" version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ - "darling_core", + "darling_core 0.13.4", "quote 1.0.32", "syn 1.0.109", ] +[[package]] +name = "darling_macro" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +dependencies = [ + "darling_core 0.20.3", + "quote 1.0.32", + "syn 2.0.27", +] + [[package]] name = "dashmap" version = "5.4.0" @@ -2386,7 +2422,7 @@ dependencies = [ "rustc_version 0.2.3", "serde", "serde_json", - "serde_with", + "serde_with 1.14.0", "url", "void", ] @@ -3374,6 +3410,7 @@ checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", "hashbrown 0.14.0", + "serde", ] [[package]] @@ -4304,6 +4341,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serde_with 3.3.0", "strum", "strum_macros", "thiserror", @@ -5719,7 +5757,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" dependencies = [ "serde", - "serde_with_macros", + "serde_with_macros 1.5.2", +] + +[[package]] +name = "serde_with" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" +dependencies = [ + "base64 0.21.0", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.0", + "serde", + "serde_json", + "serde_with_macros 3.3.0", + "time", ] [[package]] @@ -5728,12 +5783,24 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" dependencies = [ - "darling", + "darling 0.13.4", "proc-macro2 1.0.66", "quote 1.0.32", "syn 1.0.109", ] +[[package]] +name = "serde_with_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e6be15c453eb305019bfa438b1593c731f36a289a7853f7707ee29e870b3b3c" +dependencies = [ + "darling 0.20.3", + "proc-macro2 1.0.66", + "quote 1.0.32", + "syn 2.0.27", +] + [[package]] name = "sha-1" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index b6de593e..4955e901 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ sink-gcp-pubsub = ["google-cloud-pubsub", "google-cloud-googleapis", "google-clo sink-gcp-cloudfunction = ["reqwest", "jsonwebtoken"] sink-redis = ["r2d2_redis"] sink-elasticsearch = ["elasticsearch"] -source-utxorpc = ["tonic","futures"] +source-utxorpc = ["tonic", "futures"] [dependencies] pallas = "0.19.0-alpha.1" @@ -78,4 +78,5 @@ jsonwebtoken = { version = "8.3.0", optional = true } file-rotate = { version = "0.7.5", optional = true } tonic = { version = "0.9.2", features = ["tls", "tls-roots"], optional = true} futures = { version = "0.3.28", optional = true } +serde_with = { version = "3.3.0", features = ["macros"] } diff --git a/src/filters/match_pattern.rs b/src/filters/match_pattern.rs deleted file mode 100644 index 561c4dea..00000000 --- a/src/filters/match_pattern.rs +++ /dev/null @@ -1,238 +0,0 @@ -use gasket::framework::*; -use pallas::{ - ledger::addresses::{Address, StakeAddress}, - network::miniprotocols::Point, -}; -use serde::Deserialize; -use tracing::error; - -use crate::framework::*; - -#[derive(Stage)] -#[stage(name = "filter-match-pattern", unit = "ChainEvent", worker = "Worker")] -pub struct Stage { - predicate: Predicate, - - pub input: FilterInputPort, - pub output: FilterOutputPort, - - #[metric] - ops_count: gasket::metrics::Counter, -} - -pub struct Worker; - -impl From<&Stage> for Worker { - fn from(_: &Stage) -> Self { - Worker {} - } -} - -gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { - let out = match unit { - ChainEvent::Apply(point, record) => match record { - Record::ParsedTx(tx) => { - if stage.predicate.tx_match(point, tx)? { - Ok(Some(unit.to_owned())) - } else { - Ok(None) - } - }, - _ => { - error!("The MatchPattern filter is valid only with the ParsedTx record"); - Err(WorkerError::Panic) - } - }, - _ => Ok(Some(unit.to_owned())) - }?; - - stage.ops_count.inc(1); - - out -}); - -#[derive(Deserialize, Clone, Debug)] -pub enum AddressPatternValue { - ExactHex(String), - ExactBech32(String), - PaymentHex(String), - PaymentBech32(String), - StakeHex(String), - StakeBech32(String), -} - -#[derive(Deserialize, Clone, Debug)] -pub struct AddressPattern { - pub value: AddressPatternValue, - pub is_script: Option, -} -impl AddressPattern { - fn address_match(&self, address: &Address) -> Result { - match address { - Address::Byron(addr) => match &self.value { - AddressPatternValue::ExactHex(exact_hex) => Ok(addr.to_hex().eq(exact_hex)), - AddressPatternValue::PaymentHex(payment_hex) => Ok(addr.to_hex().eq(payment_hex)), - _ => Ok(false), - }, - Address::Shelley(addr) => match &self.value { - AddressPatternValue::ExactHex(exact_hex) => Ok(addr.to_hex().eq(exact_hex)), - AddressPatternValue::ExactBech32(exact_bech32) => { - Ok(addr.to_bech32().or_panic()?.eq(exact_bech32)) - } - AddressPatternValue::PaymentHex(payment_hex) => { - Ok(addr.payment().to_hex().eq(payment_hex)) - } - AddressPatternValue::PaymentBech32(payment_bech32) => { - Ok(addr.payment().to_bech32().or_panic()?.eq(payment_bech32)) - } - AddressPatternValue::StakeHex(stake_hex) => { - if addr.delegation().as_hash().is_none() { - return Ok(false); - } - - let stake_address: StakeAddress = addr.clone().try_into().or_panic()?; - Ok(stake_address.to_hex().eq(stake_hex)) - } - AddressPatternValue::StakeBech32(stake_bech32) => { - if addr.delegation().as_hash().is_none() { - return Ok(false); - } - - let stake_address: StakeAddress = addr.clone().try_into().or_panic()?; - Ok(stake_address.to_bech32().or_panic()?.eq(stake_bech32)) - } - }, - Address::Stake(stake_address) => match &self.value { - AddressPatternValue::StakeHex(stake_hex) => { - Ok(stake_address.to_hex().eq(stake_hex)) - } - AddressPatternValue::StakeBech32(stake_bech32) => { - Ok(stake_address.to_bech32().or_panic()?.eq(stake_bech32)) - } - _ => Ok(false), - }, - } - } -} - -#[derive(Deserialize, Clone, Debug)] -pub struct BlockPattern { - pub slot_before: Option, - pub slot_after: Option, -} - -#[derive(Deserialize, Clone, Debug)] -#[serde(rename_all = "snake_case")] -pub enum Predicate { - Block(BlockPattern), - OutputAddress(AddressPattern), - WithdrawalAddress(AddressPattern), - CollateralAddress(AddressPattern), - Not(Box), - AnyOf(Vec), - AllOf(Vec), -} - -impl Predicate { - fn tx_match(&self, point: &Point, tx: &ParsedTx) -> Result { - match self { - Predicate::Block(block_pattern) => Ok(block_match(point, block_pattern)), - Predicate::OutputAddress(address_pattern) => Ok(output_match(tx, address_pattern)?), - Predicate::WithdrawalAddress(address_pattern) => { - Ok(withdrawal_match(tx, address_pattern)?) - } - Predicate::CollateralAddress(address_pattern) => { - Ok(collateral_match(tx, address_pattern)?) - } - Predicate::Not(x) => Ok(!x.tx_match(point, tx)?), - Predicate::AnyOf(x) => { - for p in x { - if p.tx_match(point, tx)? { - return Ok(true); - }; - } - Ok(false) - } - Predicate::AllOf(x) => { - for p in x { - if !p.tx_match(point, tx)? { - return Ok(false); - }; - } - Ok(true) - } - } - } -} - -fn block_match(point: &Point, block_pattern: &BlockPattern) -> bool { - if let Some(slot_after) = block_pattern.slot_after { - if point.slot_or_default() <= slot_after { - return false; - } - } - - if let Some(slot_before) = block_pattern.slot_before { - if point.slot_or_default() >= slot_before { - return false; - } - } - - true -} - -fn output_match(tx: &ParsedTx, address_pattern: &AddressPattern) -> Result { - if address_pattern.is_script.unwrap_or_default() { - // TODO: validate inside script - return Ok(false); - } - - for output in tx.outputs.iter() { - let address = Address::from_bytes(&output.address).or_panic()?; - if !address.has_script() && address_pattern.address_match(&address)? { - return Ok(true); - } - } - - Ok(false) -} - -fn withdrawal_match(tx: &ParsedTx, address_pattern: &AddressPattern) -> Result { - for withdrawal in tx.withdrawals.iter() { - let address = Address::from_bytes(&withdrawal.reward_account).or_panic()?; - if address_pattern.address_match(&address)? { - return Ok(true); - } - } - - Ok(false) -} - -fn collateral_match(tx: &ParsedTx, address_pattern: &AddressPattern) -> Result { - if tx.collateral.is_some() { - if let Some(collateral_return) = &tx.collateral.as_ref().unwrap().collateral_return { - let address = Address::from_bytes(&collateral_return.address).or_panic()?; - return address_pattern.address_match(&address); - } - } - - Ok(false) -} - -#[derive(Deserialize)] -pub struct Config { - pub predicate: Predicate, -} - -impl Config { - pub fn bootstrapper(self, _ctx: &Context) -> Result { - let stage = Stage { - predicate: self.predicate, - ops_count: Default::default(), - input: Default::default(), - output: Default::default(), - }; - - Ok(stage) - } -} diff --git a/src/filters/match_pattern/eval.rs b/src/filters/match_pattern/eval.rs new file mode 100644 index 00000000..ce5b54dc --- /dev/null +++ b/src/filters/match_pattern/eval.rs @@ -0,0 +1,410 @@ +use pallas::{ledger::addresses::Address, network::miniprotocols::Point}; +use thiserror::Error; +use utxorpc::proto::cardano::v1::{ + Asset, Multiasset, PlutusData, Tx, TxInput, TxOutput, Withdrawal, +}; + +use crate::framework::Record; + +use super::{ + AddressPattern, AssetPattern, BlockPattern, DatumPattern, InputPattern, OutputPattern, + QuantityPattern, TxPredicate, UtxoRefPattern, WithdrawalPattern, +}; + +fn eval_quantity_matches(value: u64, pattern: &QuantityPattern) -> EvalResult { + let eval = match pattern { + QuantityPattern::Equals(expected) => value.eq(expected), + QuantityPattern::RangeInclusive(a, b) => value.ge(a) && value.le(b), + QuantityPattern::Greater(a) => value.gt(a), + QuantityPattern::GreaterOrEqual(a) => value.ge(a), + QuantityPattern::Lower(a) => value.lt(a), + QuantityPattern::LowerOrEqual(b) => value.le(b), + }; + + Ok(eval) +} + +fn eval_block_matches(point: &Point, pattern: &BlockPattern) -> EvalResult { + if let Some(slot_after) = pattern.slot_after { + if point.slot_or_default() <= slot_after { + return Ok(false); + } + } + + if let Some(slot_before) = pattern.slot_before { + if point.slot_or_default() >= slot_before { + return Ok(false); + } + } + + Ok(true) +} + +fn eval_address_matches(addr: &[u8], pattern: &AddressPattern) -> EvalResult { + let addr = + Address::from_bytes(addr).map_err(|_| Error::inconclusive("can't parse address bytes"))?; + + let is_match = match (pattern, &addr) { + (AddressPattern::Exact(expected), _) => addr.eq(expected), + (AddressPattern::Payment(expected), Address::Shelley(shelley)) => { + shelley.payment().eq(expected) + } + (AddressPattern::Delegation(expected), Address::Shelley(shelley)) => { + shelley.delegation().eq(expected) + } + _ => false, + }; + + Ok(is_match) +} + +fn eval_datum_matches(datum_hash: &[u8], pattern: &DatumPattern) -> EvalResult { + if let Some(expected) = pattern.hash { + let eval = datum_hash.as_ref().eq(expected.as_ref()); + + if !eval { + return Ok(false); + } + } + + Ok(true) +} + +fn eval_asset_matches(policy: &[u8], asset: &Asset, pattern: &AssetPattern) -> EvalResult { + if let Some(pattern) = &pattern.policy { + let eval = policy.eq(pattern.as_slice()); + + if !eval { + return Ok(false); + } + } + + if let Some(pattern) = &pattern.quantity { + let eval = eval_quantity_matches(asset.output_coin, pattern)?; + + if !eval { + return Ok(false); + } + } + + Ok(true) +} + +fn eval_some_asset_matches(assets: &[Multiasset], pattern: &AssetPattern) -> EvalResult { + for multiasset in assets.iter() { + for asset in multiasset.assets.iter() { + let eval = eval_asset_matches(&multiasset.policy_id, &asset, pattern)?; + + if eval { + return Ok(true); + } + } + } + + Ok(false) +} + +fn eval_output_matches(output: &TxOutput, pattern: &OutputPattern) -> EvalResult { + if let Some(pattern) = &pattern.to { + let eval = eval_address_matches(&output.address, &pattern)?; + + if !eval { + return Ok(false); + } + } + + if let Some(pattern) = &pattern.datum { + let eval = eval_datum_matches(&output.datum_hash, &pattern)?; + + if !eval { + return Ok(false); + } + } + + if let Some(pattern) = &pattern.assets { + let eval = eval_some_asset_matches(&output.assets, pattern)?; + + if !eval { + return Ok(false); + } + } + + Ok(true) +} + +fn eval_some_output_matches(tx: &Tx, pattern: &OutputPattern) -> EvalResult { + for output in tx.outputs.iter() { + let eval = eval_output_matches(output, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_withdrawal_matches(withdrawal: &Withdrawal, pattern: &WithdrawalPattern) -> EvalResult { + if let Some(pattern) = &pattern.quantity { + let eval = eval_quantity_matches(withdrawal.coin, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_some_withdrawal_matches(tx: &Tx, pattern: &WithdrawalPattern) -> EvalResult { + for withdrawal in tx.withdrawals.iter() { + let eval = eval_withdrawal_matches(withdrawal, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_some_collateral_matches(tx: &Tx, pattern: &InputPattern) -> EvalResult { + if let Some(collateral) = &tx.collateral { + for input in collateral.collateral.iter() { + let eval = eval_input_matches(input, pattern)?; + + if eval { + return Ok(true); + } + } + } + + Ok(false) +} + +fn eval_collateral_return_matches(tx: &Tx, pattern: &OutputPattern) -> EvalResult { + if let Some(collateral) = &tx.collateral { + if let Some(return_) = &collateral.collateral_return { + let eval = eval_output_matches(return_, pattern)?; + + if eval { + return Ok(true); + } + } + } + + Ok(false) +} + +fn eval_total_collateral_matches(tx: &Tx, pattern: &QuantityPattern) -> EvalResult { + if let Some(collateral) = &tx.collateral { + let eval = eval_quantity_matches(collateral.total_collateral, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_input_utxoref_matches(input: &TxInput, pattern: &UtxoRefPattern) -> EvalResult { + if let Some(pattern) = &pattern.tx_hash { + let eval = input.tx_hash.as_ref().eq(pattern.as_ref()); + + if !eval { + return Ok(false); + } + } + + if let Some(pattern) = &pattern.output_idx { + let eval = input.output_index.eq(pattern); + + if !eval { + return Ok(false); + } + } + + Ok(true) +} + +fn eval_input_matches(input: &TxInput, pattern: &InputPattern) -> EvalResult { + if let Some(pattern) = &pattern.utxo { + let eval = eval_input_utxoref_matches(input, &pattern)?; + + if !eval { + return Ok(false); + } + } + + if let Some(pattern) = &pattern.from { + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; + + let eval = eval_address_matches(&output.address, &pattern)?; + + if !eval { + return Ok(false); + } + } + + if let Some(pattern) = &pattern.datum { + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; + + let eval = eval_datum_matches(&output.datum_hash, &pattern)?; + + if !eval { + return Ok(false); + } + } + + if let Some(pattern) = &pattern.assets { + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; + + let eval = eval_some_asset_matches(&output.assets, pattern)?; + + if !eval { + return Ok(false); + } + } + + Ok(true) +} + +fn eval_some_input_matches(tx: &Tx, pattern: &InputPattern) -> EvalResult { + for input in tx.inputs.iter() { + let eval = eval_input_matches(input, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_some_input_address_matches(tx: &Tx, pattern: &AddressPattern) -> EvalResult { + for input in tx.inputs.iter() { + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; + + let eval = eval_address_matches(&output.address, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_some_input_asset_matches(tx: &Tx, pattern: &AssetPattern) -> EvalResult { + for input in tx.inputs.iter() { + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; + + for multiasset in output.assets.iter() { + for asset in multiasset.assets.iter() { + let eval = eval_asset_matches(&multiasset.policy_id, asset, pattern)?; + + if eval { + return Ok(true); + } + } + } + } + + Ok(false) +} + +fn eval_some_output_address_matches(tx: &Tx, pattern: &AddressPattern) -> EvalResult { + for output in tx.outputs.iter() { + let eval = eval_address_matches(&output.address, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_tx_any_of(predicates: &[TxPredicate], point: &Point, tx: &Tx) -> EvalResult { + for p in predicates.iter() { + if eval_tx(p, point, tx)? { + return Ok(true); + }; + } + + Ok(false) +} + +fn eval_tx_all_of(predicates: &[TxPredicate], point: &Point, tx: &Tx) -> EvalResult { + for p in predicates.iter() { + if !eval_tx(p, point, tx)? { + return Ok(false); + }; + } + + Ok(true) +} + +#[inline] +fn eval_tx_not(predicate: &TxPredicate, point: &Point, tx: &Tx) -> EvalResult { + let not = !eval_tx(predicate, point, tx)?; + + Ok(not) +} + +fn eval_tx(predicate: &TxPredicate, point: &Point, tx: &Tx) -> EvalResult { + match predicate { + TxPredicate::AnyOf(x) => eval_tx_any_of(x, point, tx), + TxPredicate::AllOf(x) => eval_tx_all_of(x, point, tx), + TxPredicate::Not(x) => eval_tx_not(x, point, tx), + TxPredicate::HashEquals(_) => todo!(), + TxPredicate::IsValid(_) => todo!(), + TxPredicate::BlockMatches(pattern) => eval_block_matches(point, pattern), + TxPredicate::SomeInputMatches(x) => eval_some_input_matches(tx, x), + TxPredicate::TotalInputAssetsMatch(_) => todo!(), + TxPredicate::SomeInputAddressMatches(x) => eval_some_input_address_matches(tx, x), + TxPredicate::SomeInputAssetMatches(p) => eval_some_input_asset_matches(tx, p), + TxPredicate::SomeInputDatumMatches(_) => todo!(), + TxPredicate::TotalOutputAssetsMatch(_) => todo!(), + TxPredicate::SomeOutputMatches(p) => eval_some_output_matches(tx, p), + TxPredicate::SomeOutputAddressMatches(p) => eval_some_output_address_matches(tx, p), + TxPredicate::SomeOutputDatumMatches(_) => todo!(), + TxPredicate::SomeOutputAssetMatches(_) => todo!(), + TxPredicate::SomeMintedAssetMatches(_) => todo!(), + TxPredicate::SomeBurnedAssetMatches(_) => todo!(), + TxPredicate::SomeMetadataMatches(_) => todo!(), + TxPredicate::SomeCollateralMatches(x) => eval_some_collateral_matches(tx, x), + TxPredicate::CollateralReturnMatches(x) => eval_collateral_return_matches(tx, x), + TxPredicate::TotalCollateralMatches(x) => eval_total_collateral_matches(tx, x), + TxPredicate::SomeWithdrawalMatches(x) => eval_some_withdrawal_matches(tx, x), + TxPredicate::SomeAddressMatches(_) => todo!(), + } +} + +#[derive(Error, Debug)] +pub enum Error { + #[error("predicate evaluation is inconclusive {0}")] + Inconclusive(String), +} + +impl Error { + pub fn inconclusive(msg: impl Into) -> Self { + Self::Inconclusive(msg.into()) + } + + pub fn no_input_ref() -> Self { + Self::inconclusive("no UTxO data for input ref") + } +} + +pub type EvalResult = Result; + +pub fn eval(predicate: &TxPredicate, point: &Point, record: &Record) -> EvalResult { + match record { + Record::ParsedTx(tx) => eval_tx(predicate, point, &tx), + _ => Err(Error::inconclusive( + "we only know how to evaluate parsed transaction records", + )), + } +} diff --git a/src/filters/match_pattern/mod.rs b/src/filters/match_pattern/mod.rs new file mode 100644 index 00000000..595bac3c --- /dev/null +++ b/src/filters/match_pattern/mod.rs @@ -0,0 +1,205 @@ +use std::str::FromStr; + +use gasket::framework::*; +use pallas::{ + crypto::hash::Hash, + ledger::addresses::{Address, ShelleyDelegationPart, ShelleyPaymentPart}, + network::miniprotocols::Point, +}; +use serde::Deserialize; +use serde_with::DeserializeFromStr; +use tracing::{error, warn}; + +mod eval; + +use crate::framework::*; + +#[derive(Stage)] +#[stage(name = "filter-match-pattern", unit = "ChainEvent", worker = "Worker")] +pub struct Stage { + predicate: TxPredicate, + + pub input: FilterInputPort, + pub output: FilterOutputPort, + + #[metric] + pass_count: gasket::metrics::Counter, + + #[metric] + drop_count: gasket::metrics::Counter, + + #[metric] + inconclusive_count: gasket::metrics::Counter, + + #[metric] + ops_count: gasket::metrics::Counter, +} + +pub struct Worker; + +impl From<&Stage> for Worker { + fn from(_: &Stage) -> Self { + Worker {} + } +} + +fn eval_record(stage: &Stage, point: &Point, record: &Record) -> Option { + match eval::eval(&stage.predicate, point, record) { + Ok(pass) => { + if pass { + stage.pass_count.inc(1); + Some(record.clone()) + } else { + stage.drop_count.inc(1); + None + } + } + Err(eval::Error::Inconclusive(msg)) => { + warn!(msg); + stage.inconclusive_count.inc(1); + None + } + } +} + +gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { + stage.ops_count.inc(1); + + if let Some(record) = unit.record() { + eval_record(stage, unit.point(), record) + .map(|x| unit.new_record(x)) + .map(|x| vec![x]) + .unwrap_or(vec![]) + } else { + vec![unit.clone()] + } +}); + +#[derive(Clone, Debug)] +pub struct AsciiPattern {} + +#[derive(Clone, Debug)] +pub enum QuantityPattern { + Equals(u64), + RangeInclusive(u64, u64), + Greater(u64), + GreaterOrEqual(u64), + Lower(u64), + LowerOrEqual(u64), +} + +#[derive(Clone, Debug)] +pub struct BlockPattern { + pub slot_before: Option, + pub slot_after: Option, +} + +#[derive(Clone, Debug)] +pub struct UtxoRefPattern { + tx_hash: Option>, + output_idx: Option, +} + +#[derive(Clone, Debug)] +pub struct DatumPattern { + hash: Option>, +} + +#[derive(Clone, Debug)] +pub struct WithdrawalPattern { + quantity: Option, + // reward account pattern? +} + +#[derive(Clone, Debug)] +pub struct AssetPattern { + policy: Option>, + name: Option, + quantity: Option, +} + +#[derive(Clone, Debug)] +pub enum AddressPattern { + Exact(Address), + Payment(ShelleyPaymentPart), + Delegation(ShelleyDelegationPart), +} + +#[derive(Clone, Debug)] +pub struct InputPattern { + assets: Option, + from: Option, + utxo: Option, + datum: Option, +} + +#[derive(Clone, Debug)] +pub struct OutputPattern { + assets: Option, + to: Option, + datum: Option, +} + +#[derive(Clone, Debug)] +pub struct MetadataPattern { + label: Option, + key: Option, + value: Option, +} + +#[derive(Clone, Debug, DeserializeFromStr)] +pub enum TxPredicate { + HashEquals(Option>), + IsValid(Option), + BlockMatches(BlockPattern), + SomeInputMatches(InputPattern), + TotalInputAssetsMatch(AssetPattern), + SomeInputAddressMatches(AddressPattern), + SomeInputAssetMatches(AssetPattern), + SomeInputDatumMatches(DatumPattern), + TotalOutputAssetsMatch(AssetPattern), + SomeOutputMatches(OutputPattern), + SomeOutputAddressMatches(AddressPattern), + SomeOutputDatumMatches(DatumPattern), + SomeOutputAssetMatches(AssetPattern), + SomeMintedAssetMatches(AssetPattern), + SomeBurnedAssetMatches(AssetPattern), + SomeMetadataMatches(MetadataPattern), + SomeCollateralMatches(InputPattern), + CollateralReturnMatches(OutputPattern), + TotalCollateralMatches(QuantityPattern), + SomeWithdrawalMatches(WithdrawalPattern), + SomeAddressMatches(AddressPattern), + Not(Box), + AnyOf(Vec), + AllOf(Vec), +} + +impl FromStr for TxPredicate { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + todo!() + } +} + +#[derive(Deserialize)] +pub struct Config { + pub predicate: TxPredicate, +} + +impl Config { + pub fn bootstrapper(self, _ctx: &Context) -> Result { + let stage = Stage { + predicate: self.predicate, + ops_count: Default::default(), + pass_count: Default::default(), + drop_count: Default::default(), + inconclusive_count: Default::default(), + input: Default::default(), + output: Default::default(), + }; + + Ok(stage) + } +} diff --git a/src/framework/mod.rs b/src/framework/mod.rs index 6e8c2c71..f2ba6487 100644 --- a/src/framework/mod.rs +++ b/src/framework/mod.rs @@ -155,6 +155,14 @@ impl ChainEvent { Ok(out) } + + pub fn new_record(&self, new: Record) -> Self { + match self { + ChainEvent::Apply(p, _) => ChainEvent::Apply(p.clone(), new), + ChainEvent::Undo(p, _) => ChainEvent::Undo(p.clone(), new), + ChainEvent::Reset(p) => ChainEvent::Reset(p.clone()), + } + } } fn point_to_json(point: Point) -> JsonValue { diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index ed4e79a6..52a7ad7a 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -4,11 +4,11 @@ use serde::Deserialize; use crate::framework::*; //pub mod assert; +mod assert; mod common; mod noop; mod stdout; mod terminal; -mod assert; #[cfg(feature = "sink-file-rotate")] mod file_rotate;