@@ -298,8 +298,67 @@ impl CdnBackend {
298298 }
299299}
300300
301+ /// fully invalidate the CDN distribution, also emptying the queue.
302+ #[ instrument( skip( conn) ) ]
303+ pub ( crate ) async fn full_invalidation (
304+ config : & Config ,
305+ cdn : & CdnBackend ,
306+ metrics : & InstanceMetrics ,
307+ conn : & mut sqlx:: PgConnection ,
308+ distribution_id : & str ,
309+ ) -> Result < ( ) > {
310+ let mut transaction = conn. begin ( ) . await ?;
311+
312+ let now = Utc :: now ( ) ;
313+ for row in sqlx:: query!(
314+ "SELECT queued
315+ FROM cdn_invalidation_queue
316+ WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL
317+ FOR UPDATE" ,
318+ distribution_id,
319+ )
320+ . fetch_all ( & mut * transaction)
321+ . await ?
322+ {
323+ if let Ok ( duration) = ( now - row. queued ) . to_std ( ) {
324+ // This can only fail when the duration is negative, which can't happen anyways
325+ metrics
326+ . cdn_queue_time
327+ . with_label_values ( & [ distribution_id] )
328+ . observe ( duration_to_seconds ( duration) ) ;
329+ }
330+ }
331+
332+ match cdn
333+ . create_invalidation ( distribution_id, & [ "/*" ] )
334+ . await
335+ . context ( "error creating new invalidation" )
336+ {
337+ Ok ( invalidation) => {
338+ sqlx:: query!(
339+ "UPDATE cdn_invalidation_queue
340+ SET
341+ created_in_cdn = CURRENT_TIMESTAMP,
342+ cdn_reference = $1
343+ WHERE
344+ cdn_distribution_id = $2 AND created_in_cdn IS NULL" ,
345+ invalidation. invalidation_id,
346+ distribution_id,
347+ )
348+ . execute ( & mut * transaction)
349+ . await ?;
350+
351+ transaction. commit ( ) . await ?;
352+ }
353+ Err ( err) => return Err ( err) ,
354+ } ;
355+
356+ Ok ( ( ) )
357+ }
358+
301359#[ instrument( skip( conn) ) ]
302360pub ( crate ) async fn handle_queued_invalidation_requests (
361+ config : & Config ,
303362 cdn : & CdnBackend ,
304363 metrics : & InstanceMetrics ,
305364 conn : & mut sqlx:: PgConnection ,
@@ -385,6 +444,24 @@ pub(crate) async fn handle_queued_invalidation_requests(
385444 return Ok ( ( ) ) ;
386445 }
387446
447+ if let Some ( min_queued) = sqlx:: query_scalar!(
448+ "SELECT
449+ min(queued)
450+ FROM cdn_invalidation_queue
451+ WHERE
452+ cdn_distribution_id = $1 AND
453+ created_in_cdn IS NULL" ,
454+ distribution_id
455+ )
456+ . fetch_one ( & mut * conn)
457+ . await ?
458+ {
459+ if ( now - min_queued) . to_std ( ) . unwrap_or_default ( ) >= config. cdn_max_queued_age {
460+ full_invalidation ( config, cdn, metrics, conn, distribution_id) . await ?;
461+ return Ok ( ( ) ) ;
462+ }
463+ }
464+
388465 // create new an invalidation for the queued path patterns
389466 let mut transaction = conn. begin ( ) . await ?;
390467 let mut path_patterns: Vec < String > = Vec :: new ( ) ;
@@ -566,6 +643,8 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution(
566643
567644#[ cfg( test) ]
568645mod tests {
646+ use std:: time:: Duration ;
647+
569648 use super :: * ;
570649 use crate :: test:: async_wrapper;
571650
@@ -671,6 +750,111 @@ mod tests {
671750 } )
672751 }
673752
753+ #[ test]
754+ fn escalate_to_full_invalidation ( ) {
755+ crate :: test:: async_wrapper ( |env| async move {
756+ env. override_config ( |config| {
757+ config. cloudfront_distribution_id_web = Some ( "distribution_id_web" . into ( ) ) ;
758+ config. cloudfront_distribution_id_static = Some ( "distribution_id_static" . into ( ) ) ;
759+ config. cdn_max_queued_age = Duration :: from_secs ( 0 ) ;
760+ } ) ;
761+
762+ let cdn = env. cdn ( ) . await ;
763+ let config = env. config ( ) ;
764+ let mut conn = env. async_db ( ) . await . async_conn ( ) . await ;
765+ assert ! ( queued_or_active_crate_invalidations( & mut conn)
766+ . await ?
767+ . is_empty( ) ) ;
768+
769+ queue_crate_invalidation ( & mut conn, & env. config ( ) , "krate" ) . await ?;
770+
771+ // invalidation paths are queued.
772+ assert_eq ! (
773+ queued_or_active_crate_invalidations( & mut conn)
774+ . await ?
775+ . into_iter( )
776+ . map( |i| (
777+ i. cdn_distribution_id,
778+ i. krate,
779+ i. path_pattern,
780+ i. cdn_reference
781+ ) )
782+ . collect:: <Vec <_>>( ) ,
783+ vec![
784+ (
785+ "distribution_id_web" . into( ) ,
786+ "krate" . into( ) ,
787+ "/krate*" . into( ) ,
788+ None
789+ ) ,
790+ (
791+ "distribution_id_web" . into( ) ,
792+ "krate" . into( ) ,
793+ "/crate/krate*" . into( ) ,
794+ None
795+ ) ,
796+ (
797+ "distribution_id_static" . into( ) ,
798+ "krate" . into( ) ,
799+ "/rustdoc/krate*" . into( ) ,
800+ None
801+ ) ,
802+ ]
803+ ) ;
804+
805+ let counts =
806+ queued_or_active_crate_invalidation_count_by_distribution ( & mut conn, & config)
807+ . await ?;
808+ assert_eq ! ( counts. len( ) , 2 ) ;
809+ assert_eq ! ( * counts. get( "distribution_id_web" ) . unwrap( ) , 2 ) ;
810+ assert_eq ! ( * counts. get( "distribution_id_static" ) . unwrap( ) , 1 ) ;
811+
812+ // queueing the invalidation doesn't create it in the CDN
813+ assert ! ( active_invalidations( & cdn, "distribution_id_web" ) . is_empty( ) ) ;
814+ assert ! ( active_invalidations( & cdn, "distribution_id_static" ) . is_empty( ) ) ;
815+
816+ let cdn = env. cdn ( ) . await ;
817+ let config = env. config ( ) ;
818+
819+ // now handle the queued invalidations
820+ handle_queued_invalidation_requests (
821+ & config,
822+ & cdn,
823+ & env. instance_metrics ( ) ,
824+ & mut conn,
825+ "distribution_id_web" ,
826+ )
827+ . await ?;
828+ handle_queued_invalidation_requests (
829+ & config,
830+ & cdn,
831+ & env. instance_metrics ( ) ,
832+ & mut conn,
833+ "distribution_id_static" ,
834+ )
835+ . await ?;
836+
837+ // which creates them in the CDN
838+ {
839+ let ir_web = active_invalidations ( & cdn, "distribution_id_web" ) ;
840+ assert_eq ! ( ir_web. len( ) , 1 ) ;
841+ assert_eq ! ( ir_web[ 0 ] . path_patterns, vec![ "/*" ] ) ;
842+
843+ let ir_static = active_invalidations ( & cdn, "distribution_id_static" ) ;
844+ assert_eq ! ( ir_web. len( ) , 1 ) ;
845+ assert_eq ! ( ir_static[ 0 ] . path_patterns, vec![ "/*" ] ) ;
846+ }
847+
848+ // the queued entries got a CDN reference attached
849+ assert ! ( queued_or_active_crate_invalidations( & mut conn)
850+ . await ?
851+ . iter( )
852+ . all( |i| i. cdn_reference. is_some( ) && i. created_in_cdn. is_some( ) ) ) ;
853+
854+ Ok ( ( ) )
855+ } ) ;
856+ }
857+
674858 #[ test]
675859 fn invalidate_a_crate ( ) {
676860 crate :: test:: async_wrapper ( |env| async move {
@@ -734,16 +918,19 @@ mod tests {
734918 assert ! ( active_invalidations( & cdn, "distribution_id_static" ) . is_empty( ) ) ;
735919
736920 let cdn = env. cdn ( ) . await ;
921+ let config = env. config ( ) ;
737922
738923 // now handle the queued invalidations
739924 handle_queued_invalidation_requests (
925+ & config,
740926 & cdn,
741927 & env. instance_metrics ( ) ,
742928 & mut conn,
743929 "distribution_id_web" ,
744930 )
745931 . await ?;
746932 handle_queued_invalidation_requests (
933+ & config,
747934 & cdn,
748935 & env. instance_metrics ( ) ,
749936 & mut conn,
@@ -774,13 +961,15 @@ mod tests {
774961
775962 // now handle again
776963 handle_queued_invalidation_requests (
964+ & config,
777965 & cdn,
778966 & env. instance_metrics ( ) ,
779967 & mut conn,
780968 "distribution_id_web" ,
781969 )
782970 . await ?;
783971 handle_queued_invalidation_requests (
972+ & config,
784973 & cdn,
785974 & env. instance_metrics ( ) ,
786975 & mut conn,
@@ -849,6 +1038,7 @@ mod tests {
8491038
8501039 // handle the queued invalidations
8511040 handle_queued_invalidation_requests (
1041+ & env. config ( ) ,
8521042 & * env. cdn ( ) . await ,
8531043 & env. instance_metrics ( ) ,
8541044 & mut conn,
@@ -909,6 +1099,7 @@ mod tests {
9091099
9101100 // handle the queued invalidations
9111101 handle_queued_invalidation_requests (
1102+ & env. config ( ) ,
9121103 & * env. cdn ( ) . await ,
9131104 & env. instance_metrics ( ) ,
9141105 & mut conn,
@@ -937,6 +1128,7 @@ mod tests {
9371128
9381129 // now handle again
9391130 handle_queued_invalidation_requests (
1131+ & env. config ( ) ,
9401132 & * env. cdn ( ) . await ,
9411133 & env. instance_metrics ( ) ,
9421134 & mut conn,
@@ -976,6 +1168,7 @@ mod tests {
9761168
9771169 // run the handler
9781170 handle_queued_invalidation_requests (
1171+ & env. config ( ) ,
9791172 & * env. cdn ( ) . await ,
9801173 & env. instance_metrics ( ) ,
9811174 & mut conn,
0 commit comments