diff --git a/sdk_compliance_adapter/src/main/kotlin/com/posthog/compliance/ComplianceAdapter.kt b/sdk_compliance_adapter/src/main/kotlin/com/posthog/compliance/ComplianceAdapter.kt index e769f657c..6fb3686da 100644 --- a/sdk_compliance_adapter/src/main/kotlin/com/posthog/compliance/ComplianceAdapter.kt +++ b/sdk_compliance_adapter/src/main/kotlin/com/posthog/compliance/ComplianceAdapter.kt @@ -18,10 +18,20 @@ import io.ktor.server.routing.post import io.ktor.server.routing.routing import okhttp3.Interceptor import okhttp3.Response +import java.io.File import java.util.UUID +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock +// File-backed event queue; wiped between tests (see /init and /reset). +private const val QUEUE_STORAGE_PREFIX = "/tmp/posthog-queue" + +// Upper bound on the wait for a flush()'d batch to reach the mock (generous for Docker). +private const val FLUSH_SETTLE_MS = 2000L + /** * PostHog Android SDK Compliance Adapter * @@ -50,6 +60,7 @@ data class HealthResponse( val sdk_name: String, val sdk_version: String, val adapter_version: String, + val capabilities: List, ) data class InitRequest( @@ -73,6 +84,23 @@ data class CaptureResponse( val uuid: String, ) +data class FlagRequest( + val key: String, + val distinct_id: String? = null, + val person_properties: Map? = null, + val groups: Map? = null, + val group_properties: Map>? = null, + // Decoded for harness parity but not applied: no public per-request geoip toggle and + // the SDK always loads remotely (known SDK gaps). + val disable_geoip: Boolean? = null, + val force_remote: Boolean? = null, +) + +data class FlagResponse( + val success: Boolean, + val value: Any?, +) + data class FlushResponse( val success: Boolean, val events_flushed: Int, @@ -108,6 +136,9 @@ class TestPostHogContext(private val sdkName: String, private val sdkVersion: St object AdapterContext { val state = AdapterState() val lock = ReentrantLock() + + // Signalled by the tracking interceptor when a /batch response lands (see awaitBatch). + val batchObserved: Condition = lock.newCondition() var postHog: com.posthog.PostHogInterface? = null val capturedEvents = mutableListOf() // Store UUIDs of captured events } @@ -170,6 +201,7 @@ class TrackingInterceptor : Interceptor { if (retryCount > 0) { AdapterContext.state.totalRetries++ } + AdapterContext.batchObserved.signalAll() } } catch (_: Exception) { // Ignore tracking errors @@ -180,6 +212,19 @@ class TrackingInterceptor : Interceptor { } } +// Block until [predicate] holds (signalled as each /batch response lands) or [timeoutMs] elapses. +private fun awaitBatch( + timeoutMs: Long, + predicate: () -> Boolean, +): Boolean = + AdapterContext.lock.withLock { + var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs) + while (!predicate() && remainingNanos > 0) { + remainingNanos = AdapterContext.batchObserved.awaitNanos(remainingNanos) + } + predicate() + } + fun main() { embeddedServer(Netty, port = 8080) { install(ContentNegotiation) { @@ -195,6 +240,8 @@ fun main() { sdk_name = "posthog-android", sdk_version = PostHogConfig(apiKey = "").sdkVersion, adapter_version = "1.0.0", + // Opt into the capture suites (android posts /batch with gzip). + capabilities = listOf("capture_v0", "encoding_gzip"), ), ) } @@ -215,6 +262,9 @@ fun main() { // Close existing instance if any AdapterContext.postHog?.close() + // close() leaves queue files on disk; wipe so they don't replay into this test. + File(QUEUE_STORAGE_PREFIX).deleteRecursively() + // Create new config val flushIntervalMs = req.flush_interval_ms ?: 100 val flushIntervalSeconds = maxOf(1, flushIntervalMs / 1000) // Min 1 second @@ -241,7 +291,7 @@ fun main() { req.max_retries?.let { config.maxRetries = it } // Set storage prefix for file-backed queue - config.storagePrefix = "/tmp/posthog-queue" + config.storagePrefix = QUEUE_STORAGE_PREFIX // Set minimal context to provide $lib and $lib_version config.context = TestPostHogContext("posthog-android", config.sdkVersion) @@ -305,11 +355,71 @@ fun main() { call.respond(CaptureResponse(success = true, uuid = uuid)) } + post("/get_feature_flag") { + val req = call.receive() + + val ph = + AdapterContext.lock.withLock { + AdapterContext.postHog + } + + if (ph == null) { + call.respond(HttpStatusCode.BadRequest, mapOf("error" to "SDK not initialized")) + return@post + } + + // Note: distinct_id is decoded for harness parity but not applied. The only + // public setter is identify(), which unconditionally fires its own /flags reload + // and would double the request count the harness asserts on (known SDK gap). + + // Stage person properties without reloading, so the explicit reload below + // is the single /flags request we await (the harness counts requests). + req.person_properties?.takeIf { it.isNotEmpty() }?.let { + ph.setPersonPropertiesForFlags(it, reloadFeatureFlags = false) + } + + // Set group properties via the flags-specific API so we don't fire an extra + // reload for them. The /flags `group_properties` field reads from this store. + req.group_properties?.forEach { (type, props) -> + if (props.isNotEmpty()) { + ph.setGroupPropertiesForFlags(type, props, reloadFeatureFlags = false) + } + } + + // group() is the only public writer of the GROUPS pref (needed in the /flags body); + // on a fresh SDK the new key also forces a $groupidentify + an extra /flags. + req.groups?.forEach { (type, key) -> + ph.group(type, key, null) + } + + // Reload flags and wait for the /flags request to complete. + val latch = CountDownLatch(1) + ph.reloadFeatureFlags { latch.countDown() } + val reloaded = latch.await(5, TimeUnit.SECONDS) + if (!reloaded) { + // Surface a reload timeout; otherwise it looks like a genuine null flag. + System.err.println("[ADAPTER] /get_feature_flag reload timed out after 5s for key=${req.key}") + } + + // Resolve the value, capturing the documented $feature_flag_called side effect. + val requestsBefore = AdapterContext.lock.withLock { AdapterContext.state.requestsMade.size } + val value = ph.getFeatureFlag(req.key, defaultValue = null, sendFeatureFlagEvent = true) + + // $feature_flag_called is captured internally, so wait on the request count, not pendingEvents. + ph.flush() + awaitBatch(FLUSH_SETTLE_MS) { AdapterContext.state.requestsMade.size > requestsBefore } + + call.respond(FlagResponse(success = true, value = value)) + } + post("/flush") { + val pendingBefore = AdapterContext.lock.withLock { AdapterContext.state.pendingEvents } AdapterContext.postHog?.flush() - // Wait for events to be sent (generous timeout for Docker network latency) - Thread.sleep(2000) + // Wait until every pending event's batch is acked; bounded so a no-op flush can't hang. + if (pendingBefore > 0) { + awaitBatch(FLUSH_SETTLE_MS) { AdapterContext.state.pendingEvents == 0 } + } val eventsFlushed = AdapterContext.lock.withLock { @@ -339,7 +449,12 @@ fun main() { post("/reset") { AdapterContext.lock.withLock { - AdapterContext.postHog?.reset() + // close (not reset) — reset() reloads flags, lands as +1 /flags in the next test. + AdapterContext.postHog?.close() + AdapterContext.postHog = null + + // close() leaves queue files on disk; wipe so they don't replay next test (matches iOS). + File(QUEUE_STORAGE_PREFIX).deleteRecursively() AdapterContext.state.pendingEvents = 0 AdapterContext.state.totalEventsCaptured = 0