Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/rx/src/Shared/Rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2183,4 +2183,97 @@ function Rx.mergeScan(accumulator, seed)
})
end

--[=[
Collects values as an array, emitting the result when another Observable emits.

https://rxjs.dev/api/index/function/buffer

@param closingNotifier Observable
@return (source: Observable) -> Observable
]=]

function Rx.buffer(closingNotifier)
assert(Observable.isObservable(closingNotifier), "Bad observable")

return function(source)
assert(Observable.isObservable(source), "Bad observable")

return Observable.new(function(sub)
local maid = Maid.new()
local latestBuffer = {}

maid:GiveTask(closingNotifier:Subscribe(function()
local latest = table.clone(latestBuffer)

table.clear(latestBuffer)

sub:Fire(latest)
end))

maid:GiveTask(source:Subscribe(
function(value)
table.insert(latestBuffer, value)
end,
nil,
function()
sub:Fire(latestBuffer)
sub:Complete()
end
))

maid:GiveTask(function()
table.clear(latestBuffer)
end)

return maid
end)
end
end

--[=[
Collects values, then, when another Observable fires, emits the collected values as an Observable.

https://rxjs.dev/api/index/function/window

@param windowBoundaries Observable
@return (source: Observable) -> Observable
]=]

function Rx.window(windowBoundaries)
assert(Observable.isObservable(windowBoundaries), "Bad observable")

return Rx.pipe({
Rx.buffer(windowBoundaries) :: any,
Rx.map(function(data)
return Rx.of(unpack(data)) :: any
end) :: any,
})
end

--[=[
Groups pairs of emissions together, emitting them as an array.

https://rxjs.dev/api/index/function/pairwise

@return (source: Observable) -> Observable
]=]

function Rx.pairwise()
return function(source)
assert(Observable.isObservable(source), "Bad observable")

return Observable.new(function(sub)
local previous = UNSET_VALUE

return source:Subscribe(function(value)
if previous ~= UNSET_VALUE then
sub:Fire({ previous, value })
end

previous = value
end, sub:GetFailComplete())
end)
end
end

return Rx
Loading