Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@ import io.ktor.server.routing.post
import io.ktor.server.routing.routing
import okhttp3.Interceptor
import okhttp3.Response
import java.io.File
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// File-backed event queue; wiped between tests (see /init and /reset).
private const val QUEUE_STORAGE_PREFIX = "/tmp/posthog-queue"

// Upper bound on the wait for a flush()'d batch to reach the mock (generous for Docker).
private const val FLUSH_SETTLE_MS = 2000L

/**
* PostHog Android SDK Compliance Adapter
*
Expand Down Expand Up @@ -50,6 +60,7 @@ data class HealthResponse(
val sdk_name: String,
val sdk_version: String,
val adapter_version: String,
val capabilities: List<String>,
)

data class InitRequest(
Expand All @@ -73,6 +84,23 @@ data class CaptureResponse(
val uuid: String,
)

data class FlagRequest(
val key: String,
val distinct_id: String? = null,
val person_properties: Map<String, Any>? = null,
val groups: Map<String, String>? = null,
val group_properties: Map<String, Map<String, Any>>? = null,
// Decoded for harness parity but not applied: no public per-request geoip toggle and
// the SDK always loads remotely (known SDK gaps).
val disable_geoip: Boolean? = null,
val force_remote: Boolean? = null,
)

data class FlagResponse(
val success: Boolean,
val value: Any?,
)

data class FlushResponse(
val success: Boolean,
val events_flushed: Int,
Expand Down Expand Up @@ -108,6 +136,9 @@ class TestPostHogContext(private val sdkName: String, private val sdkVersion: St
object AdapterContext {
val state = AdapterState()
val lock = ReentrantLock()

// Signalled by the tracking interceptor when a /batch response lands (see awaitBatch).
val batchObserved: Condition = lock.newCondition()
var postHog: com.posthog.PostHogInterface? = null
val capturedEvents = mutableListOf<String>() // Store UUIDs of captured events
}
Expand Down Expand Up @@ -170,6 +201,7 @@ class TrackingInterceptor : Interceptor {
if (retryCount > 0) {
AdapterContext.state.totalRetries++
}
AdapterContext.batchObserved.signalAll()
}
} catch (_: Exception) {
// Ignore tracking errors
Expand All @@ -180,6 +212,19 @@ class TrackingInterceptor : Interceptor {
}
}

// Block until [predicate] holds (signalled as each /batch response lands) or [timeoutMs] elapses.
private fun awaitBatch(
timeoutMs: Long,
predicate: () -> Boolean,
): Boolean =
AdapterContext.lock.withLock {
var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
while (!predicate() && remainingNanos > 0) {
remainingNanos = AdapterContext.batchObserved.awaitNanos(remainingNanos)
}
predicate()
}

fun main() {
embeddedServer(Netty, port = 8080) {
install(ContentNegotiation) {
Expand All @@ -195,6 +240,8 @@ fun main() {
sdk_name = "posthog-android",
sdk_version = PostHogConfig(apiKey = "").sdkVersion,
adapter_version = "1.0.0",
// Opt into the capture suites (android posts /batch with gzip).
capabilities = listOf("capture_v0", "encoding_gzip"),
),
)
}
Expand All @@ -215,6 +262,9 @@ fun main() {
// Close existing instance if any
AdapterContext.postHog?.close()

// close() leaves queue files on disk; wipe so they don't replay into this test.
File(QUEUE_STORAGE_PREFIX).deleteRecursively()

// Create new config
val flushIntervalMs = req.flush_interval_ms ?: 100
val flushIntervalSeconds = maxOf(1, flushIntervalMs / 1000) // Min 1 second
Expand All @@ -241,7 +291,7 @@ fun main() {
req.max_retries?.let { config.maxRetries = it }

// Set storage prefix for file-backed queue
config.storagePrefix = "/tmp/posthog-queue"
config.storagePrefix = QUEUE_STORAGE_PREFIX

// Set minimal context to provide $lib and $lib_version
config.context = TestPostHogContext("posthog-android", config.sdkVersion)
Expand Down Expand Up @@ -305,11 +355,71 @@ fun main() {
call.respond(CaptureResponse(success = true, uuid = uuid))
}

post("/get_feature_flag") {
val req = call.receive<FlagRequest>()

val ph =
AdapterContext.lock.withLock {
AdapterContext.postHog
}

if (ph == null) {
call.respond(HttpStatusCode.BadRequest, mapOf("error" to "SDK not initialized"))
return@post
}

// Note: distinct_id is decoded for harness parity but not applied. The only
// public setter is identify(), which unconditionally fires its own /flags reload
// and would double the request count the harness asserts on (known SDK gap).

// Stage person properties without reloading, so the explicit reload below
// is the single /flags request we await (the harness counts requests).
req.person_properties?.takeIf { it.isNotEmpty() }?.let {
ph.setPersonPropertiesForFlags(it, reloadFeatureFlags = false)
}

// Set group properties via the flags-specific API so we don't fire an extra
// reload for them. The /flags `group_properties` field reads from this store.
req.group_properties?.forEach { (type, props) ->
if (props.isNotEmpty()) {
ph.setGroupPropertiesForFlags(type, props, reloadFeatureFlags = false)
}
}

// group() is the only public writer of the GROUPS pref (needed in the /flags body);
// on a fresh SDK the new key also forces a $groupidentify + an extra /flags.
req.groups?.forEach { (type, key) ->
ph.group(type, key, null)
}
Comment thread
turnipdabeets marked this conversation as resolved.

// Reload flags and wait for the /flags request to complete.
val latch = CountDownLatch(1)
ph.reloadFeatureFlags { latch.countDown() }
val reloaded = latch.await(5, TimeUnit.SECONDS)
if (!reloaded) {
// Surface a reload timeout; otherwise it looks like a genuine null flag.
System.err.println("[ADAPTER] /get_feature_flag reload timed out after 5s for key=${req.key}")
}

// Resolve the value, capturing the documented $feature_flag_called side effect.
val requestsBefore = AdapterContext.lock.withLock { AdapterContext.state.requestsMade.size }
val value = ph.getFeatureFlag(req.key, defaultValue = null, sendFeatureFlagEvent = true)

// $feature_flag_called is captured internally, so wait on the request count, not pendingEvents.
ph.flush()
awaitBatch(FLUSH_SETTLE_MS) { AdapterContext.state.requestsMade.size > requestsBefore }

call.respond(FlagResponse(success = true, value = value))
}

post("/flush") {
val pendingBefore = AdapterContext.lock.withLock { AdapterContext.state.pendingEvents }
AdapterContext.postHog?.flush()

// Wait for events to be sent (generous timeout for Docker network latency)
Thread.sleep(2000)
// Wait until every pending event's batch is acked; bounded so a no-op flush can't hang.
if (pendingBefore > 0) {
awaitBatch(FLUSH_SETTLE_MS) { AdapterContext.state.pendingEvents == 0 }
}

val eventsFlushed =
AdapterContext.lock.withLock {
Expand Down Expand Up @@ -339,7 +449,12 @@ fun main() {

post("/reset") {
AdapterContext.lock.withLock {
AdapterContext.postHog?.reset()
// close (not reset) — reset() reloads flags, lands as +1 /flags in the next test.
AdapterContext.postHog?.close()
AdapterContext.postHog = null

// close() leaves queue files on disk; wipe so they don't replay next test (matches iOS).
File(QUEUE_STORAGE_PREFIX).deleteRecursively()

AdapterContext.state.pendingEvents = 0
AdapterContext.state.totalEventsCaptured = 0
Expand Down
Loading