Skip to content

Commit 42c677b

Browse files
authored
Fix file descriptor leak in EventPipeline (#284)
Changed from .let to .use to ensure InputStream is always closed, preventing FD exhaustion when exceptions occur during upload
1 parent c3499e6 commit 42c677b

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,13 @@ open class EventPipeline(
132132
val fileUrlList = parseFilePaths(storage.read(Storage.Constants.Events))
133133
for (url in fileUrlList) {
134134
// upload event file
135-
storage.readAsStream(url)?.let { data ->
135+
storage.readAsStream(url)?.use { data ->
136136
var shouldCleanup = true
137137
try {
138138
val connection = httpClient.upload(apiHost)
139139
connection.outputStream?.let {
140140
// Write the payloads into the OutputStream
141141
data.copyTo(connection.outputStream)
142-
data.close()
143142
connection.outputStream.close()
144143

145144
// Upload the payloads.

core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,4 +208,30 @@ internal class EventPipelineTest {
208208
storage.read(Storage.Constants.Events)
209209
}
210210
}
211+
212+
@Test
213+
fun `upload closes InputStream when exception occurs`() {
214+
// Create a trackable InputStream wrapper
215+
var isClosed = false
216+
val trackableInputStream = object : java.io.InputStream() {
217+
override fun read(): Int = -1
218+
override fun close() {
219+
isClosed = true
220+
super.close()
221+
}
222+
}
223+
every { storage.readAsStream(any()) } returns trackableInputStream
224+
225+
// Mock connection.upload to throw exception
226+
every { anyConstructed<HTTPClient>().upload(any()) } throws RuntimeException("Network error")
227+
228+
pipeline.put(event1)
229+
pipeline.put(event2)
230+
231+
// Give some time for async processing
232+
Thread.sleep(500)
233+
234+
// Verify that close() was called on the InputStream even when exception occurred
235+
assertTrue(isClosed, "InputStream should have been closed when exception occurs")
236+
}
211237
}

0 commit comments

Comments
 (0)