diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt index a133ad2e..997bdfc7 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt @@ -134,14 +134,13 @@ open class EventPipeline( val fileUrlList = parseFilePaths(storage.read(Storage.Constants.Events)) for (url in fileUrlList) { // upload event file - storage.readAsStream(url)?.let { data -> + storage.readAsStream(url)?.use { data -> var shouldCleanup = true try { val connection = httpClient.upload(apiHost) connection.outputStream?.let { // Write the payloads into the OutputStream data.copyTo(connection.outputStream) - data.close() connection.outputStream.close() // Upload the payloads. diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt index b8a28578..e6e3b2b5 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/platform/EventPipelineTest.kt @@ -208,4 +208,30 @@ internal class EventPipelineTest { storage.read(Storage.Constants.Events) } } + + @Test + fun `upload closes InputStream when exception occurs`() { + // Create a trackable InputStream wrapper + var isClosed = false + val trackableInputStream = object : java.io.InputStream() { + override fun read(): Int = -1 + override fun close() { + isClosed = true + super.close() + } + } + every { storage.readAsStream(any()) } returns trackableInputStream + + // Mock connection.upload to throw exception + every { anyConstructed().upload(any()) } throws RuntimeException("Network error") + + pipeline.put(event1) + pipeline.put(event2) + + // Give some time for async processing + Thread.sleep(500) + + // Verify that close() was called on the InputStream even when exception occurred + assertTrue(isClosed, "InputStream should have been closed when exception occurs") + } } \ No newline at end of file