-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcommit-stream.coffee
More file actions
55 lines (52 loc) · 1.52 KB
/
commit-stream.coffee
File metadata and controls
55 lines (52 loc) · 1.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
{Stream} = require 'stream'
util = require 'util'
# Builds an old-style (Stream-based) duplex stream that collects everything
# written to it into the payload of a single commit and emits that commit
# once, as one 'data' event, when the stream finishes.
#
# sourceStream - object exposing `createCommit()`. The commit it returns must
#                have non-null `streamId` and `streamRevision`; `payload`
#                defaults to an empty array when absent.
#
# Returns the stream. Emits 'data' (the commit), then 'end', then 'close'.
# `write` throws once the stream has been ended or destroyed, and also when
# `createCommit()` yields a missing or invalid commit.
module.exports = (sourceStream) ->
  commit = null
  source = sourceStream
  stream = new Stream()

  stream.writable = true
  stream.readable = true

  ended = false      # end() has been called
  destroyed = false  # destroy() has been called
  paused = false     # pause() is in effect; completion is deferred
  finished = false   # 'end'/'close' have already been emitted

  # Lazily obtains the commit from the source on first use.
  # The candidate is validated BEFORE it is stored, so a rejected commit is
  # never cached and accepted by a later call.
  promiseCommit = ->
    return if commit
    candidate = source.createCommit()
    unless candidate
      throw new Error('commit is required')
    unless candidate.streamId? and candidate.streamRevision?
      throw new Error('commit must have streamId and streamRevision')
    candidate.payload ?= []
    commit = candidate
    return

  # Emits 'end' and 'close' exactly once, however many code paths
  # (end, resume after a paused end, destroy) request completion.
  finish = ->
    return if finished
    finished = true
    stream.emit 'end'
    stream.emit 'close'
    return

  # Appends `data` to the commit payload and bumps its streamRevision.
  # Falsy `data` is ignored. Always returns true.
  # Throws if the stream was already ended or destroyed.
  stream.write = (data) ->
    # Check writability first so a dead stream never asks the source for a
    # new commit.
    if ended or destroyed
      throw new Error 'commit-stream no longer writable'
    promiseCommit()
    return true unless data
    commit.streamRevision++
    commit.payload.push data
    return true

  # Defers completion: an end() issued while paused emits nothing until
  # resume() is called.
  stream.pause = ->
    paused = true
    return

  # Lifts the pause and, if end() was requested in the meantime, completes
  # the stream now. Calling end() again here would be a no-op because it
  # returns early once `ended` is set, so finish() is invoked directly.
  stream.resume = ->
    paused = false
    finish() if ended
    return

  # Registered at construction, so it runs ahead of listeners attached
  # later: the commit is delivered as 'data' before those listeners see 'end'.
  stream.on 'end', ->
    stream.readable = false
    # No commit exists after destroy() or when nothing was ever written;
    # emitting `null` as data would mislead consumers.
    stream.emit 'data', commit if commit?
    return

  # Marks the stream as no longer writable and completes it, unless paused
  # (in which case completion happens on resume()). Repeated calls are
  # ignored.
  stream.end = ->
    return if ended
    ended = true
    stream.writable = false
    finish() unless paused
    return

  # Discards the pending commit and shuts the stream down. 'end'/'close' are
  # emitted only if they have not been emitted already.
  # Throws if the stream was already destroyed.
  stream.destroy = ->
    if destroyed
      throw new Error 'commit-stream already destroyed'
    stream.writable = stream.readable = false
    destroyed = true
    commit = null
    finish()
    return

  stream