9191 parent :: undefined | pid (),
9292 ref :: ranch :ref (),
9393 socket = undefined :: inet :socket () | {pid (), cowboy_stream :streamid ()} | undefined ,
94- transport = undefined :: module () | undefined ,
94+ transport :: module () | { data_delivery , stream_handlers | relay } ,
9595 opts = #{} :: opts (),
9696 active = true :: boolean (),
9797 handler :: module (),
@@ -149,7 +149,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
149149% % @todo Immediately crash if a response has already been sent.
150150upgrade (Req0 = #{version := Version }, Env , Handler , HandlerState , Opts ) ->
151151 FilteredReq = case maps :get (req_filter , Opts , undefined ) of
152- undefined -> maps :with ([method , version , scheme , host , port , path , qs , peer ], Req0 );
152+ undefined -> maps :with ([method , version , scheme , host , port , path , qs , peer , streamid ], Req0 );
153153 FilterFun -> FilterFun (Req0 )
154154 end ,
155155 Utf8State = case maps :get (validate_utf8 , Opts , true ) of
@@ -273,12 +273,28 @@ websocket_handshake(State=#state{key=Key},
273273% % For HTTP/2 we do not let the process die, we instead keep it
274274% % for the Websocket stream. This is because in HTTP/2 we only
275275% % have a stream, it doesn't take over the whole connection.
276- websocket_handshake (State , Req = #{ref := Ref , pid := Pid , streamid := StreamID },
276+ % %
277+ % % There are two methods of delivering data to the Websocket session:
278+ % % - 'stream_handlers' is the default and makes the data go
279+ % % through stream handlers just like when reading a request body;
280+ % % - 'relay' is a new method where data is sent as a message as
281+ % % soon as it is received from the socket in a DATA frame.
282+ websocket_handshake (State = # state {opts = Opts },
283+ Req = #{ref := Ref , pid := Pid , streamid := StreamID },
277284 HandlerState , _Env ) ->
278285 % % @todo We don't want date and server headers.
279286 Headers = cowboy_req :response_headers (#{}, Req ),
280- Pid ! {{Pid , StreamID }, {switch_protocol , Headers , ? MODULE , {State , HandlerState }}},
281- takeover (Pid , Ref , {Pid , StreamID }, undefined , #{}, <<>>,
287+ DataDelivery = maps :get (data_delivery , Opts , stream_handlers ),
288+ ModState = #{
289+ data_delivery => DataDelivery ,
290+ % % For relay data_delivery. The flow is a hint and may
291+ % % not be used by the underlying protocol.
292+ data_delivery_pid => self (),
293+ data_delivery_flow => maps :get (data_delivery_flow , Opts , 131072 )
294+ },
295+ Pid ! {{Pid , StreamID }, {switch_protocol , Headers , ? MODULE , ModState }},
296+ % % @todo We can't call the normal takeover because it tries to parse.
297+ takeover (Pid , Ref , {Pid , StreamID }, {data_delivery , DataDelivery }, #{}, <<>>,
282298 {State , HandlerState }).
283299
284300% % Connection process.
@@ -301,7 +317,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
301317-type parse_state () :: # ps_header {} | # ps_payload {}.
302318
303319-spec takeover (pid (), ranch :ref (), inet :socket () | {pid (), cowboy_stream :streamid ()},
304- module () | undefined , any (), binary (),
320+ module () | { data_delivery , stream_handlers | relay } , any (), binary (),
305321 {# state {}, any ()}) -> no_return ().
306322takeover (Parent , Ref , Socket , Transport , Opts , Buffer ,
307323 {State0 = # state {opts = WsOpts , handler = Handler , req = Req }, HandlerState }) ->
@@ -311,7 +327,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
311327 _ -> ranch :remove_connection (Ref )
312328 end ,
313329 Messages = case Transport of
314- undefined -> undefined ;
330+ { data_delivery , _ } -> undefined ;
315331 _ -> Transport :messages ()
316332 end ,
317333 State = set_idle_timeout (State0 # state {parent = Parent ,
@@ -355,13 +371,14 @@ after_init(State, HandlerState, ParseState) ->
355371% % immediately but there might still be data to be processed in
356372% % the message queue.
357373
358- setopts_active (# state {transport = undefined }) ->
374+ setopts_active (# state {transport = { data_delivery , _ } }) ->
359375 ok ;
360376setopts_active (# state {socket = Socket , transport = Transport , opts = Opts }) ->
361377 N = maps :get (active_n , Opts , 1 ),
362378 Transport :setopts (Socket , [{active , N }]).
363379
364- maybe_read_body (# state {socket = Stream = {Pid , _ }, transport = undefined , active = true }) ->
380+ maybe_read_body (# state {transport = {data_delivery , stream_handlers },
381+ socket = Stream = {Pid , _ }, active = true }) ->
365382 % % @todo Keep Ref around.
366383 ReadBodyRef = make_ref (),
367384 Pid ! {Stream , {read_body , self (), ReadBodyRef , auto , infinity }},
@@ -374,10 +391,11 @@ active(State) ->
374391 maybe_read_body (State ),
375392 State # state {active = true }.
376393
377- passive (State = # state {transport = undefined }) ->
394+ passive (State = # state {transport = { data_delivery , _ } }) ->
378395 % % Unfortunately we cannot currently cancel read_body.
379396 % % But that's OK, we will just stop reading the body
380397 % % after the next message.
398+ % % @todo We can't stop relay data_delivery currently.
381399 State # state {active = false };
382400passive (State = # state {socket = Socket , transport = Transport , messages = Messages }) ->
383401 Transport :setopts (Socket , [{active , false }]),
@@ -454,6 +472,10 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
454472 {request_body , _Ref , fin , _ , Data } ->
455473 maybe_read_body (State ),
456474 parse (? reset_idle_timeout (State ), HandlerState , ParseState , Data );
475+ % % @todo It would be better to check StreamID.
476+ % % @todo We must ensure that IsFin=fin is handled like a socket close?
477+ {'$cowboy_stream_data' , {Pid , _StreamID }, _IsFin , Data } when Pid =:= Parent ->
478+ parse (? reset_idle_timeout (State ), HandlerState , ParseState , Data );
457479 % % Timeouts.
458480 {timeout , TRef , ? MODULE } ->
459481 tick_idle_timeout (State , HandlerState , ParseState );
@@ -662,9 +684,14 @@ commands([Frame|Tail], State, Data0) ->
662684 commands (Tail , State , Data )
663685 end .
664686
665- transport_send (# state {socket = Stream = {Pid , _ }, transport = undefined }, IsFin , Data ) ->
687+ transport_send (# state {transport = {data_delivery , stream_handlers },
688+ socket = Stream = {Pid , _ }}, IsFin , Data ) ->
666689 Pid ! {Stream , {data , IsFin , Data }},
667690 ok ;
691+ transport_send (# state {transport = {data_delivery , relay },
692+ socket = Stream = {Pid , _ }}, IsFin , Data ) ->
693+ Pid ! {'$cowboy_stream_data' , Stream , IsFin , Data },
694+ ok ;
668695transport_send (# state {socket = Socket , transport = Transport }, _ , Data ) ->
669696 Transport :send (Socket , Data ).
670697
0 commit comments